Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 44436 invoked from network); 7 Jul 2010 04:14:12 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 04:14:12 -0000 Received: (qmail 49216 invoked by uid 500); 7 Jul 2010 04:14:12 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 49156 invoked by uid 500); 7 Jul 2010 04:14:11 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 49149 invoked by uid 99); 7 Jul 2010 04:14:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:14:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:14:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E5AB92388A64; Wed, 7 Jul 2010 04:13:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961166 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/scala/org/apache/... Date: Wed, 07 Jul 2010 04:13:10 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707041310.E5AB92388A64@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 04:13:10 2010 New Revision: 961166 URL: http://svn.apache.org/viewvc?rev=961166&view=rev Log: - Eliminated the use of tombstone entries in the queue list - Renamed QueueEntryGroup to QueueEntryRange and FlushedGroup to FlushedRange - Simplified serveral methods in the QueueEntry interface - Always start queues - Updated HawtDB store to use the new JournalListener interfaces Removed: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryGroup.java Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:13:10 2010 @@ -26,7 +26,8 @@ import org.apache.activemq.broker.store. import protocol.ProtocolFactory import java.util.concurrent.TimeUnit import java.util.{HashSet, Collections, ArrayList, LinkedList} -import org.apache.activemq.apollo.store.{QueueEntryGroup, QueueEntryRecord, MessageRecord} +import org.apache.activemq.apollo.store.{QueueEntryRange, QueueEntryRecord, MessageRecord} +import collection.mutable.ListBuffer /** * @author Hiram Chirino @@ -87,11 +88,9 @@ class Queue(val host: VirtualHost, val d var message_seq_counter = 1L val entries = new LinkedNodeList[QueueEntry]() - val head_entry = new QueueEntry(this, 0L); - var tail_entry = new QueueEntry(this, next_message_seq) - + val head_entry = new QueueEntry(this, 0L).head + var tail_entry = new QueueEntry(this, next_message_seq).tail entries.addFirst(head_entry) - head_entry.tombstone var loading_size = 0 var flushing_size = 0 @@ -180,31 +179,17 @@ class Queue(val host: VirtualHost, val d } if( tune_persistent ) { - host.store.listQueueEntryGroups(queueKey, tune_entry_group_size) { groups=> + host.store.listQueueEntryRanges(queueKey, tune_entry_group_size) { ranges=> dispatchQueue { - if( !groups.isEmpty ) { - - // adjust the head tombstone. - head_entry.as_tombstone.count = groups.head.firstSeq + if( !ranges.isEmpty ) { - var last:QueueEntryGroup = null - groups.foreach { group => - - // if groups were not adjacent.. create tombstone entry. - if( last!=null && (last.lastSeq+1 != group.firstSeq) ) { - val entry = new QueueEntry(Queue.this, last.lastSeq+1) - entry.tombstone.as_tombstone.count = group.firstSeq - (last.lastSeq+1) - entries.addLast(entry) - } - - val entry = new QueueEntry(Queue.this, group.firstSeq).init(group) + ranges.foreach { range => + val entry = new QueueEntry(Queue.this, range.firstQueueSeq).init(range) entries.addLast(entry) - message_seq_counter = group.lastSeq - enqueue_item_counter += group.count - enqueue_size_counter += group.size - - last = group + message_seq_counter = range.lastQueueSeq + 1 + enqueue_item_counter += range.count + enqueue_size_counter += range.size } debug("restored: "+enqueue_item_counter) @@ -297,7 +282,7 @@ class Queue(val host: VirtualHost, val d if (cur.is_flushed || cur.is_loaded) { total_items += 1 } else if (cur.is_flushed_group ) { - total_items += cur.as_flushed_group.items + total_items += cur.as_flushed_group.count } cur = cur.getNext @@ -541,21 +526,17 @@ class Queue(val host: VirtualHost, val d debug("swapping...") - var entry = entries.getHead + var entry = head_entry.getNext while( entry!=null ) { - if( entry.as_tombstone == null ) { - - val loaded = entry.as_loaded + val loaded = entry.as_loaded - // Keep around prefetched and loaded entries. - if( entry.is_prefetched || (loaded!=null && loaded.acquired)) { - entry.load - } else { - // flush the the others out of memory. - entry.flush - } + // Keep around prefetched and loaded entries. + if( entry.is_prefetched || (loaded!=null && loaded.acquired)) { + entry.load + } else { + // flush the the others out of memory. + entry.flush } - entry = entry.getNext } } @@ -594,7 +575,7 @@ class Queue(val host: VirtualHost, val d def collocate(value:DispatchQueue):Unit = { if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) { - println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel); + debug(dispatchQueue.getLabel+" co-locating with: "+value.getLabel); this.dispatchQueue.setTargetQueue(value.getTargetQueue) } } @@ -610,7 +591,8 @@ class QueueEntry(val queue:Queue, val se import QueueEntry._ // Subscriptions waiting to dispatch this entry. - var subscriptions:List[Subscription] = Nil + var parked:List[Subscription] = Nil + // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's // ready for them to get dispatched. var prefetched = 0 @@ -620,44 +602,56 @@ class QueueEntry(val queue:Queue, val se def is_prefetched = prefetched>0 + def head():QueueEntry = { + state = new Head + this + } + + def tail():QueueEntry = { + state = new Tail + this + } + def init(delivery:Delivery):QueueEntry = { - this.state = new Loaded(delivery, false) + state = new Loaded(delivery, false) queue.size += size this } def init(qer:QueueEntryRecord):QueueEntry = { - this.state = new Flushed(qer.messageKey, qer.size) + state = new Flushed(qer.messageKey, qer.size) this } - def init(group:QueueEntryGroup):QueueEntry = { - val count = (((group.lastSeq+1)-group.firstSeq)).toInt - val tombstones = count-group.count - this.state = new FlushedGroup(count, group.size, tombstones) + def init(range:QueueEntryRange):QueueEntry = { + state = new FlushedRange(range.lastQueueSeq, range.count, range.size) this } - def hasSubs = !(subscriptions == Nil ) + def hasSubs = !parked.isEmpty /** * Dispatches this entry to the consumers and continues dispatching subsequent - * entries if it has subscriptions which accept the dispatch. + * entries as long as the dispatch results in advancing in their dispatch position. */ def run() = { - var next = dispatch() - while( next!=null ) { - next = next.dispatch + var next = this; + while( next!=null && next.dispatch) { + next = next.getNext } } - def addSubscriptions(l:List[Subscription]) = { - subscriptions :::= l + def ::=(sub:Subscription) = { + parked ::= sub + } + + def :::=(l:List[Subscription]) = { + parked :::= l } - def removeSubscriptions(s:Subscription) = { - subscriptions = subscriptions.filterNot(_ == s) + def -=(s:Subscription) = { + parked = parked.filterNot(_ == s) } def nextOrTail():QueueEntry = { @@ -683,7 +677,7 @@ class QueueEntry(val queue:Queue, val se } override def toString = { - "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+subscriptions+"}" + "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+parked+"}" } ///////////////////////////////////////////////////// @@ -693,11 +687,12 @@ class QueueEntry(val queue:Queue, val se ///////////////////////////////////////////////////// // What state is it in? - def as_tombstone = this.state.as_tombstone - def as_flushed = this.state.as_flushed - def as_flushed_group = this.state.as_flushed_group - def as_loaded = this.state.as_loaded - def as_tail = this.state.as_tail + def as_head = state.as_head + def as_tail = state.as_tail + + def as_flushed = state.as_flushed + def as_flushed_group = state.as_flushed_group + def as_loaded = state.as_loaded def is_tail = this == queue.tail_entry def is_head = this == queue.head_entry @@ -705,18 +700,17 @@ class QueueEntry(val queue:Queue, val se def is_loaded = as_loaded!=null def is_flushed = as_flushed!=null def is_flushed_group = as_flushed_group!=null - def is_tombstone = as_tombstone!=null // These should not change the current state. - def size = this.state.size - def messageKey = this.state.messageKey + def size = state.size + def messageKey = state.messageKey def is_flushed_or_flushing = state.is_flushed_or_flushing - def dispatch():QueueEntry = state.dispatch + def dispatch() = state.dispatch // These methods may cause a change in the current state. - def flush:QueueEntry = this.state.flush - def load:QueueEntry = this.state.load - def tombstone:QueueEntry = this.state.tombstone + def flush = state.flush + def load = state.load + def remove = state.remove trait EntryState { @@ -725,18 +719,46 @@ class QueueEntry(val queue:Queue, val se def as_tail:Tail = null def as_loaded:Loaded = null def as_flushed:Flushed = null - def as_flushed_group:FlushedGroup = null - def as_tombstone:Tombstone = null + def as_flushed_group:FlushedRange = null + def as_head:Head = null - def size:Int - def dispatch():QueueEntry - def messageKey:Long + /** + * Gets the size of this entry in bytes. The head and tail entries allways return 0. + */ + def size = 0 + + /** + * Gets the message key for the entry. + * @returns -1 if it is not known. + */ + def messageKey = -1L + + /** + * Attempts to dispatch the current entry to the subscriptions position at the entry. + * @returns true if at least one subscription advanced to the next entry as a result of dispatching. + */ + def dispatch() = false + + /** + * @returns true if the entry is either flushed or flushing. + */ def is_flushed_or_flushing = false - def load = entry + /** + * Triggers the entry to get loaded if it's not already loaded. + */ + def load = {} - def flush = entry + /** + * Triggers the entry to get flushed if it's not already flushed. + */ + def flush = {} + /** + * Takes the current entry out of the prefetch of all subscriptions + * which have prefetched the entry. Returns the list of subscriptions which + * had prefetched it. + */ def prefetch_remove = { var rc = List[Subscription]() if( queue.tune_flush_to_store ) { @@ -744,7 +766,7 @@ class QueueEntry(val queue:Queue, val se var cur = entry while( cur!=null && is_prefetched ) { if( cur.hasSubs ) { - (cur.subscriptions).foreach { sub => + (cur.parked).foreach { sub => if( sub.is_prefetched(entry) ) { sub.remove_from_prefetch(entry) rc ::= sub @@ -758,45 +780,62 @@ class QueueEntry(val queue:Queue, val se rc } - def tombstone = { - + /** + * Removes the entry from the queue's linked list of entries. This gets called + * as a result of an aquired ack. + */ + def remove = { + // take us out of subscription prefetches.. var refill_preftch_list = prefetch_remove + // advance subscriptions that were on this entry.. + parked.foreach(_.advance(next)) + nextOrTail :::= parked + parked = Nil + // take the entry of the entries list.. + unlink + // refill the subscription prefetches.. + refill_preftch_list.foreach( _.refill_prefetch ) + } - // if rv and lv are both adjacent tombstones, then this merges the rv - // tombstone into lv, unlinks rv, and returns lv, otherwise it returns - // rv. - def merge(lv:QueueEntry, rv:QueueEntry):QueueEntry = { - if( lv==null || rv==null) { - return rv - } - - val lts = lv.state.as_tombstone - val rts = rv.state.as_tombstone + /** + * Advances the specified subscriptions to the next entry in + * the linked list + */ + def advance(advancing: Seq[Subscription]): Unit = { + val nextPos = nextOrTail + nextPos :::= advancing.toList + advancing.foreach(_.advance(nextPos)) + } - if( lts==null || rts==null ) { - return rv - } + } - // Sanity check: the the entries are adjacent.. this should - // always be the case. - assert( lv.seq + lts.count == rv.seq , "entries are not adjacent.") + /** + * Used for the head entry. This is the starting point for all new subscriptions. + */ + class Head extends EntryState { - lts.count += rts.count - rv.dispatch // moves the subs to the next entry. - rv.unlink - return lv - } + override def toString = "head" + override def as_head = this - state = new Tombstone() - merge(entry, getNext) - val rc = merge(getPrevious, entry) + /** + * New subs get parked here at the Head. There is nothing to actually dispatch + * in this entry.. just advance the parked subs onto the next entry. + */ + override def dispatch() = { + if( parked != Nil ) { - refill_preftch_list.foreach( _.refill_prefetch ) + advance(parked) + parked = Nil + true - rc.run // dispatch to move the subs to the next entry.. - rc + } else { + false + } } + override def remove = throw new AssertionError("Head entry cannot be removed") + override def load = throw new AssertionError("Head entry cannot be loaded") + override def flush = throw new AssertionError("Head entry cannot be flushed") } /** @@ -806,30 +845,30 @@ class QueueEntry(val queue:Queue, val se */ class Tail extends EntryState { + override def toString = "tail" override def as_tail:Tail = this - def size = 0 - def messageKey = -1 - def dispatch():QueueEntry = null - - override def toString = { "tail" } + override def remove = throw new AssertionError("Tail entry cannot be removed") override def load = throw new AssertionError("Tail entry cannot be loaded") override def flush = throw new AssertionError("Tail entry cannot be flushed") } /** - * This state is used while a message is loaded in memory. A message must be in this state - * before it can be dispatched to a consumer. It can transition to Flushed or Tombstone. + * The entry is in this state while a message is loaded in memory. A message must be in this state + * before it can be dispatched to a subscription. */ - class Loaded(val delivery: Delivery, var store_completed:Boolean) extends EntryState { + class Loaded(val delivery: Delivery, var stored:Boolean) extends EntryState { + + assert( delivery!=null, "delivery cannot be null") var acquired = false - def messageKey = delivery.storeKey - def size = delivery.size var flushing = false - override def toString = { "loaded:{ flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" } + override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" } + + override def size = delivery.size + override def messageKey = delivery.storeKey override def is_flushed_or_flushing = { flushing @@ -846,7 +885,7 @@ class QueueEntry(val queue:Queue, val se override def flush() = { if( queue.tune_flush_to_store ) { - if( store_completed ) { + if( stored ) { flushing=true flushed } else { @@ -882,12 +921,10 @@ class QueueEntry(val queue:Queue, val se } } } - - entry } def flushed() = { - store_completed = true + stored = true delivery.uow = null if( flushing ) { queue.flushing_size-=size @@ -901,66 +938,56 @@ class QueueEntry(val queue:Queue, val se flushing = false queue.flushing_size-=size } - entry } - override def tombstone = { + override def remove = { if( flushing ) { flushing = false queue.flushing_size-=size } queue.size -= size - super.tombstone + super.remove } - def dispatch():QueueEntry = { + override def dispatch():Boolean = { // Nothing to dispatch if we don't have subs.. - if( subscriptions==Nil ) { - // This usualy happens when a new consumer connects, it's not marked as slow but - // is not at the tail. And this entry is an entry just sent by a producer. - return nextOrTail - } - - // can't dispatch until the delivery is set. - if( delivery==null ) { - // TODO: check to see if this ever happens - return null + if( parked.isEmpty ) { + return false } - var heldBack:List[Subscription] = Nil - var advancing:List[Subscription] = Nil - + var heldBack = ListBuffer[Subscription]() + var advancing = ListBuffer[Subscription]() var acquiringSub: Subscription = null - subscriptions.foreach{ sub=> + parked.foreach{ sub=> if( sub.browser ) { if (!sub.matches(delivery)) { // advance: not interested. - advancing ::= sub + advancing += sub } else { if (sub.offer(delivery)) { // advance: accepted... - advancing ::= sub + advancing == sub } else { // hold back: flow controlled - heldBack ::= sub + heldBack += sub } } } else { if( acquired ) { // advance: another sub already acquired this entry.. - advancing = advancing ::: sub :: Nil + advancing += sub } else { if (!sub.matches(delivery)) { // advance: not interested. - advancing = advancing ::: sub :: Nil + advancing += sub } else { if( sub.full ) { // hold back: flow controlled - heldBack = heldBack ::: sub :: Nil + heldBack += sub } else { // advance: accepted... acquiringSub = sub @@ -983,23 +1010,23 @@ class QueueEntry(val queue:Queue, val se // the other competing subs get first dibs at the next entry. if( acquiringSub != null ) { - // Advancing may need to be held back because the sub's prefetch is full. - if( acquiringSub.prefetchFull ) { - advancing = advancing ::: acquiringSub :: Nil + // Advancing may need to be held back because the sub's prefetch is full + if( acquiringSub.prefetchFull && !acquiringSub.is_prefetched(getNext) ) { + heldBack += acquiringSub } else { - heldBack = heldBack ::: acquiringSub :: Nil + advancing += acquiringSub } } - // The held back subs stay on this entry.. - subscriptions = heldBack + if ( advancing.isEmpty ) { + return false + } else { - // the advancing subs move on to the next entry... - if ( advancing!=Nil ) { + // The held back subs stay on this entry.. + parked = heldBack.toList - val next = nextOrTail - next.addSubscriptions(advancing) - advancing.foreach(_.advance(next)) + // the advancing subs move on to the next entry... + advance(advancing) // flush this entry out if it's not going to be needed soon. def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined @@ -1007,60 +1034,98 @@ class QueueEntry(val queue:Queue, val se // then flush out to make space... flush } - - return next - } else { - return null + return true } } } - /** - * Entries which have been deleted get put into the Tombstone state. Adjacent entries in the - * Tombstone state get merged into a single entry. + * Loaded entries are moved into the Flushed state reduce memory usage. Once a Loaded + * entry is persisted, it can move into this state. This state only holds onto the + * the massage key so that it can reload the message from the store quickly when needed. */ - class Tombstone extends EntryState { + class Flushed(override val messageKey:Long, override val size:Int) extends EntryState { - /** The number of adjacent entries this Tombstone represents. */ - var count = 1L + var loading = false - def size = 0 - def messageKey = -1 + override def as_flushed = this - override def as_tombstone = this + override def is_flushed_or_flushing = true - /** - * Nothing ot dispatch in a Tombstone, move the subscriptions to the next entry. - */ - def dispatch():QueueEntry = { - assert(prefetched==0, "tombstones should never be prefetched.") + override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" } - val next = nextOrTail - next.addSubscriptions(subscriptions) - subscriptions.foreach(_.advance(next)) - subscriptions = Nil - next - } + override def load() = { + if( !loading ) { +// trace("Start entry load of message seq: %s", seq) + // start loading it back... + loading = true + queue.loading_size += size + queue.host.store.loadMessage(messageKey) { delivery => + // pass off to a source so it can aggregate multiple + // loads to reduce cross thread synchronization + if( delivery.isDefined ) { + queue.dispatchQueue { + queue.store_load_source.merge((this, delivery.get)) + } + } else { - override def tombstone = throw new AssertionError("Tombstone entry cannot be tombstoned") - override def toString = { "tombstone:{ count: "+count+"}" } + info("Detected store dropped message at seq: %d", seq) - } + // Looks like someone else removed the message from the store.. lets just + // tombstone this entry now. + queue.dispatchQueue { + remove + } + } + } + } + } + def loaded(messageRecord:MessageRecord) = { + if( loading ) { +// debug("Loaded message seq: ", seq ) + loading = false + queue.loading_size -= size - class FlushedGroup( - /** The number of adjacent entries this FlushedGroup represents. */ - var count:Long, - /** size in bytes of the group */ - var size:Int, - /** The number of tombstone entries in the groups */ - var tombstones:Int ) extends EntryState { + val delivery = new Delivery() + delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value) + delivery.size = messageRecord.size + delivery.storeKey = messageRecord.key + queue.size += size + state = new Loaded(delivery, true) + } else { +// debug("Ignoring store load of: ", messageKey) + } + } - def items = count - tombstones - def messageKey = -1 + override def remove = { + if( loading ) { + loading = false + queue.loading_size -= size + } + super.remove + } + } + + /** + * A FlushedRange stat is assigned entry is used to represent a rage of flushed entries. + * + * Even when entries that are Flushed can us a significant amount of memory if the queue is holding + * thousands of them. Multiple entries in the Flushed state can be combined into a single entry in + * the FlushedRange state thereby conserving even more memory. A FlushedRange entry only tracks + * the first, and last sequnce ids of the range. When the entry needs to be loaded from the range + * it replaces the FlushedRange entry with all the Flushed entries by querying the store of all the + * message keys for the entries in the range. + */ + class FlushedRange( + /** the last seq id in the range */ + var last:Long, + /** the number of items in the range */ + var count:Int, + /** size in bytes of the range */ + override val size:Int) extends EntryState { var loading = false @@ -1068,51 +1133,34 @@ class QueueEntry(val queue:Queue, val se override def is_flushed_or_flushing = true - override def toString = { "flushed_group:{ loading: "+loading+", items: "+items+", size: "+size+"}" } - - // Flushed entries can't be dispatched until - // they get loaded. - def dispatch():QueueEntry = { - null - } + override def toString = { "flushed_group:{ loading: "+loading+", count: "+count+", size: "+size+"}" } override def load() = { if( !loading ) { loading = true - queue.host.store.listQueueEntries(queue.queueKey, seq, seq+count-1) { records => + queue.host.store.listQueueEntries(queue.queueKey, seq, last) { records => queue.dispatchQueue { var item_count=0 var size_count=0 - var last:QueueEntryRecord = null val tmpList = new LinkedNodeList[QueueEntry]() records.foreach { record => - - // if entries were not adjacent.. create tombstone entry. - if( last!=null && (last.queueSeq+1 != record.queueSeq) ) { - val entry = new QueueEntry(queue, last.queueSeq+1) - entry.tombstone.as_tombstone.count = record.queueSeq - (last.queueSeq+1) - tmpList.addLast(entry) - } - val entry = new QueueEntry(queue, record.queueSeq).init(record) tmpList.addLast(entry) - item_count += 1 size_count += record.size - last = record } // we may need to adjust the enqueue count if entries // were dropped at the store level - var item_delta = (items - item_count) + var item_delta = (count - item_count) val size_delta: Int = size - size_count if ( item_delta!=0 || size_delta!=0 ) { assert(item_delta <= 0) assert(size_delta <= 0) - info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, seq+count-1, size_delta) + info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, last, size_delta) queue.enqueue_item_counter += item_delta queue.enqueue_size_counter += size_delta } @@ -1123,103 +1171,21 @@ class QueueEntry(val queue:Queue, val se val next = getNext // move the subs to the first entry that we just loaded. - subscriptions.foreach(_.advance(next)) - next.addSubscriptions(subscriptions) + parked.foreach(_.advance(next)) + next :::= parked unlink refill_preftch_list.foreach( _.refill_prefetch ) } } } - entry } - override def tombstone = { - throw new AssertionError("Flush group cannbot be tombstone."); + override def remove = { + throw new AssertionError("Flushed range cannbot be removed."); } } - /** - * Entries in the Flushed state are not holding the referenced messages in memory anymore. - * This state can transition to Loaded or Tombstone. - * - */ - class Flushed(val messageKey:Long, val size:Int) extends EntryState { - - var loading = false - - override def as_flushed = this - - override def is_flushed_or_flushing = true - - override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" } - - // Flushed entries can't be dispatched until - // they get loaded. - def dispatch():QueueEntry = { - // This dispatch can happen when a subscription is holding onto lots of acquired entries - // it can't prefetch anymore as it's waiting for ack on those messages to avoid - // blowing it's memory limits. - null - } - - override def load() = { - if( !loading ) { -// trace("Start entry load of message seq: %s", seq) - // start loading it back... - loading = true - queue.loading_size += size - queue.host.store.loadMessage(messageKey) { delivery => - // pass off to a source so it can aggregate multiple - // loads to reduce cross thread synchronization - if( delivery.isDefined ) { -// debug("Store found message seq: %d", seq) - queue.store_load_source.merge((this, delivery.get)) - } else { - - info("Detected store dropped message at seq: %d", seq) - - // Looks like someone else removed the message from the store.. lets just - // tombstone this entry now. - queue.dispatchQueue { - tombstone - } - } - } - } - entry - } - - def loaded(messageRecord:MessageRecord) = { - if( loading ) { -// debug("Loaded message seq: ", seq ) - loading = false - queue.loading_size -= size - - val delivery = new Delivery() - delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value) - delivery.size = messageRecord.size - delivery.storeKey = messageRecord.key - - queue.size += size - state = new Loaded(delivery, true) - } else { -// debug("Ignoring store load of: ", messageKey) - } - } - - - override def tombstone = { - if( loading ) { -// debug("Tombstoned, will ignore store load of seq: ", seq) - loading = false - queue.loading_size -= size - } - super.tombstone - } - } - - } @@ -1248,7 +1214,7 @@ class Subscription(queue:Queue) extends override def toString = { def seq(entry:QueueEntry) = if(entry==null) null else entry.seq - "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+" }" + "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+"}" } def browser = session.consumer.browser @@ -1257,7 +1223,7 @@ class Subscription(queue:Queue) extends pos = queue.head_entry; session = consumer.connect(this) session.refiller = pos - queue.head_entry.addSubscriptions(this :: Nil) + queue.head_entry ::= this if( queue.serviceState.isStarted ) { // kick off the initial dispatch. @@ -1267,7 +1233,7 @@ class Subscription(queue:Queue) extends } def close() = { - pos.removeSubscriptions(this) + pos.-=(this) invalidate_prefetch @@ -1298,7 +1264,7 @@ class Subscription(queue:Queue) extends assert(value!=null) // Remove the previous pos from the prefetch counters. - if( prefetch_tail!=null && !pos.is_tombstone) { + if( prefetch_tail!=null && !pos.is_head) { remove_from_prefetch(pos) } advanced_size += pos.size @@ -1329,7 +1295,7 @@ class Subscription(queue:Queue) extends // release the prefetch counters... var cur = pos while (cur.seq <= prefetch_tail.seq) { - if (!cur.is_tombstone) { + if (!cur.is_head) { prefetched_size -= cur.size cur.prefetched -= 1 } @@ -1344,12 +1310,12 @@ class Subscription(queue:Queue) extends * Is the specified queue entry prefeteched by this subscription? */ def is_prefetched(value:QueueEntry) = { - prefetch_tail!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq + prefetch_tail!=null && value!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq } def add_to_prefetch(entry:QueueEntry):Unit = { - assert( !entry.is_tombstone, "tombstones should not be prefetched..") + assert( !entry.is_head, "tombstones should not be prefetched..") prefetched_size += entry.size entry.prefetched += 1 entry.load @@ -1386,7 +1352,7 @@ class Subscription(queue:Queue) extends // attempts to fill the prefetch... var next = next_prefetch_pos while( !prefetchFull && next!=null ) { - if( !next.is_tombstone ) { + if( !next.is_head ) { add_to_prefetch(next) } next = next.getNext @@ -1433,7 +1399,7 @@ class Subscription(queue:Queue) extends // we may now be able to prefetch some messages.. acquired_size -= entry.size - entry.tombstone // entry size changes to 0 + entry.remove // entry size changes to 0 refill_prefetch } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:13:10 2010 @@ -268,9 +268,9 @@ class VirtualHost(val broker: Broker) ex case Some(queueKey) => dispatchQueue { val queue = new Queue(this, dest, queueKey) + queue.start() queues.put(dest.getName, queue) cb(queue) - queue.start() } case None => // store could not create cb(null) @@ -278,6 +278,7 @@ class VirtualHost(val broker: Broker) ex } } else { val queue = new Queue(this, dest) + queue.start() queues.put(dest.getName, queue) cb(queue) } Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:13:10 2010 @@ -206,11 +206,11 @@ class CassandraClient() { } } - def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryGroup] = { + def listQueueEntryGroups(queueKey: Long, limit: Int): Seq[QueueEntryRange] = { withSession { session => - var rc = ListBuffer[QueueEntryGroup]() - var group:QueueEntryGroup = null + var rc = ListBuffer[QueueEntryRange]() + var group:QueueEntryRange = null // TODO: this is going to bring back lots of entries.. not good. session.list(schema.entries \ queueKey).foreach { x=> @@ -218,10 +218,10 @@ class CassandraClient() { val record:QueueEntryRecord = x.value if( group == null ) { - group = new QueueEntryGroup - group.firstSeq = record.queueSeq + group = new QueueEntryRange + group.firstQueueSeq = record.queueSeq } - group.lastSeq = record.queueSeq + group.lastQueueSeq = record.queueSeq group.count += 1 group.size += record.size if( group.count == limit) { Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:13:10 2010 @@ -195,7 +195,7 @@ class CassandraStore extends Store with } - def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = { + def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = { blocking { callback( client.listQueueEntryGroups(queueKey, limit) ) } Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:13:10 2010 @@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.proto.Mess import org.fusesource.hawtbuf.proto.PBMessage import org.apache.activemq.util.LockFile import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location} +import org.fusesource.hawtdb.internal.journal.{JournalListener, Journal, Location} import org.fusesource.hawtdispatch.TaskTracker import org.fusesource.hawtbuf.AsciiBuffer._ @@ -152,6 +152,20 @@ class HawtDBClient(hawtDBStore: HawtDBSt journal.setMaxFileLength(config.journalLogSize) journal.setMaxWriteBatchSize(config.journalBatchSize); journal.setChecksum(true); + journal.setListener( new JournalListener{ + def synced(writes: Array[Write]) = { + var onCompletes = List[Runnable]() + withTx { tx=> + val helper = new TxHelper(tx) + writes.foreach { write=> + val func = write.getAttachment.asInstanceOf[(TxHelper, Location)=>List[Runnable]] + onCompletes = onCompletes ::: func(helper, write.getLocation) + } + helper.storeRootBean + } + onCompletes.foreach( _.run ) + } + }) if( config.archiveDirectory!=null ) { journal.setDirectoryArchive(config.archiveDirectory) @@ -169,10 +183,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt pageFileFactory.open() val initialized = withTx { tx => - val helper = new TxHelper(tx) - import helper._ - if (!tx.allocator().isAllocated(0)) { + val helper = new TxHelper(tx) + import helper._ + val rootPage = tx.alloc() assert(rootPage == 0) @@ -181,9 +195,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY)) rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY)) rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY)) - rootBuffer = rootBean.freeze - tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer) + helper.storeRootBean + true } else { rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0) @@ -296,7 +310,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt } } - def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryGroup] = { + def listQueueEntryGroups(queueKey: Long, limit: Int) : Seq[QueueEntryRange] = { withTx { tx => val helper = new TxHelper(tx) import JavaConversions._ @@ -307,15 +321,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt if (queueRecord != null) { val entryIndex = queueEntryIndex(queueRecord) - var rc = ListBuffer[QueueEntryGroup]() - var group:QueueEntryGroup = null + var rc = ListBuffer[QueueEntryRange]() + var group:QueueEntryRange = null entryIndex.iterator.foreach { entry => if( group == null ) { - group = new QueueEntryGroup - group.firstSeq = entry.getKey.longValue + group = new QueueEntryRange + group.firstQueueSeq = entry.getKey.longValue } - group.lastSeq = entry.getKey.longValue + group.lastQueueSeq = entry.getKey.longValue group.count += 1 group.size += entry.getValue.getSize if( group.count == limit) { @@ -470,11 +484,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt frozen.writeFramed(baos) val buffer = baos.toBuffer() - append(buffer) { - location => - metric_index_update.time { - executeStore(batch, update, onComplete, location) - } + append(buffer) { (helper, location) => + metric_index_update.time { + executeStore(helper, location, batch, update, onComplete) + } } } @@ -484,9 +497,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt val baos = new DataByteArrayOutputStream(5) baos.writeByte(BEGIN) baos.writeInt(batch) - append(baos.toBuffer) { - location => - executeBegin(batch, location) + append(baos.toBuffer) { (helper,location) => + executeBegin(helper, location, batch) } } @@ -496,9 +508,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt val baos = new DataByteArrayOutputStream(5) baos.writeByte(COMMIT) baos.writeInt(batch) - append(baos.toBuffer) { - location => - executeCommit(batch, onComplete, location) + append(baos.toBuffer) { (helper,location) => + executeCommit(helper, location, batch, onComplete) } } @@ -506,9 +517,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt val baos = new DataByteArrayOutputStream(5) baos.writeByte(ROLLBACK) baos.writeInt(batch) - append(baos.toBuffer) { - location => - executeRollback(batch, onComplete, location) + append(baos.toBuffer) { (helper,location) => + executeRollback(helper, location, batch, onComplete) } } @@ -606,12 +616,16 @@ class HawtDBClient(hawtDBStore: HawtDBSt val updateType = editor.readByte() val batch = editor.readInt() - updateType match { - case BEGIN => executeBegin(batch, location) - case COMMIT => executeCommit(batch, null, location) - case _ => - val update = decode(location, updateType, data) - executeStore(batch, update, null, location) + withTx { tx=> + val helper = new TxHelper(tx) + updateType match { + case BEGIN => executeBegin(helper, location, batch) + case COMMIT => executeCommit(helper, location, batch, null) + case _ => + val update = decode(location, updateType, data) + executeStore(helper, location, batch, update, null) + } + helper.storeRootBean } recoveryCounter += 1 @@ -624,14 +638,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt // ///////////////////////////////////////////////////////////////////// - private def append(data: Buffer)(cb: (Location) => Unit): Unit = { + private def append(data: Buffer)(cb: (TxHelper, Location) => List[Runnable]): Unit = { metric_journal_append.start { end => - journal.write(data, new JournalCallback() { - def success(location: Location) = { - end() - cb(location) - } - }) + def cbintercept(tx:TxHelper,location:Location) = { + end() + cb(tx, location) + } + journal.write(data, cbintercept _ ) } } @@ -644,74 +657,70 @@ class HawtDBClient(hawtDBStore: HawtDBSt // ///////////////////////////////////////////////////////////////////// - private def executeBegin(batch: Int, location: Location): Unit = { + private def executeBegin(helper:TxHelper, location: Location, batch: Int):List[Runnable] = { assert(batches.get(batch).isEmpty) batches.put(batch, (location, ListBuffer())) + Nil } - private def executeCommit(batch: Int, onComplete: Runnable, location: Location): Unit = { + private def executeCommit(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable):List[Runnable] = { // apply all the updates in the batch as a single unit of work. batches.remove(batch) match { case Some((_, updates)) => // When recovering.. we only want to redo updates that committed // after the last update location. if (!recovering || isAfterLastUpdateLocation(location)) { - withTx { tx => - val helper = new TxHelper(tx) - // index the updates - updates.foreach { - update => - index(helper, update.update, update.location) - } - helper.updateLocations(location) - } + // index the updates + updates.foreach { + update => + index(helper, update.update, update.location) + } + helper.updateLocations(location) } case None => // when recovering.. we are more lax due recovery starting // in the middle of a stream of in progress batches assert(recovering) } - if (onComplete != null) { - onComplete.run + if(onComplete!=null) { + return List(onComplete) + } else { + Nil } } - private def executeRollback(batch: Int, onComplete: Runnable, location: Location): Unit = { + private def executeRollback(helper:TxHelper, location: Location, batch: Int, onComplete: Runnable): List[Runnable] = { // apply all the updates in the batch as a single unit of work. batches.remove(batch) match { case Some((_, _)) => if (!recovering || isAfterLastUpdateLocation(location)) { - withTx { tx => - val helper = new TxHelper(tx) - helper.updateLocations(location) - } + helper.updateLocations(location) } case None => // when recovering.. we are more lax due recovery starting // in the middle of a stream of in progress batches assert(recovering) } - if (onComplete != null) { - onComplete.run + if(onComplete!=null) { + return List(onComplete) + } else { + Nil } } - private def executeStore(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = { + private def executeStore(helper:TxHelper, location: Location, batch: Int, update: TypeCreatable, onComplete: Runnable): List[Runnable] = { if (batch == -1) { // update is not part of the batch.. // When recovering.. we only want to redo updates that happen // after the last update location. if (!recovering || isAfterLastUpdateLocation(location)) { - withTx { tx => - val helper = new TxHelper(tx) - index(helper, update, location) - helper.updateLocations(location) - } + index(helper, update, location) + helper.updateLocations(location) } - if (onComplete != null) { - onComplete.run + if ( onComplete != null) { + return List(onComplete) } } else { @@ -728,6 +737,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt assert(recovering) } } + return Nil } @@ -1064,6 +1074,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt } else { rootBean.setFirstBatchLocation(batches.head._2._1) } + } + + def storeRootBean() = { rootBuffer = rootBean.freeze _tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer) } Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:13:10 2010 @@ -183,7 +183,7 @@ class HawtDBStore extends Store with Bas } } - def listQueueEntryGroups(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryGroup]) => Unit) = { + def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = { executor_pool ^{ callback( client.listQueueEntryGroups(queueKey, limit) ) } Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRange.java Wed Jul 7 04:13:10 2010 @@ -22,8 +22,8 @@ import org.fusesource.hawtbuf.Buffer; * @author Hiram Chirino */ public class QueueEntryRange { - public long firstSeq; - public long lastSeq; + public long firstQueueSeq; + public long lastQueueSeq; public int count; public int size; } \ No newline at end of file Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961166&r1=961165&r2=961166&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:13:10 2010 @@ -74,10 +74,11 @@ trait Store extends Service { def listQueues(callback: (Seq[Long])=>Unit ) /** - * Groups all the entries in the specified queue into groups containing limit entries and returns those - * groups. Allows you to quickly get a rough idea of the items in queue without consuming too much memory. + * Groups all the entries in the specified queue into ranges containing up limit entries + * big and returns those ranges. Allows you to incrementally, load all the entries in + * a queue. */ - def listQueueEntryGroups(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryGroup])=>Unit ) + def listQueueEntryRanges(queueKey:Long, limit:Int)(callback:(Seq[QueueEntryRange])=>Unit ) /** * Loads all the queue entry records for the given queue id between the first and last provided