activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1096642 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Date Tue, 26 Apr 2011 01:43:11 GMT
Author: chirino
Date: Tue Apr 26 01:43:11 2011
New Revision: 1096642

URL: http://svn.apache.org/viewvc?rev=1096642&view=rev
Log:
Delay shutting down on disconnect to allow the receipt to be sent back to the client.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1096642&r1=1096641&r2=1096642&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Tue Apr 26 01:43:11 2011
@@ -451,14 +451,8 @@ class StompProtocolHandler extends Proto
                 on_stomp_connect(frame.headers)
               case CONNECT =>
                 on_stomp_connect(frame.headers)
-              case DISCONNECT =>
-                send_receipt(frame.headers)
-                on_transport_disconnected
-                queue.after(die_delay, TimeUnit.MILLISECONDS) {
-                  connection.stop()
-                }
               case _ =>
-                die("Client must first send a connect frame");
+                die("Expecting a STOMP or CONNECT frame, but got: "+frame.action.ascii);
             }
 
           } else {
@@ -481,7 +475,18 @@ class StompProtocolHandler extends Proto
                 on_stomp_nack(frame)
 
               case DISCONNECT =>
-                connection.stop
+
+                val delay = send_receipt(frame.headers)
+                on_transport_disconnected
+                if( delay ) {
+                  queue.after(die_delay, TimeUnit.MILLISECONDS) {
+                    connection.stop()
+                  }
+                } else {
+                  // no point in delaying the connection shutdown
+                  // if the client does not want a receipt..
+                  connection.stop()
+                }
 
               case _ =>
                 die("Invalid frame: "+frame.action);
@@ -860,6 +865,8 @@ class StompProtocolHandler extends Proto
     val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser,
exclusive);
     consumers += (id -> consumer)
 
+    def unit:Unit = {}
+
     reset {
       val rc = host.router.bind(destination, consumer, security_context)
       consumer.release
@@ -869,6 +876,7 @@ class StompProtocolHandler extends Proto
           async_die(reason)
         case _=>
           send_receipt(headers)
+          unit
       }
     }
 
@@ -989,13 +997,15 @@ class StompProtocolHandler extends Proto
   }
 
 
-  def send_receipt(headers:HeaderMap):Unit = {
+  def send_receipt(headers:HeaderMap):Boolean = {
     get(headers, RECEIPT_REQUESTED) match {
       case Some(receipt)=>
         dispatchQueue <<| ^{
           connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
         }
+        true
       case None=>
+        false
     }
   }
 



Mime
View raw message