activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r955505 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerStompTemporaryQueueTest.java
Date Thu, 17 Jun 2010 08:33:37 GMT
Author: dejanb
Date: Thu Jun 17 08:33:37 2010
New Revision: 955505

URL: http://svn.apache.org/viewvc?rev=955505&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2782 - test case

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerStompTemporaryQueueTest.java

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerStompTemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerStompTemporaryQueueTest.java?rev=955505&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerStompTemporaryQueueTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerStompTemporaryQueueTest.java
Thu Jun 17 08:33:37 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class ThreeBrokerStompTemporaryQueueTest extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(ThreeBrokerStompTemporaryQueueTest.class);
+    private StompConnection stompConnection;
+
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker,
boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        URI remoteURI;
+        if (!transportConnectors.isEmpty()) {
+            remoteURI = transportConnectors.get(0).getConnectUri();
+            NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:"
+ remoteURI));
+            connector.setName(localBroker.getBrokerName() + remoteBroker.getBrokerName());
+            localBroker.addNetworkConnector(connector);
+            maxSetupTime = 2000;
+            return connector;
+        } else {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+    }
+
+    public void testStompTemporaryQueue() throws Exception {
+        // Setup broker networks
+        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
+        bridgeAndConfigureBrokers("BrokerA", "BrokerC");
+        bridgeAndConfigureBrokers("BrokerB", "BrokerA");
+        bridgeAndConfigureBrokers("BrokerB", "BrokerC");
+        bridgeAndConfigureBrokers("BrokerC", "BrokerA");
+        bridgeAndConfigureBrokers("BrokerC", "BrokerB");
+
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        Thread.sleep(1000);
+
+        stompConnection = new StompConnection();
+        stompConnection.open("localhost", 61614);
+        // Creating a temp queue
+        stompConnection.sendFrame("CONNECT\n" + "login: system\n" + "passcode: manager\n\n"
+ Stomp.NULL);
+
+        StompFrame frame = stompConnection.receive();
+        assertTrue(frame.toString().startsWith("CONNECTED"));
+
+        stompConnection.subscribe("/temp-queue/meaningless", "auto");
+        stompConnection.send("/temp-queue/meaningless", "Hello World");
+
+        frame = stompConnection.receive(3000);
+        assertEquals("Hello World", frame.getBody());
+
+        Thread.sleep(1000);
+
+        assertEquals("Destination", 1, brokers.get("BrokerA").broker.getAdminView().getTemporaryQueues().length);
+        assertEquals("Destination", 1, brokers.get("BrokerB").broker.getAdminView().getTemporaryQueues().length);
+        assertEquals("Destination", 1, brokers.get("BrokerC").broker.getAdminView().getTemporaryQueues().length);
+
+        int advisoryTopicsForTempQueues;
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerA", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should be present", 1, advisoryTopicsForTempQueues);
+
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerB", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should be present", 1, advisoryTopicsForTempQueues);
+
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerC", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should be present", 1, advisoryTopicsForTempQueues);
+
+        stompConnection.disconnect();
+        
+        Thread.sleep(1000);
+        
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerA", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should have been deleted", 0, advisoryTopicsForTempQueues);
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerB", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should have been deleted", 0, advisoryTopicsForTempQueues);
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerC", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should have been deleted", 0, advisoryTopicsForTempQueues);
 
+        
+
+        LOG.info("Restarting brokerA");
+        BrokerItem brokerItem = brokers.remove("BrokerA");
+        if (brokerItem != null) {
+            brokerItem.destroy();
+        }
+        
+        BrokerService restartedBroker = createAndConfigureBroker(new URI("broker:(tcp://localhost:61616,stomp://localhost:61613)/BrokerA"));
+        bridgeAndConfigureBrokers("BrokerA", "BrokerB");
+        bridgeAndConfigureBrokers("BrokerA", "BrokerC");
+        restartedBroker.start();
+        waitForBridgeFormation();
+        
+        Thread.sleep(3000);
+
+        assertEquals("Destination", 0, brokers.get("BrokerA").broker.getAdminView().getTemporaryQueues().length);
+        assertEquals("Destination", 0, brokers.get("BrokerB").broker.getAdminView().getTemporaryQueues().length);
+        assertEquals("Destination", 0, brokers.get("BrokerC").broker.getAdminView().getTemporaryQueues().length);
+
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerA", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should have been deleted", 0, advisoryTopicsForTempQueues);
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerB", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should have been deleted", 0, advisoryTopicsForTempQueues);
+        advisoryTopicsForTempQueues = countTopicsByName("BrokerC", "ActiveMQ.Advisory.Consumer.Queue.ID");
+        assertEquals("Advisory topic should have been deleted", 0, advisoryTopicsForTempQueues);
+
+    }
+
+    private int countTopicsByName(String broker, String name)
+            throws Exception {
+        int advisoryTopicsForTempQueues = 0;
+        for(int i=0; i<brokers.get(broker).broker.getAdminView().getTopics().length; i++){
+            if(brokers.get(broker).broker.getAdminView().getTopics()[i].toString().contains(name)){
+                advisoryTopicsForTempQueues++;
+            }
+        }
+        return advisoryTopicsForTempQueues;
+    }
+
+
+    private void bridgeAndConfigureBrokers(String local, String remote) throws Exception
{
+        NetworkConnector bridge = bridgeBrokers(local, remote);
+    }
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String("?deleteAllMessagesOnStartup=true");
+        createAndConfigureBroker(new URI("broker:(tcp://localhost:61616,stomp://localhost:61613)/BrokerA"
+ options));
+        createAndConfigureBroker(new URI("broker:(tcp://localhost:61617,stomp://localhost:61614)/BrokerB"
+ options));
+        createAndConfigureBroker(new URI("broker:(tcp://localhost:61618,stomp://localhost:61615)/BrokerC"
+ options));
+
+    }
+
+    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
+        BrokerService broker = createBroker(uri);
+
+        configurePersistenceAdapter(broker);
+
+        return broker;
+    }
+
+    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+        File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        broker.setPersistenceAdapter(kaha);
+    }
+
+}
\ No newline at end of file



Mime
View raw message