activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r813422 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: BrokerService.java TransportConnection.java ft/MasterConnector.java jmx/BrokerView.java jmx/BrokerViewMBean.java
Date Thu, 10 Sep 2009 13:19:53 GMT
Author: rajdavies
Date: Thu Sep 10 13:19:52 2009
New Revision: 813422

URL: http://svn.apache.org/viewvc?rev=813422&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2071 and 
https://issues.apache.org/activemq/browse/AMQ-2070

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Sep 10 13:19:52 2009
@@ -31,7 +31,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import org.apache.activemq.ActiveMQConnectionMetaData;
@@ -386,7 +385,8 @@
      * @return true if this Broker is a slave to a Master
      */
     public boolean isSlave() {
-        return masterConnector != null && masterConnector.isSlave();
+        return (masterConnector != null && masterConnector.isSlave()) ||
+            (masterConnector != null && masterConnector.isStoppedBeforeStart());
     }
 
     public void masterFailed() {
@@ -420,7 +420,7 @@
     // Service interface
     // -------------------------------------------------------------------------
     public void start() throws Exception {
-        if (!started.compareAndSet(false, true)) {
+        if (stopped.get() || !started.compareAndSet(false, true)) {
             // lets just ignore redundant start() calls
             // as its way too easy to not be completely sure if start() has been
             // called or not with the gazillion of different configuration
@@ -467,8 +467,10 @@
             if (!isSlave()) {
                 startAllConnectors();
             }
-            if (isUseJmx() && masterConnector != null) {
-                registerFTConnectorMBean(masterConnector);
+            if (!stopped.get()) {
+                if (isUseJmx() && masterConnector != null) {
+                    registerFTConnectorMBean(masterConnector);
+                }
             }
             brokerId = broker.getBrokerId();
             LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
@@ -477,7 +479,9 @@
         } catch (Exception e) {
             LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
             try {
-                stop();
+                if (!stopped.get()) {
+                    stop();
+                }
             } catch (Exception ex) {
                 LOG.warn("Failed to stop broker after failure in start ", ex);
             }
@@ -517,6 +521,24 @@
         SelectorParser.clearCache();
         stopped.set(true);
         stoppedLatch.countDown();
+        if (masterConnectorURI == null) {
+            // master start has not finished yet
+            if (slaveStartSignal.getCount() == 1) {
+                started.set(false);
+                slaveStartSignal.countDown();
+            }
+        } else {
+            for (Service service : services) {
+                if (service instanceof MasterConnector) {
+                    MasterConnector mConnector = (MasterConnector) service;
+                    if (!mConnector.isSlave()) {
+                        // means should be slave but not connected to master yet
+                        started.set(false);
+                        mConnector.stopBeforeConnected();
+                    }
+                }
+            }
+        }
         LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
         synchronized (shutdownHooks) {
             for (Runnable hook : shutdownHooks) {
@@ -529,6 +551,77 @@
         }
         stopper.throwFirstException();
     }
+    
+        public boolean checkQueueSize(String queueName) {
+        long count = 0;
+        long queueSize = 0;
+        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
+        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
+            if (entry.getKey().isQueue()) {
+                if (entry.getValue().getName().matches(queueName)) {
+                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
+                    count = queueSize;
+                    if (queueSize > 0) {
+                        LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
+                                + queueSize);
+                    }
+                }
+            }
+        }
+        return count == 0;
+    }
+
+    /**
+     * This method (both connectorName and queueName are using regex to match)
+     * 1. stop the connector (supposed the user input the connector which the
+     * clients connect to) 2. to check whether there is any pending message on
+     * the queues defined by queueName 3. supposedly, after stop the connector,
+     * client should failover to other broker and pending messages should be
+     * forwarded. if no pending messages, the method finally call stop to stop
+     * the broker.
+     * 
+     * @param connectorName
+     * @param queueName
+     * @param timeout
+     * @param pollInterval
+     * @throws Exception
+     */
+    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
+            throws Exception {
+        if (isUseJmx()) {
+            if (connectorName == null || queueName == null || timeout <= 0) {
+                throw new Exception(
+                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
+            }
+            if (pollInterval <= 0) {
+                pollInterval = 30;
+            }
+            LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
+                    + timeout + " pollInterval:" + pollInterval);
+            TransportConnector connector;
+            for (int i = 0; i < transportConnectors.size(); i++) {
+                connector = transportConnectors.get(i);
+                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
+                    connector.stop();
+                }
+            }
+            long start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < timeout * 1000) {
+                // check quesize until it gets zero
+                if (checkQueueSize(queueName)) {
+                    stop();
+                    break;
+                } else {
+                    Thread.sleep(pollInterval * 1000);
+                }
+            }
+            if (stopped.get()) {
+                LOG.info("Successfully stop the broker.");
+            } else {
+                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
+            }
+        }
+    }
 
     /**
      * A helper method to block the caller thread until the broker has been
@@ -1828,24 +1921,26 @@
             if (isWaitForSlave()) {
                 waitForSlave();
             }
-            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
-                NetworkConnector connector = iter.next();
-                connector.setLocalUri(uri);
-                connector.setBrokerName(getBrokerName());
-                connector.setDurableDestinations(durableDestinations);
-                connector.start();
-            }
-            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
-                ProxyConnector connector = iter.next();
-                connector.start();
-            }
-            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
-                JmsConnector connector = iter.next();
-                connector.start();
-            }
-            for (Service service : services) {
-                configureService(service);
-                service.start();
+            if (!stopped.get()) {
+                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
+                    NetworkConnector connector = iter.next();
+                    connector.setLocalUri(uri);
+                    connector.setBrokerName(getBrokerName());
+                    connector.setDurableDestinations(durableDestinations);
+                    connector.start();
+                }
+                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
+                    ProxyConnector connector = iter.next();
+                    connector.start();
+                }
+                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
+                    JmsConnector connector = iter.next();
+                    connector.start();
+                }
+                for (Service service : services) {
+                    configureService(service);
+                    service.start();
+                }
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Sep 10 13:19:52 2009
@@ -218,7 +218,7 @@
 		    	if(brokerInfo.isSlaveBroker()){
 		        	LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e);
 		            try {
-		                broker.stop();
+		                doStop();
 		                bService.stop();
 		        	}catch(Exception ex){
 		                LOG.warn("Failed to stop the master",ex);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Thu Sep 10 13:19:52 2009
@@ -38,6 +38,7 @@
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
@@ -63,6 +64,7 @@
     private Transport remoteBroker;
     private TransportConnector connector;
     private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
     private final IdGenerator idGenerator = new IdGenerator();
     private String userName;
     private String password;
@@ -70,6 +72,8 @@
     private SessionInfo sessionInfo;
     private ProducerInfo producerInfo;
     private final AtomicBoolean masterActive = new AtomicBoolean();
+    private BrokerInfo brokerInfo;
+    private boolean firstConnection=true;
 
     public MasterConnector() {
     }
@@ -95,6 +99,15 @@
         return masterActive.get();
     }
 
+    protected void restartBridge() throws Exception {
+        localBroker.oneway(connectionInfo);
+        remoteBroker.oneway(connectionInfo);
+        localBroker.oneway(sessionInfo);
+        remoteBroker.oneway(sessionInfo);
+        remoteBroker.oneway(producerInfo);
+        remoteBroker.oneway(brokerInfo);
+    }
+    
     public void start() throws Exception {
         if (!started.compareAndSet(false, true)) {
             return;
@@ -130,6 +143,35 @@
                     serviceRemoteException(error);
                 }
             }
+            
+            public void transportResumed() {
+            	try{
+            		if(!firstConnection){
+	            		localBroker = TransportFactory.connect(localURI);
+	            		localBroker.setTransportListener(new DefaultTransportListener() {
+	
+	                        public void onCommand(Object command) {
+	                        }
+	
+	                        public void onException(IOException error) {
+	                            if (started.get()) {
+	                                serviceLocalException(error);
+	                            }
+	                        }
+	                    });
+	            		localBroker.start();
+	            		restartBridge();
+	            		LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
+            		}else{
+            			firstConnection=false;
+            		}
+            	}catch(IOException e){
+            		LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
+            	}catch(Exception e){
+            		LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
+            	}
+            	
+            }
         });
         try {
             localBroker.start();
@@ -138,8 +180,12 @@
             masterActive.set(true);
         } catch (Exception e) {
             masterActive.set(false);
-            LOG.error("Failed to start network bridge: " + e, e);
-        }   
+            if(!stoppedBeforeStart.get()){
+            	LOG.error("Failed to start network bridge: " + e, e);
+            }else{
+            	LOG.info("Slave stopped before connected to the master.");
+            }
+        }    
     }
 
     protected void startBridge() throws Exception {
@@ -149,15 +195,9 @@
         connectionInfo.setUserName(userName);
         connectionInfo.setPassword(password);
         connectionInfo.setBrokerMasterConnector(true);
-        localBroker.oneway(connectionInfo);
-        remoteBroker.oneway(connectionInfo);
         sessionInfo = new SessionInfo(connectionInfo, 1);
-        localBroker.oneway(sessionInfo);
-        remoteBroker.oneway(sessionInfo);
         producerInfo = new ProducerInfo(sessionInfo, 1);
         producerInfo.setResponseRequired(false);
-        remoteBroker.oneway(producerInfo);
-        BrokerInfo brokerInfo = null;
         if (connector != null) {
             brokerInfo = connector.getBrokerInfo();
         } else {
@@ -166,12 +206,12 @@
         brokerInfo.setBrokerName(broker.getBrokerName());
         brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
         brokerInfo.setSlaveBroker(true);
-        remoteBroker.oneway(brokerInfo);
+        restartBridge();
         LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
     }
 
     public void stop() throws Exception {
-        if (!started.compareAndSet(true, false)) {
+        if (!started.compareAndSet(true, false)||!masterActive.get()) {
             return;
         }
         masterActive.set(false);
@@ -192,6 +232,15 @@
             ss.throwFirstException();
         }
     }
+    
+    public void stopBeforeConnected()throws Exception{
+        masterActive.set(false);
+        started.set(false);
+        stoppedBeforeStart.set(true);
+        ServiceStopper ss = new ServiceStopper();
+        ss.stop(localBroker);
+        ss.stop(remoteBroker);
+    }
 
     protected void serviceRemoteException(IOException error) {
         LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
@@ -224,8 +273,12 @@
     }
 
     protected void serviceLocalException(Throwable error) {
-        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
-        ServiceSupport.dispose(this);
+    	if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
+	        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
+	        ServiceSupport.dispose(this);
+    	}else{
+    		LOG.info(error.getMessage());
+    	}
     }
 
     /**
@@ -289,4 +342,9 @@
         broker.masterFailed();
         ServiceSupport.dispose(this);
     }
+
+	public boolean isStoppedBeforeStart() {
+		return stoppedBeforeStart.get();
+	}
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Thu Sep 10 13:19:52 2009
@@ -75,7 +75,13 @@
     public void stop() throws Exception {
         brokerService.stop();
     }
+    
+    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
+            throws Exception {
+        brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
+    }
 
+    
     public long getTotalEnqueueCount() {
         return broker.getDestinationStatistics().getEnqueues().getCount();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Thu Sep 10 13:19:52 2009
@@ -114,7 +114,10 @@
     /**
      * Stop the broker and all it's components.
      */
+    @MBeanInfo("Stop the broker and all its components.")
     void stop() throws Exception;
+    @MBeanInfo("Poll for queues matching queueName are empty before stopping")
+    void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception;
 
     @MBeanInfo("Topics (broadcasted 'queues'); generally system information.")
     ObjectName[] getTopics();



Mime
View raw message