activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6086 - add some determinism to interleaved stop and start calls on broker service
Date Fri, 11 Dec 2015 17:33:41 GMT
https://issues.apache.org/jira/browse/AMQ-6086 - add some determinism to interleaved stop and
start calls on broker service

(cherry picked from commit da076f4a632af6ad1d66382523f4c50e9de9e62e)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e5b86116
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e5b86116
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e5b86116

Branch: refs/heads/activemq-5.13.x
Commit: e5b86116c4683f2f7149cddc50d2d7aa1909efb8
Parents: aa2a85a
Author: gtully <gary.tully@gmail.com>
Authored: Fri Dec 11 16:22:31 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Dec 11 17:32:06 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  34 ++-
 .../java/org/apache/activemq/util/LockFile.java |   2 +-
 .../network/DuplexNetworkMBeanTest.java         |   7 +-
 .../StartAndConcurrentStopBrokerTest.java       | 302 +++++++++++++++++++
 4 files changed, 333 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e5b86116/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index a899490..62af182 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -587,6 +587,7 @@ public class BrokerService implements Service {
             return;
         }
 
+        setStartException(null);
         stopping.set(false);
         startDate = new Date();
         MDC.put("activemq.broker", brokerName);
@@ -642,7 +643,7 @@ public class BrokerService implements Service {
                     try {
                         doStartPersistenceAdapter();
                     } catch (Throwable e) {
-                        startException = e;
+                        setStartException(e);
                     } finally {
                         synchronized (persistenceAdapterStarted) {
                             persistenceAdapterStarted.set(true);
@@ -704,7 +705,7 @@ public class BrokerService implements Service {
                         }
                         doStartBroker();
                     } catch (Throwable t) {
-                        startException = t;
+                        setStartException(t);
                     }
                 }
             }.start();
@@ -714,9 +715,7 @@ public class BrokerService implements Service {
     }
 
     private void doStartBroker() throws Exception {
-        if (startException != null) {
-            return;
-        }
+        checkStartException();
         startDestinations();
         addShutdownHook();
 
@@ -786,6 +785,9 @@ public class BrokerService implements Service {
             return;
         }
 
+        if (started.get()) {
+            setStartException(new BrokerStoppedException("Stop invoked"));
+        }
         MDC.put("activemq.broker", brokerName);
 
         if (systemExitOnShutdown) {
@@ -831,7 +833,7 @@ public class BrokerService implements Service {
             tempDataStore = null;
         }
         try {
-            stopper.stop(persistenceAdapter);
+            stopper.stop(getPersistenceAdapter());
             persistenceAdapter = null;
             if (isUseJmx()) {
                 stopper.stop(getManagementContext());
@@ -989,7 +991,7 @@ public class BrokerService implements Service {
         long expiration = Math.max(0, timeout + System.currentTimeMillis());
         while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
             try {
-                if (startException != null) {
+                if (getStartException() != null) {
                     return waitSucceeded;
                 }
                 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
@@ -1006,6 +1008,7 @@ public class BrokerService implements Service {
      */
     public Broker getBroker() throws Exception {
         if (broker == null) {
+            checkStartException();
             broker = createBroker();
         }
         return broker;
@@ -1225,8 +1228,9 @@ public class BrokerService implements Service {
         addService(this.producerSystemUsage);
     }
 
-    public PersistenceAdapter getPersistenceAdapter() throws IOException {
+    public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
         if (persistenceAdapter == null) {
+            checkStartException();
             persistenceAdapter = createPersistenceAdapter();
             configureService(persistenceAdapter);
             this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
@@ -1314,11 +1318,22 @@ public class BrokerService implements Service {
 
     public ManagementContext getManagementContext() {
         if (managementContext == null) {
+            checkStartException();
             managementContext = new ManagementContext();
         }
         return managementContext;
     }
 
+    synchronized private void checkStartException() {
+        if (startException != null) {
+            throw new BrokerStoppedException(startException);
+        }
+    }
+
+    synchronized private void setStartException(Throwable t) {
+        startException = t;
+    }
+
     public void setManagementContext(ManagementContext managementContext) {
         this.managementContext = managementContext;
     }
@@ -2688,6 +2703,7 @@ public class BrokerService implements Service {
     }
 
     protected void startVirtualConsumerDestinations() throws Exception {
+        checkStartException();
         ConnectionContext adminConnectionContext = getAdminConnectionContext();
         Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
         DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
@@ -3063,7 +3079,7 @@ public class BrokerService implements Service {
                getVirtualTopicConsumerDestinationFilter().matches(destination);
     }
 
-    public Throwable getStartException() {
+    synchronized public Throwable getStartException() {
         return startException;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e5b86116/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java b/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
index b454c41..2f89bc5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
@@ -111,7 +111,7 @@ public class LockFile {
 
     /**
      */
-    public void unlock() {
+    synchronized public void unlock() {
         if (DISABLE_FILE_LOCK) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e5b86116/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index 13a94cb..bac271f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -23,10 +23,12 @@ import java.net.MalformedURLException;
 import java.util.List;
 import java.util.Set;
 
+import javax.management.MBeanServer;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.util.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +42,7 @@ public class DuplexNetworkMBeanTest {
 
     private int primaryBrokerPort;
     private int secondaryBrokerPort;
+    private MBeanServer mBeanServer = new ManagementContext().getMBeanServer();
 
     @Before
     public void setUp() throws Exception {
@@ -155,7 +158,7 @@ public class DuplexNetworkMBeanTest {
             }
 
             LOG.info("Query name: " + beanName);
-            mbeans = broker.getManagementContext().queryNames(beanName, null);
+            mbeans = mBeanServer.queryNames(beanName, null);
             if (mbeans != null) {
                 count = mbeans.size();
             } else {
@@ -175,7 +178,7 @@ public class DuplexNetworkMBeanTest {
     private void logAllMbeans(BrokerService broker) throws MalformedURLException {
         try {
             // trace all existing MBeans
-            Set<?> all = broker.getManagementContext().queryNames(null, null);
+            Set<?> all = mBeanServer.queryNames(null, null);
             LOG.info("Total MBean count=" + all.size());
             for (Object o : all) {
                 ObjectInstance bean = (ObjectInstance)o;

http://git-wip-us.apache.org/repos/asf/activemq/blob/e5b86116/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
new file mode 100755
index 0000000..b2ad1cd
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.ObjectInputStream;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.NotCompliantMBeanException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.OperationsException;
+import javax.management.QueryExp;
+import javax.management.ReflectionException;
+import javax.management.loading.ClassLoaderRepository;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerStoppedException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StartAndConcurrentStopBrokerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StartAndConcurrentStopBrokerTest.class);
+
+
+    @Test(timeout = 30000)
+    public void testConcurrentStop() throws Exception {
+
+        final CountDownLatch gotBrokerMbean = new CountDownLatch(1);
+        final HashMap mbeans = new HashMap();
+        final MBeanServer mBeanServer = new MBeanServer() {
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName name, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance registerMBean(Object object, ObjectName name) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+                if (mbeans.containsKey(name)) {
+                    throw new InstanceAlreadyExistsException("Got one already");
+                }
+                LOG.info("register:" + name);
+
+                try {
+                    if (name.compareTo(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) {
+                        gotBrokerMbean.countDown();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                mbeans.put(name, object);
+                return new ObjectInstance(name, object.getClass().getName());
+
+            }
+
+            @Override
+            public void unregisterMBean(ObjectName name) throws InstanceNotFoundException, MBeanRegistrationException {
+                mbeans.remove(name);
+            }
+
+            @Override
+            public ObjectInstance getObjectInstance(ObjectName name) throws InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public Set<ObjectInstance> queryMBeans(ObjectName name, QueryExp query) {
+                return null;
+            }
+
+            @Override
+            public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {
+                return null;
+            }
+
+            @Override
+            public boolean isRegistered(ObjectName name) {
+                return mbeans.containsKey(name);
+            }
+
+            @Override
+            public Integer getMBeanCount() {
+                return null;
+            }
+
+            @Override
+            public Object getAttribute(ObjectName name, String attribute) throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public AttributeList getAttributes(ObjectName name, String[] attributes) throws InstanceNotFoundException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public void setAttribute(ObjectName name, Attribute attribute) throws InstanceNotFoundException, AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
+
+            }
+
+            @Override
+            public AttributeList setAttributes(ObjectName name, AttributeList attributes) throws InstanceNotFoundException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public Object invoke(ObjectName name, String operationName, Object[] params, String[] signature) throws InstanceNotFoundException, MBeanException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public String getDefaultDomain() {
+                return null;
+            }
+
+            @Override
+            public String[] getDomains() {
+                return new String[0];
+            }
+
+            @Override
+            public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException {
+
+            }
+
+            @Override
+            public void addNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, ObjectName listener) throws InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, NotificationListener listener) throws InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback) throws InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public MBeanInfo getMBeanInfo(ObjectName name) throws InstanceNotFoundException, IntrospectionException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public boolean isInstanceOf(ObjectName name, String className) throws InstanceNotFoundException {
+                return false;
+            }
+
+            @Override
+            public Object instantiate(String className) throws ReflectionException, MBeanException {
+                return null;
+            }
+
+            @Override
+            public Object instantiate(String className, ObjectName loaderName) throws ReflectionException, MBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public Object instantiate(String className, Object[] params, String[] signature) throws ReflectionException, MBeanException {
+                return null;
+            }
+
+            @Override
+            public Object instantiate(String className, ObjectName loaderName, Object[] params, String[] signature) throws ReflectionException, MBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ObjectInputStream deserialize(ObjectName name, byte[] data) throws InstanceNotFoundException, OperationsException {
+                return null;
+            }
+
+            @Override
+            public ObjectInputStream deserialize(String className, byte[] data) throws OperationsException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public ObjectInputStream deserialize(String className, ObjectName loaderName, byte[] data) throws InstanceNotFoundException, OperationsException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getClassLoaderFor(ObjectName mbeanName) throws InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getClassLoader(ObjectName loaderName) throws InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ClassLoaderRepository getClassLoaderRepository() {
+                return null;
+            }
+        };
+
+
+        final BrokerService broker = new BrokerService();
+
+        ExecutorService executor = Executors.newFixedThreadPool(4);
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    broker.getManagementContext().setMBeanServer(mBeanServer);
+                    broker.start();
+                } catch (BrokerStoppedException expected) {
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    assertTrue("broker has registered mbean", gotBrokerMbean.await(10, TimeUnit.SECONDS));
+                    broker.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        executor.shutdown();
+        assertTrue("stop tasks done", executor.awaitTermination(20, TimeUnit.SECONDS));
+
+        BrokerService second = new BrokerService();
+        second.getManagementContext().setMBeanServer(mBeanServer);
+        second.start();
+        second.stop();
+
+    }
+
+}


Mime
View raw message