activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r376861 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Topic.java TopicRegion.java
Date Fri, 10 Feb 2006 21:14:55 GMT
Author: chirino
Date: Fri Feb 10 13:14:53 2006
New Revision: 376861

URL: http://svn.apache.org/viewcvs?rev=376861&view=rev
Log:
Fix for DurableConsumerCloseAndReconnectTest:
 - The eager loading of durable subscriptions had partly broken the test; it is fixed now.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=376861&r1=376860&r2=376861&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java	Fri Feb 10 13:14:53 2006
@@ -149,7 +149,7 @@
 
             boolean persistenceWasOptimized = canOptimizeOutPersistence();
             if (initialActivation) {
-                synchronized(consumers) {                    
+                synchronized(consumers) {           
                     consumers.add(sub);
                     durableSubscriberCounter.incrementAndGet();
                 }
@@ -169,7 +169,7 @@
                         info = null;
                     }
                 }
-                // Do we need to crate the subscription?
+                // Do we need to create the subscription?
                 if (info == null) {
                     store.addSubsciption(clientId, subscriptionName, selector, sub.getConsumerInfo().isRetroactive());
                 }
@@ -222,9 +222,6 @@
         destinationStatistics.getConsumers().decrement();
         synchronized(consumers) {
             consumers.remove(sub);
-            if( sub.getConsumerInfo().isDurable() ) {
-                durableSubscriberCounter.decrementAndGet();
-            }
         }
         sub.remove(context, this);
     }
@@ -265,9 +262,14 @@
         return durableSubscriberCounter.get()==0;
     }
 
+    public void createSubscription(SubscriptionKey key) {
+        durableSubscriberCounter.incrementAndGet();
+    }
+    
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
         if (store != null) {
             store.deleteSubscription(key.clientId, key.subscriptionName);
+            durableSubscriberCounter.decrementAndGet();
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=376861&r1=376860&r2=376861&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java	Fri Feb 10 13:14:53 2006
@@ -59,6 +59,13 @@
 
     public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable {
         if (info.isDurable()) {
+
+            ActiveMQDestination destination = info.getDestination();
+            if( !destination.isPattern() ) {
+                // Make sure the destination is created.
+                lookup(context, destination);
+            }
+
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
             if (sub != null) {
@@ -148,7 +155,7 @@
             SubscriptionInfo[] infos = store.getAllSubscriptions();
             for (int i = 0; i < infos.length; i++) {
                 log.info("Restoring durable subscription: "+infos[i]);
-                createDurableSubscription(infos[i]);
+                createDurableSubscription(topic, infos[i]);
             }            
         }
         
@@ -182,10 +189,12 @@
         }
     }
     
-    public Subscription createDurableSubscription(SubscriptionInfo info) throws JMSException {
+    public Subscription createDurableSubscription(Topic topic, SubscriptionInfo info) throws Throwable {
         SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
+        topic.createSubscription(key);
         DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
         sub = new DurableTopicSubscription(broker,info);
+        sub.add(null, topic);
         durableSubscriptions.put(key, sub);
         return sub;
     }



Mime
View raw message