activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r379767 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/network/ test/resources/org/apache/activemq/network/
Date Wed, 22 Feb 2006 13:28:07 GMT
Author: rajdavies
Date: Wed Feb 22 05:28:04 2006
New Revision: 379767

URL: http://svn.apache.org/viewcvs?rev=379767&view=rev
Log:
updated to test durable subscriptions get propagated

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=379767&r1=379766&r2=379767&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Feb 22 05:28:04 2006
@@ -329,7 +329,7 @@
 
         addShutdownHook();
         if (deleteAllMessagesOnStartup) {
-            getPersistenceAdapter().deleteAllMessages();
+            deleteAllMessages();
         }
 
         if (isUseJmx()) {
@@ -687,6 +687,14 @@
      */
     public void setPlugins(BrokerPlugin[] plugins) {
         this.plugins = plugins;
+    }
+    
+    /**
+     * Delete all messages from the persistent store
+     * @throws IOException
+     */
+    public void deleteAllMessages() throws IOException{
+        getPersistenceAdapter().deleteAllMessages();
     }
 
     // Implementation methods

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=379767&r1=379766&r2=379767&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Wed Feb 22 05:28:04 2006
@@ -1,18 +1,15 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.network;
 
@@ -31,102 +28,115 @@
 import org.springframework.context.support.AbstractApplicationContext;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.Resource;
-
-public class SimpleNetworkTest extends TestCase  {
-    
-    protected static final int MESSAGE_COUNT = 10;
+public class SimpleNetworkTest extends TestCase{
+    protected static final int MESSAGE_COUNT=10;
     protected AbstractApplicationContext context;
     protected Connection localConnection;
     protected Connection remoteConnection;
     protected BrokerService localBroker;
     protected BrokerService remoteBroker;
-    
-   
-
-    protected void setUp() throws Exception {
-        
-        super.setUp();
-        Resource resource = new ClassPathResource("org/apache/activemq/network/localBroker.xml");
-        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
-        factory.afterPropertiesSet();
-        localBroker = factory.getBroker();
-        
-        resource = new ClassPathResource("org/apache/activemq/network/remoteBroker.xml");
-        factory = new BrokerFactoryBean(resource);
-        factory.afterPropertiesSet();
-        remoteBroker = factory.getBroker();
-        
-        localBroker.start();
-        remoteBroker.start();
-        
-        URI localURI = localBroker.getVmConnectorURI();
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
-        localConnection = fac.createConnection();
-        localConnection.start();
-        
-        URI remoteURI = remoteBroker.getVmConnectorURI();
-        fac = new ActiveMQConnectionFactory(remoteURI);
-        remoteConnection = fac.createConnection();
-        remoteConnection.start();
-        
-    }
+    protected Session localSession;
+    protected Session remoteSession;
+    protected ActiveMQTopic included;
+    protected ActiveMQTopic excluded;
+    protected String consumerName="durableSubs";
 
-    
-    protected void tearDown() throws Exception {
-        localConnection.close();
-        remoteConnection.close();
-        localBroker.stop();
-        remoteBroker.stop();
-        super.tearDown();
-    }
-    
-      
     public void testFiltering() throws Exception{
-       ActiveMQTopic included = new ActiveMQTopic("include.test.bar");
-       ActiveMQTopic excluded = new ActiveMQTopic("exclude.test.bar");
-       Session localSession = localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-       Session remoteSession = remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer includedConsumer = remoteSession.createConsumer(included);
-       MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
-       MessageProducer includedProducer = localSession.createProducer(included);
-       MessageProducer excludedProducer = localSession.createProducer(excluded);
-       Thread.sleep(1000);
-       
-       Message test = localSession.createTextMessage("test");
-       includedProducer.send(test);
-       excludedProducer.send(test);
-       
-       assertNull(excludedConsumer.receive(500));
-       assertNotNull(includedConsumer.receive(500));
+        MessageConsumer includedConsumer=remoteSession.createConsumer(included);
+        MessageConsumer excludedConsumer=remoteSession.createConsumer(excluded);
+        MessageProducer includedProducer=localSession.createProducer(included);
+        MessageProducer excludedProducer=localSession.createProducer(excluded);
+        Thread.sleep(1000);
+        Message test=localSession.createTextMessage("test");
+        includedProducer.send(test);
+        excludedProducer.send(test);
+        assertNull(excludedConsumer.receive(500));
+        assertNotNull(includedConsumer.receive(500));
     }
-    
+
     public void testConduitBridge() throws Exception{
-        ActiveMQTopic included = new ActiveMQTopic("include.test.bar");
-       
-        Session localSession = localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        Session remoteSession = remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer1 = remoteSession.createConsumer(included);
-        MessageConsumer consumer2 = remoteSession.createConsumer(included);
-        MessageProducer producer = localSession.createProducer(included);
+        MessageConsumer consumer1=remoteSession.createConsumer(included);
+        MessageConsumer consumer2=remoteSession.createConsumer(included);
+        MessageProducer producer=localSession.createProducer(included);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-       
         Thread.sleep(1000);
-        
-        
-        int count = 10;
-        for (int i = 0; i < count; i++){
-            Message test = localSession.createTextMessage("test-" + i);
+        for(int i=0;i<MESSAGE_COUNT;i++){
+            Message test=localSession.createTextMessage("test-"+i);
             producer.send(test);
             assertNotNull(consumer1.receive(500));
             assertNotNull(consumer2.receive(500));
         }
-        
-        
-        //ensure no more messages received
+        // ensure no more messages received
         assertNull(consumer1.receive(500));
         assertNull(consumer2.receive(500));
-     }
-    
-    
+    }
 
+    public void testDurableStoreAndForward() throws Exception{
+        // create a remote durable consumer
+        MessageConsumer remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName);
+        Thread.sleep(1000);
+        // now close everything down and restart
+        doTearDown();
+        doSetUp();
+        MessageProducer producer=localSession.createProducer(included);
+        for(int i=0;i<MESSAGE_COUNT;i++){
+            Message test=localSession.createTextMessage("test-"+i);
+            producer.send(test);
+        }
+        Thread.sleep(1000);
+        // close everything down and restart
+        doTearDown();
+        doSetUp();
+        remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName);
+        for(int i=0;i<MESSAGE_COUNT;i++){
+            Message test=localSession.createTextMessage("test-"+i);
+            assertNotNull(remoteConsumer.receive(500));
+        }
+    }
+
+    protected void setUp() throws Exception{
+        super.setUp();
+        doSetUp();
+    }
+
+    protected void tearDown() throws Exception{
+        localBroker.deleteAllMessages();
+        remoteBroker.deleteAllMessages();
+        doTearDown();
+        super.tearDown();
+    }
+
+    protected void doTearDown() throws Exception{
+        localConnection.close();
+        remoteConnection.close();
+        localBroker.stop();
+        remoteBroker.stop();
+    }
+
+    protected void doSetUp() throws Exception{
+        Resource resource=new ClassPathResource("org/apache/activemq/network/localBroker.xml");
+        BrokerFactoryBean factory=new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+        localBroker=factory.getBroker();
+        resource=new ClassPathResource("org/apache/activemq/network/remoteBroker.xml");
+        factory=new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+        remoteBroker=factory.getBroker();
+        localBroker.start();
+        remoteBroker.start();
+        URI localURI=localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI);
+        localConnection=fac.createConnection();
+        localConnection.setClientID("local");
+        localConnection.start();
+        URI remoteURI=remoteBroker.getVmConnectorURI();
+        fac=new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection=fac.createConnection();
+        remoteConnection.setClientID("remote");
+        remoteConnection.start();
+        included=new ActiveMQTopic("include.test.bar");
+        excluded=new ActiveMQTopic("exclude.test.bar");
+        localSession=localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        remoteSession=remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml?rev=379767&r1=379766&r2=379767&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml Wed Feb 22 05:28:04 2006
@@ -16,13 +16,16 @@
 -->
 <beans xmlns="http://activemq.org/config/1.0">
 
-<broker brokerName="localBroker" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true">
+<broker brokerName="localBroker" persistent="true" useShutdownHook="false">
     <transportConnectors>
       <transportConnector uri="tcp://localhost:61616"/>
     </transportConnectors>
 
     <networkConnectors>
       <networkConnector uri="static://(tcp://localhost:61617)">
+         dynamicOnly = false
+         conduitSubscriptions = true
+         decreaseNetworkConsumerPriority = false
       	<excludedDestinations>
       		<queue physicalName="exclude.test.foo"/>
       		<topic physicalName="exclude.test.bar"/>

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml?rev=379767&r1=379766&r2=379767&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml Wed Feb 22 05:28:04 2006
@@ -16,7 +16,7 @@
 -->
 <beans xmlns="http://activemq.org/config/1.0">
 
-  <broker brokerName="remoteBroker" persistent="false" useShutdownHook="false" deleteAllMessagesOnStartup="true">
+  <broker brokerName="remoteBroker" persistent="true" useShutdownHook="false">
     <transportConnectors>
       <transportConnector uri="tcp://localhost:61617"/>
     </transportConnectors>



Mime
View raw message