activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1509291 - /activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Date Thu, 01 Aug 2013 15:25:23 GMT
Author: tabish
Date: Thu Aug  1 15:25:23 2013
New Revision: 1509291

URL: http://svn.apache.org/r1509291
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-4651

ensure consumers get removed from the session on close. 

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1509291&r1=1509290&r2=1509291&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Aug  1 15:25:23 2013
@@ -179,7 +179,7 @@ class AmqpProtocolConverter {
         long nextProducerId = 0;
         long nextConsumerId = 0;
 
-        final LinkedList<ConsumerContext> consumers = new LinkedList<ConsumerContext>();
+        final Map<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
 
         public AmqpSessionContext(ConnectionId connectionId, long id) {
             sessionId = new SessionId(connectionId, id);
@@ -656,7 +656,7 @@ class AmqpProtocolConverter {
                 }
 
                 AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext();
-                for (ConsumerContext consumer : context.consumers) {
+                for (ConsumerContext consumer : context.consumers.values()) {
                     if (operation == TransactionInfo.ROLLBACK) {
                         consumer.doRollback();
                     } else {
@@ -682,7 +682,7 @@ class AmqpProtocolConverter {
                     }
                 });
 
-                for (ConsumerContext consumer : context.consumers) {
+                for (ConsumerContext consumer : context.consumers.values()) {
                     if (operation == TransactionInfo.ROLLBACK) {
                         consumer.pumpOutbound();
                     }
@@ -815,6 +815,12 @@ class AmqpProtocolConverter {
         public void onClose() throws Exception {
             if (!closed) {
                 closed = true;
+
+                AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
+                if (session != null) {
+                    session.consumers.remove(info.getConsumerId());
+                }
+
                 sendToActiveMQ(new RemoveInfo(consumerId), null);
             }
         }
@@ -1188,7 +1194,7 @@ class AmqpProtocolConverter {
                         subscriptionsByConsumerId.remove(id);
                         sender.close();
                     } else {
-                        sessionContext.consumers.add(consumerContext);
+                        sessionContext.consumers.put(id, consumerContext);
                         sender.open();
                     }
                     pumpProtonToSocket();



Mime
View raw message