activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r669512 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Date Thu, 19 Jun 2008 15:37:04 GMT
Author: rajdavies
Date: Thu Jun 19 08:37:03 2008
New Revision: 669512

URL: http://svn.apache.org/viewvc?rev=669512&view=rev
Log:
apply fix for https://issues.apache.org/activemq/browse/AMQ-1810

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=669512&r1=669511&r2=669512&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Jun 19 08:37:03 2008
@@ -183,6 +183,7 @@
     private long timeCreated;
     private ConnectionAudit connectionAudit = new ConnectionAudit();
     private DestinationSource destinationSource;
+    private final Object ensureConnectionInfoSentMutex = new Object();
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -524,9 +525,11 @@
     public void stop() throws JMSException {
         checkClosedOrFailed();
         if (started.compareAndSet(true, false)) {
-            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
-                ActiveMQSession s = i.next();
-                s.stop();
+            synchronized(sessions) {
+                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
+                    ActiveMQSession s = i.next();
+                    s.stop();
+                }
             }
         }
     }
@@ -577,7 +580,9 @@
     public void close() throws JMSException {
         try {
             // If we were running, lets stop first.
-            stop();
+            if (!closed.get() && !transportFailed.get()) {
+                stop();
+            }
 
             synchronized (this) {
                 if (!closed.get()) {
@@ -626,16 +631,18 @@
                     // then we may need to call
                     // factory.onConnectionClose(this);
                     sessionTaskRunner.shutdown();
-                    
-                    if (asyncConnectionThread != null){
-                    	asyncConnectionThread.shutdown();
-                    }
-
                     closed.set(true);
                     closing.set(false);
                 }
             }
         } finally {
+            try {
+                if (asyncConnectionThread != null){
+                    asyncConnectionThread.shutdown();
+                }
+            }catch(Throwable e) {
+                LOG.error("Error shutting down thread pool " + e,e);
+            }
             factoryStats.removeConnection(this);
         }
     }
@@ -1226,6 +1233,9 @@
                     if (er.getException() instanceof JMSException) {
                         throw (JMSException)er.getException();
                     } else {
+                        if (isClosed()||closing.get()) {
+                            LOG.debug("Received an exception but connection is closing");
+                        }
                         JMSException jmsEx = null;
                         try {
                          jmsEx = JMSExceptionSupport.create(er.getException());
@@ -1313,25 +1323,27 @@
      * 
      * @throws JMSException
      */
-    protected synchronized void ensureConnectionInfoSent() throws JMSException {
-        // Can we skip sending the ConnectionInfo packet??
-        if (isConnectionInfoSentToBroker || closed.get()) {
-            return;
-        }
-
-        if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
-            info.setClientId(clientIdGenerator.generateId());
-        }
-        syncSendPacket(info);
-
-        this.isConnectionInfoSentToBroker = true;
-        // Add a temp destination advisory consumer so that
-        // We know what the valid temporary destinations are on the
-        // broker without having to do an RPC to the broker.
-
-        ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
-        if (watchTopicAdvisories) {
-            advisoryConsumer = new AdvisoryConsumer(this, consumerId);
+    protected void ensureConnectionInfoSent() throws JMSException {
+        synchronized(this.ensureConnectionInfoSentMutex) {
+            // Can we skip sending the ConnectionInfo packet??
+            if (isConnectionInfoSentToBroker || closed.get()) {
+                return;
+            }
+    
+            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
+                info.setClientId(clientIdGenerator.generateId());
+            }
+            syncSendPacket(info);
+    
+            this.isConnectionInfoSentToBroker = true;
+            // Add a temp destination advisory consumer so that
+            // We know what the valid temporary destinations are on the
+            // broker without having to do an RPC to the broker.
+    
+            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
+            if (watchTopicAdvisories) {
+                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
+            }
         }
     }
 



Mime
View raw message