activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1487674 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala QueueEntry.scala
Date Wed, 29 May 2013 23:18:49 GMT
Author: chirino
Date: Wed May 29 23:18:48 2013
New Revision: 1487674

URL: http://svn.apache.org/r1487674
Log:
Reduce the number of dispatch sources used by a queue.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1487674&r1=1487673&r2=1487674&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed May 29 23:18:48 2013
@@ -31,6 +31,7 @@ import org.apache.activemq.apollo.dto._
 import java.util.regex.Pattern
 import collection.mutable.ListBuffer
 import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.{DeliveryResult, Subscription}
 
 object Queue extends Log {
   val subscription_counter = new AtomicInteger(0)
@@ -103,14 +104,6 @@ class Queue(val router: LocalRouter, val
 
   debug("created queue: " + id)
 
-  override def dispose: Unit = {
-    ack_source.cancel
-  }
-
-  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry, DeliveryResult, StoreUOW)](), dispatch_queue)
-  ack_source.setEventHandler(^ {drain_acks});
-  ack_source.resume
-
   val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, 1024*640) {
     override def time_stamp = now
   }
@@ -217,9 +210,17 @@ class Queue(val router: LocalRouter, val
 
   var individual_swapped_items = 0
 
-  val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
-  swap_source.setEventHandler(^{ swap_messages });
-  swap_source.resume
+  var swap_triggered = false
+  def trigger_swap = {
+    dispatch_queue.assertExecuting()
+    if( tune_swap && !swap_triggered ) {
+      swap_triggered = true
+      defer {
+        swap_triggered = false
+        swap_messages
+      }
+    }
+  }
 
   var restored_from_store = false
 
@@ -825,12 +826,6 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  def trigger_swap = {
-    if( tune_swap ) {
-      swap_source.merge(1)
-    }
-  }
-
   var keep_up_delivery_rate = 0L
   
   def swap_messages:Unit = {
@@ -1168,40 +1163,38 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  def drain_acks = might_unfill {
-    val end = System.nanoTime()
-    ack_source.getData.foreach {
-      case (entry, consumed, uow) =>
-        consumed match {
-          case Consumed =>
-            entry.ack(uow)
-          case Expired=>
-            val actual = create_uow(uow)
-            expired(actual, entry.entry) {
-              entry.ack(actual)
+  def process_ack(entry:Subscription#AcquiredQueueEntry, consumed:DeliveryResult, uow:StoreUOW) = defer {
+    might_unfill {
+      consumed match {
+        case Consumed =>
+          entry.ack(uow)
+        case Expired=>
+          val actual = create_uow(uow)
+          expired(actual, entry.entry) {
+            entry.ack(actual)
+          }
+          actual.release
+        case Delivered =>
+          entry.increment_nack
+          entry.entry.redelivered
+          entry.nack
+        case Undelivered =>
+          entry.nack
+        case Poisoned =>
+          entry.increment_nack
+          entry.entry.redelivered
+          var limit = dlq_nak_limit
+          if( limit>0 && entry.entry.redelivery_count >= limit ) {
+            dead_letter(uow, entry.entry) { uow =>
+              entry.remove(uow)
             }
-            actual.release
-          case Delivered =>
-            entry.increment_nack
-            entry.entry.redelivered
-            entry.nack
-          case Undelivered =>
+          } else {
             entry.nack
-          case Poisoned =>
-            entry.increment_nack
-            entry.entry.redelivered
-            var limit = dlq_nak_limit
-            if( limit>0 && entry.entry.redelivery_count >= limit ) {
-              dead_letter(uow, entry.entry) { uow =>
-                entry.remove(uow)
-              }
-            } else {
-              entry.nack
-            }
-        }
-        if( uow!=null ) {
-          uow.release
-        }
+          }
+      }
+      if( uow!=null ) {
+        uow.release
+      }
     }
   }
 
@@ -1368,35 +1361,6 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
-  val swap_out_completes_source = createSource(new ListEventAggregator[Task](), dispatch_queue)
-  swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
-  swap_out_completes_source.resume
-
-  def drain_swap_out_completes() = might_unfill {
-    val data = swap_out_completes_source.getData
-    data.foreach { loaded =>
-      loaded.run()
-    }
-  }
-
-  val store_load_source = createSource(new ListEventAggregator[(QueueEntry#Swapped, MessageRecord)](), dispatch_queue)
-  store_load_source.setEventHandler(^ {drain_store_loads});
-  store_load_source.resume
-
-
-  def drain_store_loads() = {
-    val data = store_load_source.getData
-    data.foreach { case (swapped,message_record) =>
-      swapped.swapped_in(message_record)
-    }
-
-    data.foreach { case (swapped,_) =>
-      if( swapped.entry.hasSubs ) {
-        swapped.entry.task.run
-      }
-    }
-  }
-
 }
 
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1487674&r1=1487673&r2=1487674&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed May 29 23:18:48 2013
@@ -448,13 +448,13 @@ class QueueEntry(val queue:Queue, val se
         uow.enqueue(toQueueEntryRecord)
         queue.swapping_out_size+=size
         uow.on_flush { canceled =>
-          queue.swap_out_completes_source.merge(^{
+          queue.defer {
             this.swapped_out(!canceled)
             queue.swapping_out_size-=size
             if( queue.swapping_out_size==0 ) {
               queue.on_queue_flushed
             }
-          })
+          }
         }
       }
     }
@@ -671,7 +671,7 @@ class QueueEntry(val queue:Queue, val se
                   if( uow!=null ) {
                     uow.retain
                   }
-                  queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
+                  queue.process_ack(acquiredQueueEntry, consumed, uow)
                 }
 
                 val accepted = sub.offer(acquiredDelivery)
@@ -760,7 +760,12 @@ class QueueEntry(val queue:Queue, val se
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
-            queue.store_load_source.merge((this, delivery.get))
+            queue.defer {
+              swapped_in(delivery.get)
+              if( entry.hasSubs ) {
+                entry.task.run
+              }
+            }
           } else {
 
             warn("Queue '%s' detected store dropped message at seq: %d", queue.id, seq)



Mime
View raw message