activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1195278 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/bugs/
Date Sun, 30 Oct 2011 23:27:52 GMT
Author: tabish
Date: Sun Oct 30 23:27:52 2011
New Revision: 1195278

URL: http://svn.apache.org/viewvc?rev=1195278&view=rev
Log:
additional fixes for: https://issues.apache.org/jira/browse/AMQ-3547

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1195278&r1=1195277&r2=1195278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Sun Oct 30 23:27:52 2011
@@ -670,6 +670,9 @@ public class ActiveMQMessageConsumer imp
     }
 
     void doClose() throws JMSException {
+        // Store interrupted state and clear so that Transport operations don't
+        // throw InterruptedException and we ensure that resources are cleaned up.
+        boolean interrupted = Thread.interrupted();
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
         if (LOG.isDebugEnabled()) {
@@ -677,7 +680,9 @@ public class ActiveMQMessageConsumer imp
         }
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         this.session.asyncSendPacket(removeCommand);
-    }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }    }
 
     void inProgressClearRequired() {
         inProgressClearRequiredFlag = true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1195278&r1=1195277&r2=1195278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Sun Oct 30 23:27:52 2011
@@ -637,10 +637,14 @@ public class ActiveMQSession implements 
     }
 
     private void doClose() throws JMSException {
+        boolean interrupted = Thread.interrupted();
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         connection.asyncSendPacket(removeCommand);
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     void clearMessagesInProgress() {
@@ -1963,7 +1967,7 @@ public class ActiveMQSession implements 
         this.blobTransferPolicy = blobTransferPolicy;
     }
 
-    public List getUnconsumedMessages() {
+    public List<MessageDispatch> getUnconsumedMessages() {
         return executor.getUnconsumedMessages();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=1195278&r1=1195277&r2=1195278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Sun Oct 30 23:27:52 2011
@@ -30,8 +30,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A utility class used by the Session for dispatching messages asynchronously
  * to consumers
- * 
- * 
+ *
  * @see javax.jms.Session
  */
 public class ActiveMQSessionExecutor implements Task {
@@ -125,9 +124,7 @@ public class ActiveMQSessionExecutor imp
     }
 
     void dispatch(MessageDispatch message) {
-
         // TODO - we should use a Map for this indexed by consumerId
-
         for (ActiveMQMessageConsumer consumer : this.session.consumers) {
             ConsumerId consumerId = message.getConsumerId();
             if (consumerId.equals(consumer.getConsumerId())) {
@@ -207,8 +204,7 @@ public class ActiveMQSessionExecutor imp
         }
     }
 
-    List getUnconsumedMessages() {
+    List<MessageDispatch> getUnconsumedMessages() {
         return messageQueue.removeAll();
     }
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java?rev=1195278&r1=1195277&r2=1195278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java Sun Oct 30 23:27:52 2011
@@ -6,36 +6,52 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.*;
+import java.util.Properties;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
 import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AMQ3529Test {
 
+    private static Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class);
+
     private ConnectionFactory connectionFactory;
     private Connection connection;
     private Session session;
     private BrokerService broker;
     private String connectionUri;
+    private MessageConsumer consumer;
+    private Context ctx = null;
 
     @Before
     public void startBroker() throws Exception {
@@ -72,20 +88,52 @@ public class AMQ3529Test {
                     connection = connectionFactory.createConnection();
                     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                     assertNotNull(session);
-                } catch (JMSException e) {
-                    fail(e.getMessage());
-                }
-                // next line is the nature of the test, if I remove this line, everything works OK
-                Thread.currentThread().interrupt();
-                try {
-                    connection.close();
-                } catch (JMSException e) {
-                }
 
-                assertTrue(Thread.currentThread().isInterrupted());
+                    Properties props = new Properties();
+                    props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+                    props.setProperty(Context.PROVIDER_URL, "tcp://0.0.0.0:0");
+                    ctx = null;
+                    try {
+                        ctx = new InitialContext(props);
+                    } catch (NoClassDefFoundError e) {
+                        throw new NamingException(e.toString());
+                    } catch (Exception e) {
+                        throw new NamingException(e.toString());
+                    }
+                    Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
+                    consumer = session.createConsumer(destination);
+                    consumer.receive(10000);
+                } catch (Exception e) {
+                    // Expect an exception here from the interrupt.
+                } finally {
+                    // next line is the nature of the test, if I remove this
+                    // line, everything works OK
+                    try {
+                        consumer.close();
+                    } catch (JMSException e) {
+                        fail("Consumer Close failed with" + e.getMessage());
+                    }
+                    try {
+                        session.close();
+                    } catch (JMSException e) {
+                        fail("Session Close failed with" + e.getMessage());
+                    }
+                    try {
+                        connection.close();
+                    } catch (JMSException e) {
+                        fail("Connection Close failed with" + e.getMessage());
+                    }
+                    try {
+                        ctx.close();
+                    } catch (Exception e) {
+                        fail("Connection Close failed with" + e.getMessage());
+                    }
+                }
             }
         };
         client.start();
+        Thread.sleep(5000);
+        client.interrupt();
         client.join();
         Thread.sleep(2000);
         Thread[] remainThreads = new Thread[tg.activeCount()];
@@ -94,5 +142,36 @@ public class AMQ3529Test {
             if (t.isAlive() && !t.isDaemon())
                 fail("Remaining thread: " + t.toString());
         }
+
+        ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
+        while (root.getParent() != null) {
+            root = root.getParent();
+        }
+        visit(root, 0);
+    }
+
+    // This method recursively visits all thread groups under `group'.
+    public static void visit(ThreadGroup group, int level) {
+        // Get threads in `group'
+        int numThreads = group.activeCount();
+        Thread[] threads = new Thread[numThreads * 2];
+        numThreads = group.enumerate(threads, false);
+
+        // Enumerate each thread in `group'
+        for (int i = 0; i < numThreads; i++) {
+            // Get thread
+            Thread thread = threads[i];
+            LOG.debug("Thread:" + thread.getName() + " is still running");
+        }
+
+        // Get thread subgroups of `group'
+        int numGroups = group.activeGroupCount();
+        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
+        numGroups = group.enumerate(groups, false);
+
+        // Recursively visit each subgroup
+        for (int i = 0; i < numGroups; i++) {
+            visit(groups[i], level + 1);
+        }
     }
 }



Mime
View raw message