Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E92649D70 for ; Wed, 8 Feb 2012 15:06:55 +0000 (UTC) Received: (qmail 18069 invoked by uid 500); 8 Feb 2012 15:06:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 18016 invoked by uid 500); 8 Feb 2012 15:06:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 18008 invoked by uid 99); 8 Feb 2012 15:06:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2012 15:06:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2012 15:06:54 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 244DD23888FD for ; Wed, 8 Feb 2012 15:06:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1241924 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Date: Wed, 08 Feb 2012 15:06:34 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120208150634.244DD23888FD@eris.apache.org> Author: chirino Date: Wed Feb 8 15:06:33 2012 New Revision: 1241924 URL: http://svn.apache.org/viewvc?rev=1241924&view=rev Log: Don't combine swap entries while they are loading. Fixes unlink errors. Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1241924&r1=1241923&r2=1241924&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Feb 8 15:06:33 2012 @@ -1232,6 +1232,11 @@ class QueueEntry(val queue:Queue, val se def is_swapped = as_swapped!=null def is_swapped_range = as_swapped_range!=null def is_swapped_or_swapped_range = is_swapped || is_swapped_range + def is_loading = state match { + case state:SwappedRange => state.loading + case state:Swapped => state.loading + case _ => false + } // These should not change the current state. def count = state.count @@ -1256,7 +1261,7 @@ class QueueEntry(val queue:Queue, val se getPrevious !=null && getPrevious.is_swapped_range && ( (is_swapped && !is_acquired) || is_swapped_range ) && - (getPrevious.count + count < queue.tune_swap_range_size) + (getPrevious.count + count < queue.tune_swap_range_size) && !is_loading } trait EntryState { @@ -1737,6 +1742,8 @@ class QueueEntry(val queue:Queue, val se override def toString = { "swapped:{ swapping_in: "+space+", acquired:"+acquirer+", size:"+size+"}" } + def loading = this.space!=null + override def swap_in(mem_space:MemorySpace) = { if( this.space==null ) { // trace("Start entry load of message seq: %s", seq) @@ -1887,64 +1894,61 @@ class QueueEntry(val queue:Queue, val se override def size = _size override def expiration = _expiration - var swapping_in = false + var loading = false override def as_swapped_range = this override def is_swapped_or_swapping_out = true + def label = { var rc = "swapped_range" - if( swapping_in ) { + if( loading ) { rc = "swapped_range|swapping in" } rc } - override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" } + override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" } override def swap_in(space:MemorySpace) = { - if( !swapping_in ) { - swapping_in = true + if( !loading ) { + loading = true queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records => - if( !records.isEmpty ) { - queue.dispatch_queue { - - var item_count=0 - var size_count=0 - - val tmpList = new LinkedNodeList[QueueEntry]() - records.foreach { record => - val entry = new QueueEntry(queue, record.entry_seq).init(record) - tmpList.addLast(entry) - item_count += 1 - size_count += record.size - } - - // we may need to adjust the enqueue count if entries - // were dropped at the store level - var item_delta = (count - item_count) - val size_delta: Int = size - size_count - - if ( item_delta!=0 || size_delta!=0 ) { - info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta) - queue.enqueue_item_counter += item_delta - queue.enqueue_size_counter += size_delta - } + queue.dispatch_queue { + loading = false + assert(isLinked) + + var item_count=0 + var size_count=0 + + val tmpList = new LinkedNodeList[QueueEntry]() + records.foreach { record => + val entry = new QueueEntry(queue, record.entry_seq).init(record) + tmpList.addLast(entry) + item_count += 1 + size_count += record.size + } - linkAfter(tmpList) - val next = getNext + // we may need to adjust the enqueue count if entries + // were dropped at the store level + var item_delta = (count - item_count) + val size_delta: Int = size - size_count + + if ( item_delta!=0 || size_delta!=0 ) { + info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta) + queue.enqueue_item_counter += item_delta + queue.enqueue_size_counter += size_delta + } - // move the subs to the first entry that we just loaded. - parked.foreach(_.advance(next)) - next :::= parked - queue.trigger_swap + linkAfter(tmpList) + val next = getNext - unlink + // move the subs to the first entry that we just loaded. + parked.foreach(_.advance(next)) + next :::= parked + queue.trigger_swap - // TODO: refill prefetches - } - } else { - warn("range load failed") + unlink } } } @@ -1958,6 +1962,7 @@ class QueueEntry(val queue:Queue, val se assert(value!=null) assert(value.is_swapped || value.is_swapped_range) assert(!value.is_acquired) + assert(!value.is_loading) if( value.is_swapped ) { assert(last < value.seq ) last = value.seq