activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1025592 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Date Wed, 20 Oct 2010 14:12:41 GMT
Author: chirino
Date: Wed Oct 20 14:12:41 2010
New Revision: 1025592

URL: http://svn.apache.org/viewvc?rev=1025592&view=rev
Log:
Fixing nack/redelivery bug.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1025592&r1=1025591&r2=1025592&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Oct 20 14:12:41 2010
@@ -29,7 +29,7 @@ import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
-import org.fusesource.hawtdispatch.{ListEventAggregator, DispatchQueue, BaseRetained}
+import org.fusesource.hawtdispatch.{Dispatch, ListEventAggregator, DispatchQueue, BaseRetained}
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -377,7 +377,7 @@ class Queue(val host: VirtualHost, var i
           // Combine flushed items into flushed ranges
           if( flushed_items > tune_flush_range_size*2 ) {
 
-            println("Looking for flushed entries to combine")
+            debug("Looking for flushed entries to combine")
 
             var distance_from_sub = tune_flush_range_size;
             var cur = entries.getHead
@@ -405,7 +405,7 @@ class Queue(val host: VirtualHost, var i
               cur = next
             }
 
-            println("combined "+combine_counter+" entries")
+            debug("combined %d entries", combine_counter)
 
           }
 
@@ -880,7 +880,6 @@ class QueueEntry(val queue:Queue, val se
      */
     override def dispatch() = {
       if( parked != Nil ) {
-
         advance(parked)
         parked = Nil
         true
@@ -1370,10 +1369,6 @@ class Subscription(queue:Queue) extends 
     pos -= this
     pos = null
 
-    session.refiller = null
-    session.close
-    session = null
-
     // nack all the acquired entries.
     var next = acquired.getHead
     while( next !=null ) {
@@ -1381,6 +1376,10 @@ class Subscription(queue:Queue) extends 
       next = next.getNext
       cur.nack // this unlinks the entry.
     }
+
+    session.refiller = null
+    session.close
+    session = null
   }
 
   /**
@@ -1411,8 +1410,10 @@ class Subscription(queue:Queue) extends 
    */
   def rewind(value:QueueEntry):Unit = {
     assert(value!=null)
+    pos -= this
+    value ::= this
     pos = value
-    session.refiller = pos
+    session.refiller = value
     queue.dispatchQueue << value // queue up the entry to get dispatched..
   }
 



Mime
View raw message