activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r512764 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/BrokerService.java test/java/org/apache/activemq/broker/Main.java
Date Wed, 28 Feb 2007 14:21:29 GMT
Author: jstrachan
Date: Wed Feb 28 06:21:29 2007
New Revision: 512764

URL: http://svn.apache.org/viewvc?view=rev&rev=512764
Log:
added a little helper method to make it easy to wait on a broker being shut down in Java code

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=512764&r1=512763&r2=512764
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Feb 28 06:21:29 2007
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -145,6 +146,7 @@
     private URI vmConnectorURI;
     private PolicyMap destinationPolicy;
     private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicBoolean stopped = new AtomicBoolean(false);
     private BrokerPlugin[] plugins;
     private boolean keepDurableSubsActive=true;
     private boolean useVirtualTopics=true;
@@ -154,9 +156,8 @@
     private Store tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
     private boolean useLocalHostBrokerName = false;
-    
+    private CountDownLatch stoppedLatch = new CountDownLatch(1);
 
-   
     /**
      * Adds a new transport connector for the given bind address
      *
@@ -471,33 +472,27 @@
         // to avoid timimg issue with discovery (spinning up a new instance)
         BrokerRegistry.getInstance().unbind(getBrokerName());
         VMTransportFactory.stopped(getBrokerName());
+        stopped.set(true);
+        stoppedLatch.countDown();
+
         log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped");
         stopper.throwFirstException();
     }
 
-	protected void stopAllConnectors(ServiceStopper stopper) {
-		
-		for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
-            NetworkConnector connector = (NetworkConnector) iter.next();
-            unregisterNetworkConnectorMBean(connector);
-            stopper.stop(connector);
+    /**
+     * A helper method to block the caller thread until the broker has been stopped
+     */
+    public void waitUntilStopped() {
+        while (!stopped.get()) {
+            try {
+                stoppedLatch.await();
+            }
+            catch (InterruptedException e) {
+                // ignore
+            }
         }
+    }
 
-        for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
-            ProxyConnector connector = (ProxyConnector) iter.next();
-            stopper.stop(connector);
-        }
-        
-        for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
-            JmsConnector connector = (JmsConnector) iter.next();
-            stopper.stop(connector);
-        }
-        
-        for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
-            TransportConnector connector = (TransportConnector) iter.next();
-            stopper.stop(connector);
-        }
-	}
 
     // Properties
     // -------------------------------------------------------------------------
@@ -1122,6 +1117,30 @@
             }
         }
     }
+
+    protected void stopAllConnectors(ServiceStopper stopper) {
+
+		for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
+            NetworkConnector connector = (NetworkConnector) iter.next();
+            unregisterNetworkConnectorMBean(connector);
+            stopper.stop(connector);
+        }
+
+        for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
+            ProxyConnector connector = (ProxyConnector) iter.next();
+            stopper.stop(connector);
+        }
+
+        for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
+            JmsConnector connector = (JmsConnector) iter.next();
+            stopper.stop(connector);
+        }
+
+        for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
+            TransportConnector connector = (TransportConnector) iter.next();
+            stopper.stop(connector);
+        }
+	}
 
     protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException  {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java?view=diff&rev=512764&r1=512763&r2=512764
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java Wed Feb 28 06:21:29 2007
@@ -18,12 +18,7 @@
 package org.apache.activemq.broker;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
-import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
-import org.apache.activemq.broker.view.DestinationDotFilePlugin;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.demo.DefaultQueueSender;
 
@@ -34,10 +29,11 @@
 /**
  * A helper class which can be handy for running a broker in your IDE from the
  * activemq-core module.
- * 
+ *
  * @version $Revision$
  */
 public class Main {
+    protected static boolean createConsumers = false;
 
     /**
      * @param args
@@ -66,23 +62,29 @@
             broker.addConnector("stomp://localhost:61613");
             broker.start();
 
-            // lets create a dummy couple of consumers
-            Connection connection = new ActiveMQConnectionFactory().createConnection();
-            connection.start();
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
-            MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
-            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
 
             // lets publish some messages so that there is some stuff to browse
-            DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" });
-            DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" });
+            DefaultQueueSender.main(new String[]{"Prices.Equity.IBM"});
+            DefaultQueueSender.main(new String[]{"Prices.Equity.MSFT"});
+
+            // lets create a dummy couple of consumers
+            if (createConsumers) {
+                Connection connection = new ActiveMQConnectionFactory().createConnection();
+                connection.start();
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
+                MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
+                Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
+            }
+            else {
+                // Lets wait for the broker
+                broker.waitUntilStopped();
+            }
         }
         catch (Exception e) {
             System.out.println("Failed: " + e);
             e.printStackTrace();
         }
     }
-
 }



Mime
View raw message