Author: dejanb Date: Mon May 11 14:15:58 2009 New Revision: 773569 URL: http://svn.apache.org/viewvc?rev=773569&view=rev Log: fix for https://issues.apache.org/activemq/browse/AMQ-2245 - broker restart Modified: activemq/trunk/activemq-core/ (props changed) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java Propchange: activemq/trunk/activemq-core/ ------------------------------------------------------------------------------ --- svn:ignore (original) +++ svn:ignore Mon May 11 14:15:58 2009 @@ -1,4 +1,3 @@ - target foo activemq-data @@ -16,3 +15,6 @@ derbydb testJdbcConfig amqstore +broker1 +kahadir +data Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon May 11 14:15:58 2009 @@ -31,12 +31,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; + import org.apache.activemq.ActiveMQConnectionMetaData; import org.apache.activemq.Service; -import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.broker.cluster.ConnectionSplitBroker; import org.apache.activemq.broker.ft.MasterConnector; @@ -72,6 +73,7 @@ import org.apache.activemq.proxy.ProxyConnector; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.SecurityContext; +import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapterFactory; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; @@ -178,6 +180,8 @@ private int systemExitOnShutdownExitCode; private SslContext sslContext; + private boolean forceStart = false; + static { String localHostName = "localhost"; try { @@ -418,6 +422,11 @@ return started.get(); } + public void start(boolean force) throws Exception { + forceStart = force; + start(); + } + // Service interface // ------------------------------------------------------------------------- public void start() throws Exception { @@ -456,12 +465,22 @@ startDestinations(); addShutdownHook(); - + + getBroker().start(); + if (isUseJmx()) { - getManagementContext().start(); + getManagementContext().start(); + ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker; + managedBroker.setContextBroker(broker); + adminView = new BrokerView(this, managedBroker); + MBeanServer mbeanServer = getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + ObjectName objectName = getBrokerObjectName(); + mbeanServer.registerMBean(adminView, objectName); + registeredMBeanNames.add(objectName); + } } - getBroker().start(); BrokerRegistry.getInstance().bind(getBrokerName(), this); // see if there is a MasterBroker service and if so, configure @@ -532,6 +551,7 @@ } } } + registeredMBeanNames.clear(); stopper.stop(getManagementContext()); } // Clear SelectorParser cache to free memory @@ -1585,30 +1605,27 @@ // Add a filter that will stop access to the broker once stopped broker = new MutableBrokerFilter(broker) { - public void stop() throws Exception { - Broker old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { + Broker old; + + public void stop() throws Exception { + old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { // Just ignore additional stop actions. public void stop() throws Exception { } + }); old.stop(); } + + public void start() throws Exception { + if (forceStart && old != null) { + this.next.set(old); + } + getNext().start(); + } + }; -// RegionBroker rBroker = (RegionBroker)regionBroker; - - if (isUseJmx()) { - ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker; - managedBroker.setContextBroker(broker); - adminView = new BrokerView(this, managedBroker); - MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - if (mbeanServer != null) { - ObjectName objectName = getBrokerObjectName(); - mbeanServer.registerMBean(adminView, objectName); - registeredMBeanNames.add(objectName); - } - } - return broker; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Mon May 11 14:15:58 2009 @@ -20,6 +20,7 @@ import java.lang.reflect.Method; import java.net.MalformedURLException; import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +63,7 @@ private AtomicBoolean started = new AtomicBoolean(false); private JMXConnectorServer connectorServer; private ObjectName namingServiceObjectName; + private Registry registry; public ManagementContext() { this(null); @@ -121,6 +123,7 @@ MBeanServerFactory.releaseMBeanServer(beanServer); } } + beanServer = null; } } @@ -361,7 +364,9 @@ private void createConnector(MBeanServer mbeanServer) throws MalformedObjectNameException, MalformedURLException, IOException { // Create the NamingService, needed by JSR 160 try { - LocateRegistry.createRegistry(connectorPort); + if (registry == null) { + registry = LocateRegistry.createRegistry(connectorPort); + } namingServiceObjectName = ObjectName.getInstance("naming:type=rmiregistry"); // Do not use the createMBean as the mx4j jar may not be in the // same class loader than the server Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon May 11 14:15:58 2009 @@ -184,6 +184,11 @@ ServiceStopper ss = new ServiceStopper(); doStop(ss); ss.throwFirstException(); + // clear the state + clientIdSet.clear(); + connections.clear(); + destinations.clear(); + brokerInfos.clear(); } public PolicyMap getDestinationPolicy() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon May 11 14:15:58 2009 @@ -90,7 +90,7 @@ protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF; protected DataFileAppender appender; - protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); + protected DataFileAccessorPool accessorPool; protected Map fileMap = new HashMap(); protected Map fileByFileMap = new LinkedHashMap(); @@ -120,6 +120,7 @@ preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF); lock(); + accessorPool = new DataFileAccessorPool(this); ByteSequence sequence = controlFile.load(); if (sequence != null && sequence.getLength() > 0) { unmarshallState(sequence); @@ -197,7 +198,7 @@ public void lock() throws IOException { synchronized (this) { - if (controlFile == null) { + if (controlFile == null || controlFile.isDisposed()) { IOHelper.mkdirs(directory); controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java Mon May 11 14:15:58 2009 @@ -179,4 +179,8 @@ } } + public boolean isDisposed() { + return disposed; + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon May 11 14:15:58 2009 @@ -203,7 +203,7 @@ asyncDataManager.lock(); break; } catch (IOException e) { - LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked."); + LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e); try { Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); } catch (InterruptedException e1) { @@ -325,6 +325,7 @@ topics.clear(); IOException firstException = null; referenceStoreAdapter.stop(); + referenceStoreAdapter = null; try { LOG.debug("Journal close"); asyncDataManager.close(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java?rev=773569&r1=773568&r2=773569&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java Mon May 11 14:15:58 2009 @@ -19,8 +19,16 @@ import java.net.URI; import java.util.List; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; @@ -52,6 +60,39 @@ assertEquals(new ActiveMQTopic("include.test.bar"), dynamicallyIncludedDestinations.get(1)); } + + public void testBrokerRestartFails() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + + try { + brokerService.start(); + } catch (Exception e) { + return; + } + fail("Error broker should have prevented us from starting it again"); + } + + public void testForceBrokerRestart() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + + brokerService.start(true); // force restart + brokerService.waitUntilStarted(); + + //send and receive a message from a restarted broker + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636"); + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + Destination dest = new ActiveMQQueue("test"); + MessageProducer producer = sess.createProducer(dest); + MessageConsumer consumer = sess.createConsumer(dest); + producer.send(sess.createTextMessage("test")); + TextMessage msg = (TextMessage)consumer.receive(1000); + assertEquals("test", msg.getText()); + } + protected void setUp() throws Exception { brokerService = createBroker();