activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1235399 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Tue, 24 Jan 2012 18:30:43 GMT
Author: chirino
Date: Tue Jan 24 18:30:42 2012
New Revision: 1235399

URL: http://svn.apache.org/viewvc?rev=1235399&view=rev
Log:
Removed stale debug messages and a couple of fixes to consumer shutdown.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1235399&r1=1235398&r2=1235399&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Tue Jan 24 18:30:42 2012
@@ -763,8 +763,6 @@ class Queue(val router: LocalRouter, val
     // Combine swapped items into swapped ranges
     if( individual_swapped_items > tune_swap_range_size*2 ) {
 
-      debug("Looking for swapped entries to combine")
-
       var distance_from_sub = tune_swap_range_size;
       var cur = entries.getHead
       var combine_counter = 0;
@@ -792,7 +790,7 @@ class Queue(val router: LocalRouter, val
         }
         cur = next
       }
-      debug("combined %d entries", combine_counter)
+      trace("combined %d entries", combine_counter)
     }
     
     if(!messages.full) {
@@ -888,14 +886,17 @@ class Queue(val router: LocalRouter, val
       case (entry, consumed, uow) =>
         consumed match {
           case Consumed =>
+//            debug("ack consumed: ("+store_id+","+entry.entry.seq+")")
             entry.ack(uow)
           case Expired=>
+//            debug("ack expired: ("+store_id+","+entry.entry.seq+")")
             entry.entry.queue.expired(entry.entry, false)
             entry.ack(uow)
           case Delivered =>
             entry.entry.redelivered
             entry.nack
-          case Poisoned    =>
+          case Poisoned =>
+            // TODO: send to DLQ once that is supported.
             entry.entry.redelivered
             entry.nack
           case Undelivered =>
@@ -2203,7 +2204,7 @@ class Subscription(val queue:Queue, val 
     def ack(uow:StoreUOW):Unit = {
       assert_executing
       if(!isLinked) {
-        debug("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
+        debug("Unexpected ack: message seq allready acked: "+entry.seq)
         return
       }
 
@@ -2241,7 +2242,7 @@ class Subscription(val queue:Queue, val 
     def nack:Unit = {
       assert_executing
       if(!isLinked) {
-        warn("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
+        debug("Unexpected nack: message seq allready acked: "+entry.seq)
         return
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1235399&r1=1235398&r2=1235399&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Tue Jan 24 18:30:42 2012
@@ -196,16 +196,20 @@ class SinkMux[T](val downstream:Sink[T])
     var rejection_handler:(T)=>Unit = _
     var refiller:Runnable = NOOP
 
+    def full = downstream.full && rejection_handler==null
+
     def offer(value: T) = {
       if ( full ) {
         false
       } else {
-        val accepted = downstream.offer(value)
-        assert(accepted)
-        true
+        if( rejection_handler!=null ) {
+          rejection_handler(value)
+          true
+        } else {
+          downstream.offer(value)
+        }
       }
     }
-    def full = downstream.full && rejection_handler==null
   }
   
   def open():Sink[T] = {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1235399&r1=1235398&r2=1235399&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Tue Jan 24 18:30:42 2012
@@ -799,7 +799,7 @@ class OpenwireProtocolHandler extends Pr
       ack_handler.close
       super.dispose()
       sink_manager.close(consumer_sink,(frame)=>{
-        debug("Got command after consumer was disposed.");
+        // No point in sending the frame down to the socket..
       })
 
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1235399&r1=1235398&r2=1235399&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Tue Jan 24 18:30:42 2012
@@ -426,7 +426,7 @@ class StompProtocolHandler extends Proto
       ack_handler.close
       credit_window_filter.disable
       sink_manager.close(consumer_sink, (frame)=>{
-        debug("Got frame after consumer was disposed.");
+        // No point in sending the frame down to the socket..
       })
       super.dispose()
     }



Mime
View raw message