activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961166 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/scala/org/apache/...
Date Wed, 07 Jul 2010 04:13:10 GMT
Author: chirino
Date: Wed Jul  7 04:13:10 2010
New Revision: 961166

URL: http://svn.apache.org/viewvc?rev=961166&view=rev
Log:
- Eliminated the use of tombstone entries in the queue list
- Renamed QueueEntryGroup to QueueEntryRange and FlushedGroup to FlushedRange 
- Simplified serveral methods in the QueueEntry interface
- Always start queues
- Updated HawtDB store to use the new JournalListener interfaces

Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:13:10 2010
@@ -26,7 +26,8 @@ import org.apache.activemq.broker.store.
 import protocol.ProtocolFactory
 import java.util.concurrent.TimeUnit
 import java.util.{HashSet, Collections, ArrayList, LinkedList}
-import org.apache.activemq.apollo.store.{QueueEntryGroup, QueueEntryRecord, MessageRecord}
+import org.apache.activemq.apollo.store.{QueueEntryRange, QueueEntryRecord, MessageRecord}
+import collection.mutable.ListBuffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -87,11 +88,9 @@ class Queue(val host: VirtualHost, val d
   var message_seq_counter = 1L
 
   val entries = new LinkedNodeList[QueueEntry]()
-  val head_entry = new QueueEntry(this, 0L);
-  var tail_entry = new QueueEntry(this, next_message_seq)
-
+  val head_entry = new QueueEntry(this, 0L).head
+  var tail_entry = new QueueEntry(this, next_message_seq).tail
   entries.addFirst(head_entry)
-  head_entry.tombstone
 
   var loading_size = 0
   var flushing_size = 0
@@ -180,31 +179,17 @@ class Queue(val host: VirtualHost, val d
     }
 
     if( tune_persistent ) {
-      host.store.listQueueEntryGroups(queueKey, tune_entry_group_size) { groups=>
+      host.store.listQueueEntryRanges(queueKey, tune_entry_group_size) { ranges=>
         dispatchQueue {
-          if( !groups.isEmpty ) {
-
-            // adjust the head tombstone.
-            head_entry.as_tombstone.count = groups.head.firstSeq
+          if( !ranges.isEmpty ) {
 
-            var last:QueueEntryGroup = null
-            groups.foreach { group =>
-
-              // if groups were not adjacent.. create tombstone entry.
-              if( last!=null && (last.lastSeq+1 != group.firstSeq) ) {
-                val entry = new QueueEntry(Queue.this, last.lastSeq+1)
-                entry.tombstone.as_tombstone.count = group.firstSeq - (last.lastSeq+1)
-                entries.addLast(entry)
-              }
-
-              val entry = new QueueEntry(Queue.this, group.firstSeq).init(group)
+            ranges.foreach { range =>
+              val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range)
               entries.addLast(entry)
 
-              message_seq_counter = group.lastSeq
-              enqueue_item_counter += group.count
-              enqueue_size_counter += group.size
-
-              last = group
+              message_seq_counter = range.lastQueueSeq + 1
+              enqueue_item_counter += range.count
+              enqueue_size_counter += range.size
             }
 
             debug("restored: "+enqueue_item_counter)
@@ -297,7 +282,7 @@ class Queue(val host: VirtualHost, val d
       if (cur.is_flushed || cur.is_loaded) {
         total_items += 1
       } else if (cur.is_flushed_group ) {
-        total_items += cur.as_flushed_group.items
+        total_items += cur.as_flushed_group.count
       }
       
       cur = cur.getNext
@@ -541,21 +526,17 @@ class Queue(val host: VirtualHost, val d
     
     debug("swapping...")
 
-    var entry = entries.getHead
+    var entry = head_entry.getNext
     while( entry!=null ) {
-      if( entry.as_tombstone == null ) {
-
-        val loaded = entry.as_loaded
+      val loaded = entry.as_loaded
 
-        // Keep around prefetched and loaded entries.
-        if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
-          entry.load
-        } else {
-          // flush the the others out of memory.
-          entry.flush
-        }
+      // Keep around prefetched and loaded entries.
+      if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
+        entry.load
+      } else {
+        // flush the the others out of memory.
+        entry.flush
       }
-
       entry = entry.getNext
     }
   }
@@ -594,7 +575,7 @@ class Queue(val host: VirtualHost, val d
 
   def collocate(value:DispatchQueue):Unit = {
     if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
-      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+      debug(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
       this.dispatchQueue.setTargetQueue(value.getTargetQueue)
     }
   }
@@ -610,7 +591,8 @@ class QueueEntry(val queue:Queue, val se
   import QueueEntry._
 
   // Subscriptions waiting to dispatch this entry.
-  var subscriptions:List[Subscription] = Nil
+  var parked:List[Subscription] = Nil
+
   // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
   // ready for them to get dispatched.
   var prefetched = 0
@@ -620,44 +602,56 @@ class QueueEntry(val queue:Queue, val se
 
   def is_prefetched = prefetched>0
 
+  def head():QueueEntry = {
+    state = new Head
+    this
+  }
+
+  def tail():QueueEntry = {
+    state = new Tail
+    this
+  }
+
   def init(delivery:Delivery):QueueEntry = {
-    this.state = new Loaded(delivery, false)
+    state = new Loaded(delivery, false)
     queue.size += size
     this
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    this.state = new Flushed(qer.messageKey, qer.size)
+    state = new Flushed(qer.messageKey, qer.size)
     this
   }
 
-  def init(group:QueueEntryGroup):QueueEntry = {
-    val count = (((group.lastSeq+1)-group.firstSeq)).toInt
-    val tombstones = count-group.count
-    this.state = new FlushedGroup(count, group.size, tombstones)
+  def init(range:QueueEntryRange):QueueEntry = {
+    state = new FlushedRange(range.lastQueueSeq, range.count, range.size)
     this
   }
 
-  def hasSubs = !(subscriptions == Nil )
+  def hasSubs = !parked.isEmpty
 
   /**
    * Dispatches this entry to the consumers and continues dispatching subsequent
-   * entries if it has subscriptions which accept the dispatch.
+   * entries as long as the dispatch results in advancing in their dispatch position.
    */
   def run() = {
-    var next = dispatch()
-    while( next!=null ) {
-      next = next.dispatch
+    var next = this;
+    while( next!=null && next.dispatch) {
+      next = next.getNext
     }
   }
 
-  def addSubscriptions(l:List[Subscription]) = {
-    subscriptions :::= l
+  def ::=(sub:Subscription) = {
+    parked ::= sub
+  }
+
+  def :::=(l:List[Subscription]) = {
+    parked :::= l
   }
 
 
-  def removeSubscriptions(s:Subscription) = {
-    subscriptions = subscriptions.filterNot(_ == s)
+  def -=(s:Subscription) = {
+    parked = parked.filterNot(_ == s)
   }
 
   def nextOrTail():QueueEntry = {
@@ -683,7 +677,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   override def toString = {
-    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+subscriptions+"}"
+    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+parked+"}"
   }
 
   /////////////////////////////////////////////////////
@@ -693,11 +687,12 @@ class QueueEntry(val queue:Queue, val se
   /////////////////////////////////////////////////////
 
   // What state is it in?
-  def as_tombstone = this.state.as_tombstone
-  def as_flushed = this.state.as_flushed
-  def as_flushed_group = this.state.as_flushed_group
-  def as_loaded = this.state.as_loaded
-  def as_tail = this.state.as_tail
+  def as_head = state.as_head
+  def as_tail = state.as_tail
+
+  def as_flushed = state.as_flushed
+  def as_flushed_group = state.as_flushed_group
+  def as_loaded = state.as_loaded
 
   def is_tail = this == queue.tail_entry
   def is_head = this == queue.head_entry
@@ -705,18 +700,17 @@ class QueueEntry(val queue:Queue, val se
   def is_loaded = as_loaded!=null
   def is_flushed = as_flushed!=null
   def is_flushed_group = as_flushed_group!=null
-  def is_tombstone = as_tombstone!=null
 
   // These should not change the current state.
-  def size = this.state.size
-  def messageKey = this.state.messageKey
+  def size = state.size
+  def messageKey = state.messageKey
   def is_flushed_or_flushing = state.is_flushed_or_flushing
-  def dispatch():QueueEntry = state.dispatch
+  def dispatch() = state.dispatch
 
   // These methods may cause a change in the current state.
-  def flush:QueueEntry = this.state.flush
-  def load:QueueEntry = this.state.load
-  def tombstone:QueueEntry = this.state.tombstone
+  def flush = state.flush
+  def load = state.load
+  def remove = state.remove
 
   trait EntryState {
 
@@ -725,18 +719,46 @@ class QueueEntry(val queue:Queue, val se
     def as_tail:Tail = null
     def as_loaded:Loaded = null
     def as_flushed:Flushed = null
-    def as_flushed_group:FlushedGroup = null
-    def as_tombstone:Tombstone = null
+    def as_flushed_group:FlushedRange = null
+    def as_head:Head = null
 
-    def size:Int
-    def dispatch():QueueEntry
-    def messageKey:Long
+    /**
+     * Gets the size of this entry in bytes.  The head and tail entries allways return 0.
+     */
+    def size = 0
+
+    /**
+     * Gets the message key for the entry.
+     * @returns -1 if it is not known.
+     */
+    def messageKey = -1L
+
+    /**
+     * Attempts to dispatch the current entry to the subscriptions position at the entry.
+     * @returns true if at least one subscription advanced to the next entry as a result of dispatching.
+     */
+    def dispatch() = false
+
+    /**
+     * @returns true if the entry is either flushed or flushing.
+     */
     def is_flushed_or_flushing = false
 
-    def load = entry
+    /**
+     * Triggers the entry to get loaded if it's not already loaded.
+     */
+    def load = {}
 
-    def flush = entry
+    /**
+     * Triggers the entry to get flushed if it's not already flushed.
+     */
+    def flush = {}
 
+    /**
+     * Takes the current entry out of the prefetch of all subscriptions
+     * which have prefetched the entry.  Returns the list of subscriptions which
+     * had prefetched it.
+     */
     def prefetch_remove = {
       var rc = List[Subscription]()
       if( queue.tune_flush_to_store ) {
@@ -744,7 +766,7 @@ class QueueEntry(val queue:Queue, val se
         var cur = entry
         while( cur!=null && is_prefetched ) {
           if( cur.hasSubs ) {
-            (cur.subscriptions).foreach { sub =>
+            (cur.parked).foreach { sub =>
               if( sub.is_prefetched(entry) ) {
                 sub.remove_from_prefetch(entry)
                 rc ::= sub
@@ -758,45 +780,62 @@ class QueueEntry(val queue:Queue, val se
       rc
     }
 
-    def tombstone = {
-
+    /**
+     * Removes the entry from the queue's linked list of entries.  This gets called
+     * as a result of an aquired ack.
+     */
+    def remove = {
+      // take us out of subscription prefetches..
       var refill_preftch_list = prefetch_remove
+      // advance subscriptions that were on this entry..
+      parked.foreach(_.advance(next))
+      nextOrTail :::= parked
+      parked = Nil
+      // take the entry of the entries list..
+      unlink
+      // refill the subscription prefetches..
+      refill_preftch_list.foreach( _.refill_prefetch )
+    }
 
-      // if rv and lv are both adjacent tombstones, then this merges the rv
-      // tombstone into lv, unlinks rv, and returns lv, otherwise it returns
-      // rv.
-      def merge(lv:QueueEntry, rv:QueueEntry):QueueEntry = {
-        if( lv==null || rv==null) {
-          return rv
-        }
-
-        val lts = lv.state.as_tombstone
-        val rts = rv.state.as_tombstone
+    /**
+     * Advances the specified subscriptions to the next entry in
+     * the linked list
+     */
+    def advance(advancing: Seq[Subscription]): Unit = {
+      val nextPos = nextOrTail
+      nextPos :::= advancing.toList
+      advancing.foreach(_.advance(nextPos))
+    }
 
-        if( lts==null ||  rts==null ) {
-          return rv
-        }
+  }
 
-        // Sanity check: the the entries are adjacent.. this should
-        // always be the case.
-        assert( lv.seq + lts.count  == rv.seq , "entries are not adjacent.")
+  /**
+   *  Used for the head entry.  This is the starting point for all new subscriptions.
+   */
+  class Head extends EntryState {
 
-        lts.count += rts.count
-        rv.dispatch // moves the subs to the next entry.
-        rv.unlink
-        return lv
-      }
+    override  def toString = "head"
+    override def as_head = this
 
-      state = new Tombstone()
-      merge(entry, getNext)
-      val rc = merge(getPrevious, entry)
+    /**
+     * New subs get parked here at the Head.  There is nothing to actually dispatch
+     * in this entry.. just advance the parked subs onto the next entry.
+     */
+    override def dispatch() = {
+      if( parked != Nil ) {
 
-      refill_preftch_list.foreach( _.refill_prefetch )
+        advance(parked)
+        parked = Nil
+        true
 
-      rc.run // dispatch to move the subs to the next entry..
-      rc
+      } else {
+        false
+      }
     }
 
+    override def remove = throw new AssertionError("Head entry cannot be removed")
+    override def load = throw new AssertionError("Head entry cannot be loaded")
+    override def flush = throw new AssertionError("Head entry cannot be flushed")
   }
 
   /**
@@ -806,30 +845,30 @@ class QueueEntry(val queue:Queue, val se
    */
   class Tail extends EntryState {
 
+    override  def toString = "tail"
     override def as_tail:Tail = this
-    def size = 0
-    def messageKey = -1
-    def dispatch():QueueEntry = null
-
-    override  def toString = { "tail" }
 
+    override def remove = throw new AssertionError("Tail entry cannot be removed")
     override def load = throw new AssertionError("Tail entry cannot be loaded")
     override def flush = throw new AssertionError("Tail entry cannot be flushed")
 
   }
 
   /**
-   * This state is used while a message is loaded in memory.  A message must be in this state
-   * before it can be dispatched to a consumer.  It can transition to Flushed or Tombstone.
+   * The entry is in this state while a message is loaded in memory.  A message must be in this state
+   * before it can be dispatched to a subscription.
    */
-  class Loaded(val delivery: Delivery, var store_completed:Boolean) extends EntryState {
+  class Loaded(val delivery: Delivery, var stored:Boolean) extends EntryState {
+
+    assert( delivery!=null, "delivery cannot be null")
 
     var acquired = false
-    def messageKey = delivery.storeKey
-    def size = delivery.size
     var flushing = false
 
-    override def toString = { "loaded:{ flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+    override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+
+    override def size = delivery.size
+    override def messageKey = delivery.storeKey
 
     override def is_flushed_or_flushing = {
       flushing
@@ -846,7 +885,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def flush() = {
       if( queue.tune_flush_to_store ) {
-        if( store_completed ) {
+        if( stored ) {
           flushing=true
           flushed
         } else {
@@ -882,12 +921,10 @@ class QueueEntry(val queue:Queue, val se
           }
         }
       }
-
-      entry
     }
 
     def flushed() = {
-      store_completed = true
+      stored = true
       delivery.uow = null
       if( flushing ) {
         queue.flushing_size-=size
@@ -901,66 +938,56 @@ class QueueEntry(val queue:Queue, val se
         flushing = false
         queue.flushing_size-=size
       }
-      entry
     }
 
-    override def tombstone = {
+    override def remove = {
       if( flushing ) {
         flushing = false
         queue.flushing_size-=size
       }
       queue.size -= size
-      super.tombstone
+      super.remove
     }
 
-    def dispatch():QueueEntry = {
+    override def dispatch():Boolean = {
 
       // Nothing to dispatch if we don't have subs..
-      if( subscriptions==Nil ) {
-        // This usualy happens when a new consumer connects, it's not marked as slow but
-        // is not at the tail.  And this entry is an entry just sent by a producer.
-        return nextOrTail
-      }
-
-      // can't dispatch until the delivery is set.
-      if( delivery==null ) {
-        // TODO: check to see if this ever happens
-        return null
+      if( parked.isEmpty ) {
+        return false
       }
 
-      var heldBack:List[Subscription] = Nil
-      var advancing:List[Subscription] = Nil
-
+      var heldBack = ListBuffer[Subscription]()
+      var advancing = ListBuffer[Subscription]()
 
       var acquiringSub: Subscription = null
-      subscriptions.foreach{ sub=>
+      parked.foreach{ sub=>
 
         if( sub.browser ) {
           if (!sub.matches(delivery)) {
             // advance: not interested.
-            advancing ::= sub
+            advancing += sub
           } else {
             if (sub.offer(delivery)) {
               // advance: accepted...
-              advancing ::= sub
+              advancing == sub
             } else {
               // hold back: flow controlled
-              heldBack ::= sub
+              heldBack += sub
             }
           }
 
         } else {
           if( acquired ) {
             // advance: another sub already acquired this entry..
-            advancing = advancing ::: sub :: Nil
+            advancing += sub
           } else {
             if (!sub.matches(delivery)) {
               // advance: not interested.
-              advancing = advancing ::: sub :: Nil
+              advancing += sub
             } else {
               if( sub.full ) {
                 // hold back: flow controlled
-                heldBack = heldBack ::: sub :: Nil
+                heldBack += sub
               } else {
                 // advance: accepted...
                 acquiringSub = sub
@@ -983,23 +1010,23 @@ class QueueEntry(val queue:Queue, val se
       // the other competing subs get first dibs at the next entry.
       if( acquiringSub != null ) {
 
-        // Advancing may need to be held back because the sub's prefetch is full.
-        if( acquiringSub.prefetchFull ) {
-          advancing = advancing ::: acquiringSub :: Nil
+        // Advancing may need to be held back because the sub's prefetch is full
+        if( acquiringSub.prefetchFull && !acquiringSub.is_prefetched(getNext) ) {
+          heldBack += acquiringSub
         } else {
-          heldBack = heldBack ::: acquiringSub :: Nil
+          advancing += acquiringSub
         }
       }
 
-      // The held back subs stay on this entry..
-      subscriptions = heldBack
+      if ( advancing.isEmpty ) {
+        return false
+      } else {
 
-      // the advancing subs move on to the next entry...
-      if ( advancing!=Nil ) {
+        // The held back subs stay on this entry..
+        parked = heldBack.toList
 
-        val next = nextOrTail
-        next.addSubscriptions(advancing)
-        advancing.foreach(_.advance(next))
+        // the advancing subs move on to the next entry...
+        advance(advancing)
 
         // flush this entry out if it's not going to be needed soon.
         def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
@@ -1007,60 +1034,98 @@ class QueueEntry(val queue:Queue, val se
           // then flush out to make space...
           flush
         }
-
-        return next
-      } else {
-        return null
+        return true
       }
     }
   }
 
-
   /**
-   * Entries which have been deleted get put into the Tombstone state.  Adjacent entries in the
-   * Tombstone state get merged into a single entry.
+   * Loaded entries are moved into the Flushed state reduce memory usage.  Once a Loaded
+   * entry is persisted, it can move into this state.  This state only holds onto the
+   * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Tombstone extends EntryState {
+  class Flushed(override val messageKey:Long, override val size:Int) extends EntryState {
 
-    /** The number of adjacent entries this Tombstone represents. */
-    var count = 1L
+    var loading = false
 
-    def size = 0
-    def messageKey = -1
+    override def as_flushed = this
 
-    override def as_tombstone = this
+    override def is_flushed_or_flushing = true
 
-    /**
-     * Nothing ot dispatch in a Tombstone, move the subscriptions to the next entry.
-     */
-    def dispatch():QueueEntry = {
-      assert(prefetched==0, "tombstones should never be prefetched.")
+    override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
 
-      val next = nextOrTail
-      next.addSubscriptions(subscriptions)
-      subscriptions.foreach(_.advance(next))
-      subscriptions = Nil
-      next
-    }
+    override def load() = {
+      if( !loading ) {
+//        trace("Start entry load of message seq: %s", seq)
+        // start loading it back...
+        loading = true
+        queue.loading_size += size
+        queue.host.store.loadMessage(messageKey) { delivery =>
+          // pass off to a source so it can aggregate multiple
+          // loads to reduce cross thread synchronization
+          if( delivery.isDefined ) {
+            queue.dispatchQueue {
+              queue.store_load_source.merge((this, delivery.get))
+            }
+          } else {
 
-    override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned")
-    override  def toString = { "tombstone:{ count: "+count+"}" }
+            info("Detected store dropped message at seq: %d", seq)
 
-  }
+            // Looks like someone else removed the message from the store.. lets just
+            // tombstone this entry now.
+            queue.dispatchQueue {
+              remove
+            }
+          }
+        }
+      }
+    }
 
+    def loaded(messageRecord:MessageRecord) = {
+      if( loading ) {
+//        debug("Loaded message seq: ", seq )
+        loading = false
+        queue.loading_size -= size
 
-  class FlushedGroup(
-   /** The number of adjacent entries this FlushedGroup represents. */
-   var count:Long,
-   /** size in bytes of the group */
-   var size:Int,
-   /** The number of tombstone entries in the groups */
-   var tombstones:Int ) extends EntryState {
+        val delivery = new Delivery()
+        delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
+        delivery.size = messageRecord.size
+        delivery.storeKey = messageRecord.key
 
+        queue.size += size
+        state = new Loaded(delivery, true)
+      } else {
+//        debug("Ignoring store load of: ", messageKey)
+      }
+    }
 
-    def items = count - tombstones
 
-    def messageKey = -1
+    override def remove = {
+      if( loading ) {
+        loading = false
+        queue.loading_size -= size
+      }
+      super.remove
+    }
+  }
+
+  /**
+   * A FlushedRange stat is assigned entry is used to represent a rage of flushed entries.
+   *
+   * Even when entries that are Flushed can us a significant amount of memory if the queue is holding
+   * thousands of them.  Multiple entries in the Flushed state can be combined into a single entry in
+   * the FlushedRange state thereby conserving even more memory.  A FlushedRange entry only tracks
+   * the first, and last sequnce ids of the range.  When the entry needs to be loaded from the range
+   * it replaces the FlushedRange entry with all the Flushed entries by querying the store of all the
+   * message keys for the entries in the range.
+   */
+  class FlushedRange(
+    /** the last seq id in the range */
+    var last:Long,
+    /** the number of items in the range */
+    var count:Int,
+    /** size in bytes of the range */
+    override val size:Int) extends EntryState {
 
     var loading = false
 
@@ -1068,51 +1133,34 @@ class QueueEntry(val queue:Queue, val se
 
     override def is_flushed_or_flushing = true
 
-    override def toString = { "flushed_group:{ loading: "+loading+", items: "+items+", size: "+size+"}" }
-
-    // Flushed entries can't be dispatched until
-    // they get loaded.
-    def dispatch():QueueEntry = {
-      null
-    }
+    override def toString = { "flushed_group:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
 
     override def load() = {
       if( !loading ) {
         loading = true
-        queue.host.store.listQueueEntries(queue.queueKey, seq, seq+count-1) { records =>
+        queue.host.store.listQueueEntries(queue.queueKey, seq, last) { records =>
           queue.dispatchQueue {
 
             var item_count=0
             var size_count=0
-            var last:QueueEntryRecord = null
 
             val tmpList = new LinkedNodeList[QueueEntry]()
             records.foreach { record =>
-
-              // if entries were not adjacent.. create tombstone entry.
-              if( last!=null && (last.queueSeq+1 != record.queueSeq) ) {
-                val entry = new QueueEntry(queue, last.queueSeq+1)
-                entry.tombstone.as_tombstone.count = record.queueSeq - (last.queueSeq+1)
-                tmpList.addLast(entry)
-              }
-
               val entry = new QueueEntry(queue, record.queueSeq).init(record)
               tmpList.addLast(entry)
-
               item_count += 1
               size_count += record.size
-              last = record
             }
 
             // we may need to adjust the enqueue count if entries
             // were dropped at the store level
-            var item_delta = (items - item_count)
+            var item_delta = (count - item_count)
             val size_delta: Int = size - size_count
 
             if ( item_delta!=0 || size_delta!=0 ) {
               assert(item_delta <= 0)
               assert(size_delta <= 0)
-              info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, seq+count-1, size_delta)
+              info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, last, size_delta)
               queue.enqueue_item_counter += item_delta
               queue.enqueue_size_counter += size_delta
             }
@@ -1123,103 +1171,21 @@ class QueueEntry(val queue:Queue, val se
             val next = getNext
 
             // move the subs to the first entry that we just loaded.
-            subscriptions.foreach(_.advance(next))
-            next.addSubscriptions(subscriptions)
+            parked.foreach(_.advance(next))
+            next :::= parked
 
             unlink
             refill_preftch_list.foreach( _.refill_prefetch )
           }
         }
       }
-      entry
     }
 
-    override def tombstone = {
-      throw new AssertionError("Flush group cannbot be tombstone.");
+    override def remove = {
+      throw new AssertionError("Flushed range cannbot be removed.");
     }
   }
 
-  /**
-   * Entries in the Flushed state are not holding the referenced messages in memory anymore.
-   * This state can transition to Loaded or Tombstone.
-   *
-   */
-  class Flushed(val messageKey:Long, val size:Int) extends EntryState {
-
-    var loading = false
-
-    override def as_flushed = this
-
-    override def is_flushed_or_flushing = true
-
-    override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
-
-    // Flushed entries can't be dispatched until
-    // they get loaded.
-    def dispatch():QueueEntry = {
-      // This dispatch can happen when a subscription is holding onto lots of acquired entries
-      // it can't prefetch anymore as it's waiting for ack on those messages to avoid
-      // blowing it's memory limits.
-      null
-    }
-
-    override def load() = {
-      if( !loading ) {
-//        trace("Start entry load of message seq: %s", seq)
-        // start loading it back...
-        loading = true
-        queue.loading_size += size
-        queue.host.store.loadMessage(messageKey) { delivery =>
-          // pass off to a source so it can aggregate multiple
-          // loads to reduce cross thread synchronization
-          if( delivery.isDefined ) {
-//            debug("Store found message seq: %d", seq)
-            queue.store_load_source.merge((this, delivery.get))
-          } else {
-
-            info("Detected store dropped message at seq: %d", seq)
-
-            // Looks like someone else removed the message from the store.. lets just
-            // tombstone this entry now.
-            queue.dispatchQueue {
-              tombstone
-            }
-          }
-        }
-      }
-      entry
-    }
-
-    def loaded(messageRecord:MessageRecord) = {
-      if( loading ) {
-//        debug("Loaded message seq: ", seq )
-        loading = false
-        queue.loading_size -= size
-
-        val delivery = new Delivery()
-        delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
-        delivery.size = messageRecord.size
-        delivery.storeKey = messageRecord.key
-
-        queue.size += size
-        state = new Loaded(delivery, true)
-      } else {
-//        debug("Ignoring store load of: ", messageKey)
-      }
-    }
-
-
-    override def tombstone = {
-      if( loading ) {
-//        debug("Tombstoned, will ignore store load of seq: ", seq)
-        loading = false
-        queue.loading_size -= size
-      }
-      super.tombstone
-    }
-  }
-
-
 
 }
 
@@ -1248,7 +1214,7 @@ class Subscription(queue:Queue) extends 
 
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
-    "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+" }"
+    "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+"}"
   }
 
   def browser = session.consumer.browser
@@ -1257,7 +1223,7 @@ class Subscription(queue:Queue) extends 
     pos = queue.head_entry;
     session = consumer.connect(this)
     session.refiller = pos
-    queue.head_entry.addSubscriptions(this :: Nil)
+    queue.head_entry ::= this
 
     if( queue.serviceState.isStarted ) {
       // kick off the initial dispatch.
@@ -1267,7 +1233,7 @@ class Subscription(queue:Queue) extends 
   }
 
   def close() = {
-    pos.removeSubscriptions(this)
+    pos.-=(this)
 
     invalidate_prefetch
 
@@ -1298,7 +1264,7 @@ class Subscription(queue:Queue) extends 
     assert(value!=null)
 
     // Remove the previous pos from the prefetch counters.
-    if( prefetch_tail!=null && !pos.is_tombstone) {
+    if( prefetch_tail!=null && !pos.is_head) {
       remove_from_prefetch(pos)
     }
     advanced_size += pos.size
@@ -1329,7 +1295,7 @@ class Subscription(queue:Queue) extends 
       // release the prefetch counters...
       var cur = pos
       while (cur.seq <= prefetch_tail.seq) {
-        if (!cur.is_tombstone) {
+        if (!cur.is_head) {
           prefetched_size -= cur.size
           cur.prefetched -= 1
         }
@@ -1344,12 +1310,12 @@ class Subscription(queue:Queue) extends 
    * Is the specified queue entry prefeteched by this subscription?
    */
   def is_prefetched(value:QueueEntry) = {
-    prefetch_tail!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
+    prefetch_tail!=null && value!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
   }
 
 
   def add_to_prefetch(entry:QueueEntry):Unit = {
-    assert( !entry.is_tombstone, "tombstones should not be prefetched..")
+    assert( !entry.is_head, "tombstones should not be prefetched..")
     prefetched_size += entry.size
     entry.prefetched += 1
     entry.load
@@ -1386,7 +1352,7 @@ class Subscription(queue:Queue) extends 
       // attempts to fill the prefetch...
       var next = next_prefetch_pos
       while( !prefetchFull && next!=null ) {
-        if( !next.is_tombstone ) {
+        if( !next.is_head ) {
           add_to_prefetch(next)
         }
         next = next.getNext
@@ -1433,7 +1399,7 @@ class Subscription(queue:Queue) extends 
 
       // we may now be able to prefetch some messages..
       acquired_size -= entry.size
-      entry.tombstone // entry size changes to 0
+      entry.remove // entry size changes to 0
       refill_prefetch
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:13:10 2010
@@ -268,9 +268,9 @@ class VirtualHost(val broker: Broker) ex
           case Some(queueKey) =>
             dispatchQueue {
               val queue = new Queue(this, dest, queueKey)
+              queue.start()
               queues.put(dest.getName, queue)
               cb(queue)
-              queue.start()
             }
           case None => // store could not create
             cb(null)
@@ -278,6 +278,7 @@ class VirtualHost(val broker: Broker) ex
       }
     } else {
       val queue = new Queue(this, dest)
+      queue.start()
       queues.put(dest.getName, queue)
       cb(queue)
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:13:10 2010
@@ -206,11 +206,11 @@ class CassandraClient() {
     }
   }
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryGroup] = {
+  def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryRange] = {
     withSession {
       session =>
-        var rc = ListBuffer[QueueEntryGroup]()
-        var group:QueueEntryGroup = null
+        var rc = ListBuffer[QueueEntryRange]()
+        var group:QueueEntryRange = null
 
         // TODO: this is going to bring back lots of entries.. not good.
         session.list(schema.entries \ queueKey).foreach { x=>
@@ -218,10 +218,10 @@ class CassandraClient() {
           val record:QueueEntryRecord = x.value
 
           if( group == null ) {
-            group = new QueueEntryGroup
-            group.firstSeq = record.queueSeq
+            group = new QueueEntryRange
+            group.firstQueueSeq = record.queueSeq
           }
-          group.lastSeq = record.queueSeq
+          group.lastQueueSeq = record.queueSeq
           group.count += 1
           group.size += record.size
           if( group.count == limit) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:13:10 2010
@@ -195,7 +195,7 @@ class CassandraStore extends Store with 
   }
 
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
+  def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
     blocking {
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:13:10 2010
@@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.proto.Mess
 import org.fusesource.hawtbuf.proto.PBMessage
 import org.apache.activemq.util.LockFile
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
-import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location}
+import org.fusesource.hawtdb.internal.journal.{JournalListener, Journal, Location}
 import org.fusesource.hawtdispatch.TaskTracker
 
 import org.fusesource.hawtbuf.AsciiBuffer._
@@ -152,6 +152,20 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       journal.setMaxFileLength(config.journalLogSize)
       journal.setMaxWriteBatchSize(config.journalBatchSize);
       journal.setChecksum(true);
+      journal.setListener( new JournalListener{
+        def synced(writes: Array[Write]) = {
+          var onCompletes = List[Runnable]()
+          withTx { tx=>
+            val helper = new TxHelper(tx)
+            writes.foreach { write=>
+              val func = write.getAttachment.asInstanceOf[(TxHelper, Location)=>List[Runnable]]
+              onCompletes = onCompletes ::: func(helper, write.getLocation)
+            }
+            helper.storeRootBean
+          }
+          onCompletes.foreach( _.run )
+        }
+      })
 
       if( config.archiveDirectory!=null ) {
         journal.setDirectoryArchive(config.archiveDirectory)
@@ -169,10 +183,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       pageFileFactory.open()
 
       val initialized = withTx { tx =>
-          val helper = new TxHelper(tx)
-          import helper._
-
           if (!tx.allocator().isAllocated(0)) {
+            val helper = new TxHelper(tx)
+            import helper._
+
             val rootPage = tx.alloc()
             assert(rootPage == 0)
 
@@ -181,9 +195,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
             rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
             rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
-            rootBuffer = rootBean.freeze
 
-            tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+            helper.storeRootBean
+
             true
           } else {
             rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
@@ -296,7 +310,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryGroup] = {
+  def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryRange] = {
     withTx { tx =>
         val helper = new TxHelper(tx)
         import JavaConversions._
@@ -307,15 +321,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         if (queueRecord != null) {
           val entryIndex = queueEntryIndex(queueRecord)
 
-          var rc = ListBuffer[QueueEntryGroup]()
-          var group:QueueEntryGroup = null
+          var rc = ListBuffer[QueueEntryRange]()
+          var group:QueueEntryRange = null
 
           entryIndex.iterator.foreach { entry =>
             if( group == null ) {
-              group = new QueueEntryGroup
-              group.firstSeq = entry.getKey.longValue
+              group = new QueueEntryRange
+              group.firstQueueSeq = entry.getKey.longValue
             }
-            group.lastSeq = entry.getKey.longValue
+            group.lastQueueSeq = entry.getKey.longValue
             group.count += 1
             group.size += entry.getValue.getSize
             if( group.count == limit) {
@@ -470,11 +484,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     frozen.writeFramed(baos)
 
     val buffer = baos.toBuffer()
-    append(buffer) {
-      location =>
-        metric_index_update.time {
-          executeStore(batch, update, onComplete, location)
-        }
+    append(buffer) { (helper, location) =>
+      metric_index_update.time {
+        executeStore(helper, location, batch, update, onComplete)
+      }
     }
   }
 
@@ -484,9 +497,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val baos = new DataByteArrayOutputStream(5)
     baos.writeByte(BEGIN)
     baos.writeInt(batch)
-    append(baos.toBuffer) {
-      location =>
-        executeBegin(batch, location)
+    append(baos.toBuffer) { (helper,location) =>
+      executeBegin(helper, location, batch)
     }
   }
 
@@ -496,9 +508,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val baos = new DataByteArrayOutputStream(5)
     baos.writeByte(COMMIT)
     baos.writeInt(batch)
-    append(baos.toBuffer) {
-      location =>
-        executeCommit(batch, onComplete, location)
+    append(baos.toBuffer) { (helper,location) =>
+      executeCommit(helper, location, batch, onComplete)
     }
   }
 
@@ -506,9 +517,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val baos = new DataByteArrayOutputStream(5)
     baos.writeByte(ROLLBACK)
     baos.writeInt(batch)
-    append(baos.toBuffer) {
-      location =>
-        executeRollback(batch, onComplete, location)
+    append(baos.toBuffer) { (helper,location) =>
+      executeRollback(helper, location, batch, onComplete)
     }
   }
 
@@ -606,12 +616,16 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     val updateType = editor.readByte()
     val batch = editor.readInt()
 
-    updateType match {
-      case BEGIN => executeBegin(batch, location)
-      case COMMIT => executeCommit(batch, null, location)
-      case _ =>
-        val update = decode(location, updateType, data)
-        executeStore(batch, update, null, location)
+    withTx { tx=>
+      val helper = new TxHelper(tx)
+      updateType match {
+        case BEGIN => executeBegin(helper, location, batch)
+        case COMMIT => executeCommit(helper, location, batch, null)
+        case _ =>
+          val update = decode(location, updateType, data)
+          executeStore(helper, location, batch, update, null)
+      }
+      helper.storeRootBean
     }
 
     recoveryCounter += 1
@@ -624,14 +638,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   //
   /////////////////////////////////////////////////////////////////////
 
-  private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
+  private def append(data: Buffer)(cb: (TxHelper, Location) => List[Runnable]): Unit = {
     metric_journal_append.start { end =>
-      journal.write(data, new JournalCallback() {
-        def success(location: Location) = {
-          end()
-          cb(location)
-        }
-      })
+      def cbintercept(tx:TxHelper,location:Location) = {
+        end()
+        cb(tx, location)
+      }
+      journal.write(data, cbintercept _ )
     }
   }
 
@@ -644,74 +657,70 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   //
   /////////////////////////////////////////////////////////////////////
 
-  private def executeBegin(batch: Int, location: Location): Unit = {
+  private def executeBegin(helper:TxHelper, location: Location, batch: Int):List[Runnable] = {
     assert(batches.get(batch).isEmpty)
     batches.put(batch, (location, ListBuffer()))
+    Nil
   }
 
-  private def executeCommit(batch: Int, onComplete: Runnable, location: Location): Unit = {
+  private def executeCommit(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable):List[Runnable] = {
     // apply all the updates in the batch as a single unit of work.
     batches.remove(batch) match {
       case Some((_, updates)) =>
         // When recovering.. we only want to redo updates that committed
         // after the last update location.
         if (!recovering || isAfterLastUpdateLocation(location)) {
-          withTx { tx =>
-              val helper = new TxHelper(tx)
-              // index the updates
-              updates.foreach {
-                update =>
-                  index(helper, update.update, update.location)
-              }
-              helper.updateLocations(location)
-          }
+            // index the updates
+            updates.foreach {
+              update =>
+                index(helper, update.update, update.location)
+            }
+            helper.updateLocations(location)
         }
       case None =>
         // when recovering..  we are more lax due recovery starting
         // in the middle of a stream of in progress batches
         assert(recovering)
     }
-    if (onComplete != null) {
-      onComplete.run
+    if(onComplete!=null) {
+      return List(onComplete)
+    } else {
+      Nil
     }
   }
 
-  private def executeRollback(batch: Int, onComplete: Runnable, location: Location): Unit = {
+  private def executeRollback(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable): List[Runnable] = {
     // apply all the updates in the batch as a single unit of work.
     batches.remove(batch) match {
       case Some((_, _)) =>
         if (!recovering || isAfterLastUpdateLocation(location)) {
-          withTx { tx =>
-            val helper = new TxHelper(tx)
-            helper.updateLocations(location)
-          }
+          helper.updateLocations(location)
         }
       case None =>
         // when recovering..  we are more lax due recovery starting
         // in the middle of a stream of in progress batches
         assert(recovering)
     }
-    if (onComplete != null) {
-      onComplete.run
+    if(onComplete!=null) {
+      return List(onComplete)
+    } else {
+      Nil
     }
   }
 
-  private def executeStore(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
+  private def executeStore(helper:TxHelper, location: Location, batch: Int, update: TypeCreatable, onComplete: Runnable): List[Runnable] = {
     if (batch == -1) {
       // update is not part of the batch..
 
       // When recovering.. we only want to redo updates that happen
       // after the last update location.
       if (!recovering || isAfterLastUpdateLocation(location)) {
-        withTx { tx =>
-            val helper = new TxHelper(tx)
-            index(helper, update, location)
-            helper.updateLocations(location)
-        }
+          index(helper, update, location)
+          helper.updateLocations(location)
       }
 
-      if (onComplete != null) {
-        onComplete.run
+      if ( onComplete != null) {
+        return List(onComplete)
       }
     } else {
 
@@ -728,6 +737,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
           assert(recovering)
       }
     }
+    return Nil
   }
 
 
@@ -1064,6 +1074,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       } else {
         rootBean.setFirstBatchLocation(batches.head._2._1)
       }
+    }
+
+    def storeRootBean() = {
       rootBuffer = rootBean.freeze
       _tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:13:10 2010
@@ -183,7 +183,7 @@ class HawtDBStore extends Store with Bas
     }
   }
 
-  def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = {
+  def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
     executor_pool ^{
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java Wed Jul  7 04:13:10 2010
@@ -22,8 +22,8 @@ import org.fusesource.hawtbuf.Buffer;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class QueueEntryRange {
-    public long firstSeq;
-    public long lastSeq;
+    public long firstQueueSeq;
+    public long lastQueueSeq;
     public int count;
     public int size;
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961166&r1=961165&r2=961166&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:13:10 2010
@@ -74,10 +74,11 @@ trait Store extends Service {
   def listQueues(callback: (Seq[Long])=>Unit )
 
   /**
-   * Groups all the entries in the specified queue into groups containing limit entries and returns those
-   * groups.  Allows you to quickly get a rough idea of the items in queue without consuming too much memory.
+   * Groups all the entries in the specified queue into ranges containing up limit entries
+   * big and returns those ranges.  Allows you to incrementally, load all the entries in
+   * a queue.
    */
-  def listQueueEntryGroups(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryGroup])=>Unit )
+  def listQueueEntryRanges(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryRange])=>Unit )
 
   /**
    * Loads all the queue entry records for the given queue id between the first and last provided



Mime
View raw message