activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1091853 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
Date Wed, 13 Apr 2011 17:19:49 GMT
Author: chirino
Date: Wed Apr 13 17:19:49 2011
New Revision: 1091853

URL: http://svn.apache.org/viewvc?rev=1091853&view=rev
Log:
Better dispatch execution assertion checking to catch usage errors.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1091853&r1=1091852&r2=1091853&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Apr 13 17:19:49 2011
@@ -674,7 +674,7 @@ class QueueEntry(val queue:Queue, val se
    * Dispatches this entry to the consumers and continues dispatching subsequent
    * entries as long as the dispatch results in advancing in their dispatch position.
    */
-  def run() = {
+  def run() = queue.dispatch_queue {
     var next = this;
     while( next!=null && next.dispatch) {
       next = next.getNext
@@ -1018,6 +1018,8 @@ class QueueEntry(val queue:Queue, val se
 
     override def dispatch():Boolean = {
 
+      queue.assert_executing
+
       // Nothing to dispatch if we don't have subs..
       if( parked.isEmpty ) {
         return false
@@ -1328,7 +1330,7 @@ object Subscription extends Log
  * tracks the entries which the consumer has acquired.
  *
  */
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with Dispatched {
   import Subscription._
 
   def dispatch_queue = queue.dispatch_queue
@@ -1501,6 +1503,11 @@ class Subscription(val queue:Queue, val 
     acquired_size += entry.size
 
     def ack(sb:StoreUOW):Unit = {
+      assert_executing
+      if(!isLinked) {
+        warn("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
+        return
+      }
       // The session may have already been closed..
       if( session == null ) {
         return;
@@ -1539,6 +1546,11 @@ class Subscription(val queue:Queue, val 
     }
 
     def nack:Unit = {
+      assert_executing
+      if(!isLinked) {
+        warn("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
+        return
+      }
       // The session may have already been closed..
       if( session == null ) {
         return;

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala?rev=1091853&r1=1091852&r2=1091853&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Dispatched.scala Wed Apr 13 17:19:49 2011
@@ -28,7 +28,7 @@ import org.fusesource.hawtdispatch._
 trait Dispatched {
   def dispatch_queue:DispatchQueue
 
-  protected def assert_executing = assert( dispatch_queue.isExecuting,
+  def assert_executing = assert( dispatch_queue.isExecuting,
     "Dispatch queue '%s' was not executing, (currently executing: %s)".format(
       Option(dispatch_queue.getLabel).getOrElse(""),
       Option(getCurrentQueue).map(_.getLabel).getOrElse("None") )



Mime
View raw message