activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1044465 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Date Fri, 10 Dec 2010 18:14:06 GMT
Author: dejanb
Date: Fri Dec 10 18:14:05 2010
New Revision: 1044465

URL: http://svn.apache.org/viewvc?rev=1044465&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3081 - Durable subscriptions are not removed from
mbean

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1044465&r1=1044464&r2=1044465&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Fri Dec 10 18:14:05 2010
@@ -266,6 +266,7 @@ public class ManagedRegionBroker extends
                 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
                 if (inactiveName != null) {
                     inactiveDurableTopicSubscribers.remove(inactiveName);
+                    managementContext.unregisterMBean(inactiveName);
                 }
             } catch (Exception e) {
                 LOG.error("Failed to unregister subscription " + sub, e);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1044465&r1=1044464&r2=1044465&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
Fri Dec 10 18:14:05 2010
@@ -21,16 +21,20 @@ import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 
 import javax.jms.Connection;
 import javax.jms.Session;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
 import java.io.File;
 import java.lang.management.ManagementFactory;
+import java.util.List;
+
 
 public class DurableSubscriptionUnsubscribeTest extends TestSupport {
 
@@ -63,142 +67,100 @@ public class DurableSubscriptionUnsubscr
     }
 
     public void doJMXUnsubscribe(boolean restart) throws Exception {
-        for (int i = 0; i < 100; i++) {
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId" + i);
-            session.close();
-        }
+        createSubscriptions();
 
-        Thread.sleep(2 * 1000);
+        Thread.sleep(1000);
+        assertCount(100, 0);
 
         if (restart) {
-            stopBroker();
-            startBroker(false);
+            restartBroker();
+            assertCount(100, 0);
         }
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-        ObjectName[] inactive = broker.getAdminView().getInactiveDurableTopicSubscribers();
+        ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
 
-        for (ObjectName subscription: subscriptions) {
-            mbs.invoke(subscription, "destroy", null, null);
-        }
-        for (ObjectName subscription: inactive) {
-            mbs.invoke(subscription, "destroy", null, null);
-        }
+        for (int i = 0; i < subs.length; i++) {
+            ObjectName sub = subs[i];
+            mbs.invoke(sub, "destroy", null, null);
 
-        Thread.sleep(2 * 1000);
+            if (i % 20 == 0) {
+                Thread.sleep(1000);
+                assertCount(100 - i - 1, 0);
+            }
+        }
 
-        subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
+        Thread.sleep(1000);
+        assertCount(0, 0);
 
-        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
+        if (restart) {
+            restartBroker();
+            assertCount(0, 0);
+        }
     }
 
-    public void testInactiveSubscriptions() throws Exception {
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId");
-
-            ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-            assertEquals(1, subscriptions.length);
-
-            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-            assertEquals(0, subscriptions.length);
-
-            session.close();
-
-            Thread.sleep(1000);
-
-            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-            assertEquals(0, subscriptions.length);
-
-            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-            assertEquals(1, subscriptions.length);
-
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId");
-
-            Thread.sleep(1000);
-
-            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-            assertEquals(1, subscriptions.length);
-
-            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-            assertEquals(0, subscriptions.length);
-
-            session.close();
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    public void doConnectionUnsubscribe(boolean restart) throws Exception {
+        createSubscriptions();
 
-            Thread.sleep(1000);
+        Thread.sleep(1000);
+        assertCount(100, 0);
 
-            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-            assertEquals(0, subscriptions.length);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId1");
 
-            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-            assertEquals(1, subscriptions.length);
+        Thread.sleep(1000);
+        assertCount(100, 1);
 
-            session.unsubscribe("SubsId");
+        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2.createDurableSubscriber(topic, "SubsId2");
 
-            Thread.sleep(1000);
+        Thread.sleep(1000);
+        assertCount(100, 2);
 
-            subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-            assertEquals(0, subscriptions.length);
+        session.close();
 
-            subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-            assertEquals(0, subscriptions.length);
+        Thread.sleep(1000);
+        assertCount(100, 1);
 
-            session.close();
+        session2.close();
 
-    }
-
-    public void doConnectionUnsubscribe(boolean restart) throws Exception {
-        for (int i = 0; i < 100; i++) {
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId" + i);
-            session.close();
-        }
-
-        Thread.sleep(2 * 1000);
+        Thread.sleep(1000);
+        assertCount(100, 0);
 
         if (restart) {
-            stopBroker();
-            startBroker(false);
+            restartBroker();
+            assertCount(100, 0);
         }
 
-        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
-
-        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-        assertEquals(100, subscriptions.length);
-
         for (int i = 0; i < 100; i++) {
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             session.unsubscribe("SubsId" + i);
             session.close();
-        }
 
-        Thread.sleep(2 * 1000);
+            if (i % 20 == 0) {
+                Thread.sleep(1000);
+                assertCount(100 - i - 1, 0);
+            }
+        }
 
-        subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
+        Thread.sleep(1000);
+        assertCount(0, 0);
 
-        subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
+        if (restart) {
+            restartBroker();
+            assertCount(0, 0);
+        }
     }
 
     public void doDirectUnsubscribe(boolean restart) throws Exception {
-        for (int i = 0; i < 100; i++) {
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            session.createDurableSubscriber(topic, "SubsId" + i);
-            session.close();
-        }
+        createSubscriptions();
 
-        Thread.sleep(2 * 1000);
+        Thread.sleep(1000);
+        assertCount(100, 0);
 
         if (restart) {
-            stopBroker();
-            startBroker(false);
+            restartBroker();
+            assertCount(100, 0);
         }
 
         for (int i = 0; i < 100; i++) {
@@ -209,19 +171,78 @@ public class DurableSubscriptionUnsubscr
             context.setBroker(broker.getRegionBroker());
             context.setClientId(getName());
             broker.getRegionBroker().removeSubscription(context, info);
+
+            if (i % 20 == 0) {
+                assertCount(100 - i - 1, 0);
+            }
         }
 
-        Thread.sleep(2 * 1000);
+        assertCount(0, 0);
 
-        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
+        if (restart) {
+            restartBroker();
+            assertCount(0, 0);
+        }
+    }
+
+    private void createSubscriptions() throws Exception {
+        for (int i = 0; i < 100; i++) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId" + i);
+            session.close();
+        }        
+    }
+
+
+    private void assertCount(int all, int active) throws Exception {
+        int inactive = all - active;
 
+        // broker check
+        Destination destination = broker.getDestination(topic);
+        List<Subscription> subs = destination.getConsumers();
+        int cActive = 0, cInactive = 0;
+        for (Subscription sub: subs) {
+            if (sub instanceof DurableTopicSubscription) {
+                DurableTopicSubscription durable = (DurableTopicSubscription) sub;
+                if (durable.isActive())
+                    cActive++;
+                else
+                    cInactive++;
+            }
+        }
+        assertEquals(active, cActive);
+        assertEquals(inactive, cInactive);
+
+        // admin view
+        ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
+        assertEquals(active, subscriptions.length);
         subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
-        assertEquals(0, subscriptions.length);
+        assertEquals(inactive, subscriptions.length);
+
+        // check the strange false MBean
+        if (all == 0)
+            assertEquals(0, countMBean());
+    }
+
+    private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException
{
+        int count = 0;
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        for (int i = 0; i < 100; i++) {
+            String name = "org.apache.activemq:BrokerName=" + getName() + ",Type=Subscription,active=false,name="
+ getName() + "_SubsId" + i;
+            ObjectName sub = new ObjectName(name);
+            try {
+                ObjectInstance oi = mbs.getObjectInstance(sub);
+                count++;
+            }
+            catch (InstanceNotFoundException ignore) {
+                // this should happen
+            }
+        }
+        return count;
     }
 
     private void startBroker(boolean deleteMessages) throws Exception {
-        broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+        broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
         broker.setUseJmx(true);
         broker.setBrokerName(getName());
 
@@ -233,7 +254,11 @@ public class DurableSubscriptionUnsubscr
             broker.setDeleteAllMessagesOnStartup(true);
         }
 
+
+        broker.setKeepDurableSubsActive(true);
+
         broker.start();
+        broker.waitUntilStarted();
 
         connection = createConnection();
     }
@@ -243,11 +268,18 @@ public class DurableSubscriptionUnsubscr
             connection.close();
         connection = null;
 
-        if (broker != null)
+        if (broker != null) {
             broker.stop();
+            broker.waitUntilStopped();
+        }
         broker = null;
     }
 
+    private void restartBroker() throws Exception {
+        stopBroker();
+        startBroker(false);
+    }
+
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
     }
@@ -273,4 +305,4 @@ public class DurableSubscriptionUnsubscr
         rc.start();
         return rc;
     }
-}
+}
\ No newline at end of file



Mime
View raw message