activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1031656 - in /activemq/trunk/activemq-core/src/test: java/org/apache/activemq/ java/org/apache/activemq/usecases/ resources/ resources/org/apache/activemq/usecases/
Date Fri, 05 Nov 2010 16:27:58 GMT
Author: dejanb
Date: Fri Nov  5 16:27:58 2010
New Revision: 1031656

URL: http://svn.apache.org/viewvc?rev=1031656&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3020 - intial tests

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Fri Nov  5 16:27:58 2010
@@ -216,6 +216,18 @@ public class JmsMultipleBrokersTestSuppo
         return null;
     }
 
+    protected MessageConsumer createSyncConsumer(String brokerName, Destination dest) throws
Exception {
+        BrokerItem brokerItem = brokers.get(brokerName);
+        if (brokerItem != null) {
+            Connection con = brokerItem.createConnection();
+            con.start();
+            Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = sess.createConsumer(dest);
+            return consumer;
+        }
+        return null;
+    }
+
     protected MessageConsumer createConsumer(String brokerName, Destination dest) throws
Exception {
         return createConsumer(brokerName, dest, null, null);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
Fri Nov  5 16:27:58 2010
@@ -17,14 +17,19 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Enumeration;
 
 import javax.jms.Destination;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.QueueBrowser;
 
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
 import org.apache.activemq.broker.region.QueueSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -66,7 +71,7 @@ public class BrowseOverNetworkTest exten
                 + msgsB.getMessageCount());
     }
 
-    public void testconsumerInfo() throws Exception {
+    public void testConsumerInfo() throws Exception {
         createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker1.xml"));
         createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker2.xml"));
 
@@ -74,7 +79,7 @@ public class BrowseOverNetworkTest exten
 
         brokers.get("broker1").broker.waitUntilStarted();
 
-        
+
         Destination dest = createDestination("QUEUE.A,QUEUE.B", false);
 
 
@@ -86,17 +91,138 @@ public class BrowseOverNetworkTest exten
 
     }
 
-    protected int browseMessages(String broker, Destination dest) throws Exception {
-        QueueBrowser browser = createBrowser(broker, dest);
+    public class Browser extends Thread {
+
+        String broker;
+        Destination dest;
+        int totalCount;
+        QueueBrowser browser = null;
+        MessageConsumer consumer = null;
+        boolean consume = false;
+
+        public Browser(String broker, Destination dest) {
+            this.broker = broker;
+            this.dest = dest;
+        }
+
+        public void run() {
+            int retries = 0;
+            while (retries++ < 5) {
+                try {
+                    QueueBrowser browser = createBrowser(broker, dest);
+                    int count  = browseMessages(browser, broker);
+                    LOG.info("browser '" + broker + "' browsed " + totalCount);
+                    if (consume) {
+                        if (count != 0) {
+                            MessageConsumer consumer = createSyncConsumer(broker, dest);
+                            totalCount += count;
+                            for (int i = 0; i < count; i++) {
+                                ActiveMQTextMessage message = (ActiveMQTextMessage)consumer.receive(1000);
+                                LOG.info(broker + " consumer: " + message.getText() + " "
+ message.getDestination() +  " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath()));
+                                if (message == null) break;
+                            }
+                        }
+                    } else {
+                        totalCount = count;
+                    }
+
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                    LOG.info("Exception browsing " + e, e);
+                } finally {
+                    try {
+                        if (browser != null) {
+                            browser.close();
+                        }
+                        if (consumer != null) {
+                            consumer.close();
+                        }
+                    } catch (Exception e) {
+                        LOG.info("Exception closing browser " + e, e);
+                    }
+                }
+            }
+        }
+
+        public int getTotalCount() {
+            return totalCount;
+        }
+    }
+
+    protected NetworkConnector bridgeBrokersWithIncludedDestination(String localBrokerName,
String remoteBrokerName, ActiveMQDestination included, ActiveMQDestination excluded) throws
Exception {
+        NetworkConnector nc = bridgeBrokers(localBrokerName, remoteBrokerName, false, 4,
true);
+        nc.addStaticallyIncludedDestination(included);
+        if (excluded != null) {
+            nc.addExcludedDestination(excluded);
+        }
+        nc.setPrefetchSize(1);
+        return nc;
+    }
+
+
+    public void testMultipleBrowsers() throws Exception {
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false&brokerId=BrokerA"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false&brokerId=BrokerB"));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false&brokerId=BrokerC"));
+        createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD?persistent=false&useJmx=false&brokerId=BrokerD"));
+
+        Destination composite = createDestination("TEST.FOO,TEST.BAR", false);
+        Destination dest1 = createDestination("TEST.FOO", false);
+        Destination dest2 = createDestination("TEST.BAR", false);
+
+        bridgeBrokersWithIncludedDestination("BrokerA", "BrokerC", (ActiveMQDestination)composite,
null);
+        bridgeBrokersWithIncludedDestination("BrokerA", "BrokerB", (ActiveMQDestination)composite,
null);
+        bridgeBrokersWithIncludedDestination("BrokerA", "BrokerD", (ActiveMQDestination)composite,
null);
+        bridgeBrokersWithIncludedDestination("BrokerB", "BrokerA", (ActiveMQDestination)composite,
null);
+        bridgeBrokersWithIncludedDestination("BrokerB", "BrokerC", (ActiveMQDestination)composite,
null);
+        bridgeBrokersWithIncludedDestination("BrokerB", "BrokerD", (ActiveMQDestination)composite,
null);
+        bridgeBrokersWithIncludedDestination("BrokerC", "BrokerA", (ActiveMQDestination)dest2,
(ActiveMQDestination)dest1);
+        bridgeBrokersWithIncludedDestination("BrokerC", "BrokerB", (ActiveMQDestination)dest2,
(ActiveMQDestination)dest1);
+        bridgeBrokersWithIncludedDestination("BrokerC", "BrokerD", (ActiveMQDestination)dest2,
(ActiveMQDestination)dest1);
+        bridgeBrokersWithIncludedDestination("BrokerD", "BrokerA", (ActiveMQDestination)dest1,
(ActiveMQDestination)dest2);
+        bridgeBrokersWithIncludedDestination("BrokerD", "BrokerB", (ActiveMQDestination)dest1,
(ActiveMQDestination)dest2);
+        bridgeBrokersWithIncludedDestination("BrokerD", "BrokerC", (ActiveMQDestination)dest1,
(ActiveMQDestination)dest2);
+
+        startAllBrokers();
+
+        brokers.get("BrokerA").broker.waitUntilStarted();
+        brokers.get("BrokerC").broker.waitUntilStarted();
+        brokers.get("BrokerD").broker.waitUntilStarted();
+
+        Browser browser1 = new Browser("BrokerC", composite);
+        browser1.start();
+
+        Browser browser2 = new Browser("BrokerD", composite);
+        browser2.start();
+
+        sendMessages("BrokerA", composite, MESSAGE_COUNT);
+
+        browser1.join();
+        browser2.join();
+
+        assertEquals(MESSAGE_COUNT * 2, browser1.getTotalCount() + browser2.getTotalCount()
);
+
+    }
+
+    protected int browseMessages(QueueBrowser browser, String name) throws Exception {
         Enumeration msgs = browser.getEnumeration();
         int browsedMessage = 0;
         while (msgs.hasMoreElements()) {
             browsedMessage++;
-            msgs.nextElement();
+            ActiveMQTextMessage message = (ActiveMQTextMessage)msgs.nextElement();
+            LOG.info(name + " browsed: " + message.getText() + " " + message.getDestination()
+  " " + message.getMessageId() + " " + Arrays.toString(message.getBrokerPath()));
         }
         return browsedMessage;
     }
 
+
+    protected int browseMessages(String broker, Destination dest) throws Exception {
+        QueueBrowser browser = createBrowser(broker, dest);
+        int browsedMessage = browseMessages(browser, "browser");
+        browser.close();
+        return browsedMessage;
+    }
+
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Fri Nov  5 16:27:58 2010
@@ -21,6 +21,7 @@
 log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
+#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq=TRACE
 #log4j.logger.org.apache.activemq.store.jdbc=TRACE
 #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
(original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
Fri Nov  5 16:27:58 2010
@@ -25,7 +25,7 @@
   
   <!-- Broker1 ?useQueueForAccept=false -->
   <amq:broker brokerName="broker1" id="broker1" useJmx="true" 
-  			  persistent="true" start="false" advisorySupport="true">
+  			  persistent="true" start="false" advisorySupport="true" deleteAllMessagesOnStartup="true">
  
  
    <amq:destinationInterceptors>

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
(original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
Fri Nov  5 16:27:58 2010
@@ -26,7 +26,7 @@
     
   <!-- Broker2 (lonb) -->
   <amq:broker brokerName="broker2" id="broker2" useJmx="true" 
-  				persistent="true" start="false" advisorySupport="true">
+  				persistent="true" start="false" advisorySupport="true" deleteAllMessagesOnStartup="true">
 
     <!--  Network connectors -->
     <amq:networkConnectors>



Mime
View raw message