activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1213979 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Date Tue, 13 Dec 2011 23:13:35 GMT
Author: tabish
Date: Tue Dec 13 23:13:35 2011
New Revision: 1213979

URL: http://svn.apache.org/viewvc?rev=1213979&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3605

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1213979&r1=1213978&r2=1213979&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Tue Dec 13 23:13:35 2011
@@ -445,7 +445,9 @@ public class TransportConnection impleme
 
     public Response processMessageAck(MessageAck ack) throws Exception {
         ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
-        broker.acknowledge(consumerExchange, ack);
+        if (consumerExchange != null) {
+            broker.acknowledge(consumerExchange, ack);
+        }
         return null;
     }
 
@@ -529,6 +531,7 @@ public class TransportConnection impleme
             broker.addConsumer(cs.getContext(), info);
             try {
                 ss.addConsumer(info);
+                addConsumerBrokerExchange(info.getConsumerId());
             } catch (IllegalStateException e) {
                 broker.removeConsumer(cs.getContext(), info);
             }
@@ -703,8 +706,7 @@ public class TransportConnection impleme
         TransportConnectionState cs = lookupConnectionState(id);
         if (cs != null) {
             // Don't allow things to be added to the connection state while we
-            // are
-            // shutting down.
+            // are shutting down.
             cs.shutdown();
             // Cascade the connection stop to the sessions.
             for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) {
@@ -757,7 +759,6 @@ public class TransportConnection impleme
     }
 
     public void dispatchSync(Command message) {
-        // getStatistics().getEnqueues().increment();
         try {
             processDispatch(message);
         } catch (IOException e) {
@@ -767,7 +768,6 @@ public class TransportConnection impleme
 
     public void dispatchAsync(Command message) {
         if (!stopping.get()) {
-            // getStatistics().getEnqueues().increment();
             if (taskRunner == null) {
                 dispatchSync(message);
             } else {
@@ -809,13 +809,12 @@ public class TransportConnection impleme
                     sub.run();
                 }
             }
-            // getStatistics().getDequeues().increment();
         }
     }
 
     public boolean iterate() {
         try {
-            if (stopping.get()) {
+            if (pendingStop || stopping.get()) {
                 if (dispatchStopped.compareAndSet(false, true)) {
                     if (transportException.get() == null) {
                         try {
@@ -931,7 +930,6 @@ public class TransportConnection impleme
         }
     }
 
-
     public void stopAsync() {
         // If we're in the middle of starting then go no further... for now.
         synchronized (this) {
@@ -1328,6 +1326,11 @@ public class TransportConnection impleme
 
     private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
         ConsumerBrokerExchange result = consumerExchanges.get(id);
+        return result;
+    }
+
+    private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
+        ConsumerBrokerExchange result = consumerExchanges.get(id);
         if (result == null) {
             synchronized (consumerExchanges) {
                 result = new ConsumerBrokerExchange();



Mime
View raw message