activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1131161 - /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Date Fri, 03 Jun 2011 19:39:32 GMT
Author: tabish
Date: Fri Jun  3 19:39:32 2011
New Revision: 1131161

URL: http://svn.apache.org/viewvc?rev=1131161&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30

Add support for sending the Queue Browse end MessageDispatch command.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1131161&r1=1131160&r2=1131161&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Jun  3 19:39:32 2011
@@ -17,14 +17,12 @@
 
 package org.apache.activemq.apollo.openwire
 
-import dto.OpenwireConnectionStatusDTO
 import OpenwireConstants._
 
 import org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
 import collection.mutable.{ListBuffer, HashMap}
 
-import AsciiBuffer._
 import org.apache.activemq.apollo.broker._
 import BufferConversions._
 import java.io.IOException
@@ -533,14 +531,43 @@ class OpenwireProtocolHandler extends Pr
 
       def producer = p
       def consumer = ConsumerContext.this
+      var closed = false
 
       val outbound_session = outbound_sessions.open(producer.dispatch_queue)
 
       def downstream = outbound_session
 
       def close = {
-        outbound_sessions.close(outbound_session)
-        release
+
+        assert(producer.dispatch_queue.isExecuting)
+        if( !closed ) {
+          closed = true
+          if( browser ) {
+            // Then send the end of browse message.
+            var dispatch = new MessageDispatch
+            dispatch.setConsumerId(this.consumer.info.getConsumerId)
+            dispatch.setMessage(null)
+            dispatch.setDestination(null)
+
+            if( outbound_session.full ) {
+              // session is full, so use an overflow sink to hold the message,
+              // and then trigger closing the session once it empties out.
+              val sink = new OverflowSink(outbound_session)
+              sink.refiller = ^{
+                outbound_sessions.close(outbound_session)
+                release
+              }
+              sink.offer(dispatch)
+            } else {
+              outbound_session.offer(dispatch)
+              outbound_sessions.close(outbound_session)
+              release
+            }
+          } else {
+            outbound_sessions.close(outbound_session)
+            release
+          }
+        }
       }
 
       def remaining_capacity = outbound_session.remaining_capacity



Mime
View raw message