activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1399302 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Topic.java test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Date Wed, 17 Oct 2012 15:16:01 GMT
Author: dejanb
Date: Wed Oct 17 15:16:01 2012
New Revision: 1399302

URL: http://svn.apache.org/viewvc?rev=1399302&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4108 - master broker advisory topic needs to be
retroactive

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1399302&r1=1399301&r2=1399302&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Oct 17 15:16:01 2012
@@ -27,10 +27,12 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
+import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
@@ -84,7 +86,12 @@ public class Topic extends BaseDestinati
         super(brokerService, store, destination, parentStats);
         this.topicStore = store;
         // set default subscription recovery policy
-        subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
+            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
+            setAlwaysRetroactive(true);
+        } else {
+            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+        }
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=1399302&r1=1399301&r2=1399302&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Wed Oct 17 15:16:01 2012
@@ -21,11 +21,13 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -132,4 +134,14 @@ public class QueueMasterSlaveTest extend
         assertNotNull("Get message after failover", message);
         assertEquals("correct message", text, ((TextMessage)message).getText());
     }
+
+    public void testAdvisory() throws Exception {
+        MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
+
+        master.stop();
+        assertTrue("slave started", slaveStarted.await(15, TimeUnit.SECONDS));
+        Message advisoryMessage = advConsumer.receive(5000);
+        assertNotNull("Didn't received advisory", advisoryMessage);
+
+    }
 }



Mime
View raw message