activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r789283 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Topic.java main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Date Mon, 29 Jun 2009 10:41:31 GMT
Author: dejanb
Date: Mon Jun 29 10:41:30 2009
New Revision: 789283

URL: http://svn.apache.org/viewvc?rev=789283&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2303 - durable subscriber recovery

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=789283&r1=789282&r2=789283&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Mon Jun 29 10:41:30 2009
@@ -117,7 +117,6 @@
 
     public void addSubscription(ConnectionContext context, final Subscription sub) throws
Exception {
 
-        sub.add(context, this);
         destinationStatistics.getConsumers().increment();
 
         if (!sub.getConsumerInfo().isDurable()) {
@@ -133,6 +132,7 @@
                 try {
 
                     synchronized (consumers) {
+                        sub.add(context, this);
                         consumers.add(sub);
                     }
                     subscriptionRecoveryPolicy.recover(context, this, sub);
@@ -143,10 +143,12 @@
 
             } else {
                 synchronized (consumers) {
+                    sub.add(context, this);
                     consumers.add(sub);
                 }
             }
         } else {
+            sub.add(context, this);
             DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
             durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
         }
@@ -178,11 +180,7 @@
         // we are recovering a subscription to avoid out of order messages.
         dispatchValve.turnOff();
         try {
-
-            synchronized (consumers) {
-                consumers.add(subscription);
-            }
-
+        	
             if (topicStore == null) {
                 return;
             }
@@ -199,6 +197,10 @@
                     // Need to delete the subscription
                     topicStore.deleteSubscription(clientId, subscriptionName);
                     info = null;
+                } else {
+                    synchronized (consumers) {
+                        consumers.add(subscription);
+                    }
                 }
             }
             // Do we need to create the subscription?
@@ -208,11 +210,15 @@
                 info.setSelector(selector);
                 info.setSubscriptionName(subscriptionName);
                 info.setDestination(getActiveMQDestination()); 
-                // Thi destination is an actual destination id.
+                // This destination is an actual destination id.
                 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());

                 // This destination might be a pattern
-                topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+                synchronized (consumers) {
+                    consumers.add(subscription);
+                    topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+                }
             }
+            
 
             final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
             msgContext.setDestination(destination);
@@ -244,7 +250,6 @@
                     }
                 });
             }
-
         } finally {
             dispatchValve.turnOn();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=789283&r1=789282&r2=789283&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Mon Jun 29 10:41:30 2009
@@ -565,7 +565,6 @@
      */
     private void recover() throws IllegalStateException, IOException {
         referenceStoreAdapter.clearMessages();
-        referenceStoreAdapter.recoverState();
         Location pos = null;
         int redoCounter = 0;
         LOG.info("Journal Recovery Started from: " + asyncDataManager);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=789283&r1=789282&r2=789283&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Mon Jun 29 10:41:30 2009
@@ -136,11 +136,11 @@
 
             topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
             try {
-            topic = new ActiveMQTopic(TOPIC_NAME);
-            topicConnection = topicConnectionFactory.createTopicConnection();
-            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            topicPublisher = topicSession.createPublisher(topic);
-            message = topicSession.createMessage();
+            	topic = new ActiveMQTopic(TOPIC_NAME);
+            	topicConnection = topicConnectionFactory.createTopicConnection();
+            	topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            	topicPublisher = topicSession.createPublisher(topic);
+            	message = topicSession.createMessage();
             } catch( Exception ex ) {
             	exceptions.add(ex);
             }
@@ -174,13 +174,12 @@
             } );
             thread.start();
             
-            LOG.info( "subscribed " + i + " of 100" );
         }
 
         Thread.sleep(5000);
         broker.stop();
         broker = createBroker(false);
-        Thread.sleep(5000);
+        Thread.sleep(10000);
         assertEquals(0, exceptions.size());
     }
   



Mime
View raw message