activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1195045 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQConnection.java test/java/org/apache/activemq/bugs/AMQ3529Test.java
Date Sat, 29 Oct 2011 21:42:50 GMT
Author: tabish
Date: Sat Oct 29 21:42:49 2011
New Revision: 1195045

URL: http://svn.apache.org/viewvc?rev=1195045&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3547

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java 
 (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1195045&r1=1195044&r2=1195045&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Sat Oct 29 21:42:49 2011
@@ -107,7 +107,6 @@ public class ActiveMQConnection implemen
     public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
-    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
 
     public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>
activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
 
@@ -599,7 +598,12 @@ public class ActiveMQConnection implemen
      *                 cause this exception to be thrown.
      */
     public void close() throws JMSException {
+        // Store the interrupted state and clear so that cleanup happens without
+        // leaking connection resources.  Reset in finally to preserve state.
+        boolean interrupted = Thread.interrupted();
+
         try {
+
             // If we were running, lets stop first.
             if (!closed.get() && !transportFailed.get()) {
                 stop();
@@ -663,8 +667,6 @@ public class ActiveMQConnection implemen
                         doAsyncSendPacket(new ShutdownInfo());
                     }
 
-                    ServiceSupport.dispose(this.transport);
-
                     started.set(false);
 
                     // TODO if we move the TaskRunnerFactory to the connection
@@ -680,13 +682,19 @@ public class ActiveMQConnection implemen
             }
         } finally {
             try {
-                if (executor != null){
+                if (executor != null) {
                     executor.shutdown();
                 }
-            }catch(Throwable e) {
-                LOG.error("Error shutting down thread pool " + e,e);
+            } catch (Throwable e) {
+                LOG.error("Error shutting down thread pool " + e, e);
             }
+
+            ServiceSupport.dispose(this.transport);
+
             factoryStats.removeConnection(this);
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
@@ -2426,8 +2434,8 @@ public class ActiveMQConnection implemen
             ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>
entry = entries.next();
             try {
                 // Only delete this temp destination if it was created from this connection.
The connection used
-                // for the advisory consumer may also have a reference to this temp destination.

-                ActiveMQTempDestination dest = entry.getValue();                        
    
+                // for the advisory consumer may also have a reference to this temp destination.
+                ActiveMQTempDestination dest = entry.getValue();
                 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
                 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId))
{
                     this.deleteTempDestination(entry.getValue());

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java?rev=1195045&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java Sat
Oct 29 21:42:49 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3529Test {
+
+	private ConnectionFactory connectionFactory;
+	private Connection connection;
+	private Session session;
+	private BrokerService broker;
+	private String connectionUri;
+
+	@Before
+	public void startBroker() throws Exception {
+		broker = new BrokerService();
+		broker.setDeleteAllMessagesOnStartup(true);
+		broker.setPersistent(false);
+		broker.setUseJmx(false);
+		broker.addConnector("tcp://0.0.0.0:0");
+		broker.start();
+		broker.waitUntilStarted();
+
+		connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+		connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+	}
+
+	@After
+	public void stopBroker() throws Exception {
+		broker.stop();
+		broker.waitUntilStopped();
+	}
+
+	@Test(timeout = 60000)
+	public void testInterruptionAffects() throws Exception {
+		ThreadGroup tg = new ThreadGroup("tg");
+
+		assertEquals(0, tg.activeCount());
+
+		Thread client = new Thread(tg, "client") {
+
+			@Override
+			public void run() {
+				try {
+					connection = connectionFactory.createConnection();
+					session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+					assertNotNull(session);
+				} catch (JMSException e) {
+					fail(e.getMessage());
+				}
+				// next line is the nature of the test, if I remove this line, everything works OK
+				Thread.currentThread().interrupt();
+				try {
+					connection.close();
+				} catch (JMSException e) {
+				}
+			}
+		};
+		client.start();
+		client.join();
+		Thread.sleep(2000);
+		Thread[] remainThreads = new Thread[tg.activeCount()];
+		tg.enumerate(remainThreads);
+		for (Thread t : remainThreads) {
+			if (t.isAlive() && !t.isDaemon())
+				fail("Remaining thread: " + t.toString());
+		}
+	}
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message