activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5963
Date Tue, 15 Sep 2015 14:53:03 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/4cddd2c0/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
new file mode 100644
index 0000000..e27279c
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
@@ -0,0 +1,3147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.Provider;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionMetaData;
+import org.apache.activemq.ConfigurationException;
+import org.apache.activemq.Service;
+import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.ConnectorView;
+import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.HealthView;
+import org.apache.activemq.broker.jmx.HealthViewMBean;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
+import org.apache.activemq.broker.jmx.JobSchedulerView;
+import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
+import org.apache.activemq.broker.jmx.Log4JConfigView;
+import org.apache.activemq.broker.jmx.ManagedRegionBroker;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.broker.jmx.NetworkConnectorView;
+import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
+import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
+import org.apache.activemq.broker.region.DestinationFactoryImpl;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.broker.scheduler.SchedulerBroker;
+import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.network.ConnectionFilter;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.network.jms.JmsConnector;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.proxy.ProxyConnector;
+import org.apache.activemq.security.MessageAuthorizationPolicy;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.PListStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterFactory;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.TransportFactorySupport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.InetAddressUtil;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.StoreUtil;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
+ * number of transport connectors, network connectors and a bunch of properties
+ * which can be used to configure the broker as its lazily created.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class BrokerService implements Service {
+    public static final String DEFAULT_PORT = "61616";
+    public static final String LOCAL_HOST_NAME;
+    public static final String BROKER_VERSION;
+    public static final String DEFAULT_BROKER_NAME = "localhost";
+    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    public static final long DEFAULT_START_TIMEOUT = 600000L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
+
+    @SuppressWarnings("unused")
+    private static final long serialVersionUID = 7353129142305630237L;
+
+    private boolean useJmx = true;
+    private boolean enableStatistics = true;
+    private boolean persistent = true;
+    private boolean populateJMSXUserID;
+    private boolean useAuthenticatedPrincipalForJMSXUserID;
+    private boolean populateUserNameInMBeans;
+    private long mbeanInvocationTimeout = 0;
+
+    private boolean useShutdownHook = true;
+    private boolean useLoggingForShutdownErrors;
+    private boolean shutdownOnMasterFailure;
+    private boolean shutdownOnSlaveFailure;
+    private boolean waitForSlave;
+    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
+    private boolean passiveSlave;
+    private String brokerName = DEFAULT_BROKER_NAME;
+    private File dataDirectoryFile;
+    private File tmpDataDirectory;
+    private Broker broker;
+    private BrokerView adminView;
+    private ManagementContext managementContext;
+    private ObjectName brokerObjectName;
+    private TaskRunnerFactory taskRunnerFactory;
+    private TaskRunnerFactory persistenceTaskRunnerFactory;
+    private SystemUsage systemUsage;
+    private SystemUsage producerSystemUsage;
+    private SystemUsage consumerSystemUsaage;
+    private PersistenceAdapter persistenceAdapter;
+    private PersistenceAdapterFactory persistenceFactory;
+    protected DestinationFactory destinationFactory;
+    private MessageAuthorizationPolicy messageAuthorizationPolicy;
+    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
+    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
+    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
+    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
+    private final List<Service> services = new ArrayList<Service>();
+    private transient Thread shutdownHook;
+    private String[] transportConnectorURIs;
+    private String[] networkConnectorURIs;
+    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
+    // to other jms messaging systems
+    private boolean deleteAllMessagesOnStartup;
+    private boolean advisorySupport = true;
+    private URI vmConnectorURI;
+    private String defaultSocketURIString;
+    private PolicyMap destinationPolicy;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
+    private BrokerPlugin[] plugins;
+    private boolean keepDurableSubsActive = true;
+    private boolean useVirtualTopics = true;
+    private boolean useMirroredQueues = false;
+    private boolean useTempMirroredQueues = true;
+    private BrokerId brokerId;
+    private volatile DestinationInterceptor[] destinationInterceptors;
+    private ActiveMQDestination[] destinations;
+    private PListStore tempDataStore;
+    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
+    private boolean useLocalHostBrokerName;
+    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
+    private final CountDownLatch startedLatch = new CountDownLatch(1);
+    private Broker regionBroker;
+    private int producerSystemUsagePortion = 60;
+    private int consumerSystemUsagePortion = 40;
+    private boolean splitSystemUsageForProducersConsumers;
+    private boolean monitorConnectionSplits = false;
+    private int taskRunnerPriority = Thread.NORM_PRIORITY;
+    private boolean dedicatedTaskRunner;
+    private boolean cacheTempDestinations = false;// useful for failover
+    private int timeBeforePurgeTempDestinations = 5000;
+    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
+    private boolean systemExitOnShutdown;
+    private int systemExitOnShutdownExitCode;
+    private SslContext sslContext;
+    private boolean forceStart = false;
+    private IOExceptionHandler ioExceptionHandler;
+    private boolean schedulerSupport = false;
+    private File schedulerDirectoryFile;
+    private Scheduler scheduler;
+    private ThreadPoolExecutor executor;
+    private int schedulePeriodForDestinationPurge= 0;
+    private int maxPurgedDestinationsPerSweep = 0;
+    private int schedulePeriodForDiskUsageCheck = 0;
+    private boolean diskUsageCheckRegrowPercentChange = true;
+    private BrokerContext brokerContext;
+    private boolean networkConnectorStartAsync = false;
+    private boolean allowTempAutoCreationOnSend;
+    private JobSchedulerStore jobSchedulerStore;
+    private final AtomicLong totalConnections = new AtomicLong();
+    private final AtomicInteger currentConnections = new AtomicInteger();
+
+    private long offlineDurableSubscriberTimeout = -1;
+    private long offlineDurableSubscriberTaskSchedule = 300000;
+    private DestinationFilter virtualConsumerDestinationFilter;
+
+    private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false);
+    private Throwable startException = null;
+    private boolean startAsync = false;
+    private Date startDate;
+    private boolean slave = true;
+
+    private boolean restartAllowed = true;
+    private boolean restartRequested = false;
+    private boolean rejectDurableConsumers = false;
+
+    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
+
+    static {
+
+        try {
+            ClassLoader loader = BrokerService.class.getClassLoader();
+            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
+            Provider bouncycastle = (Provider) clazz.newInstance();
+            Security.insertProviderAt(bouncycastle, 2);
+            LOG.info("Loaded the Bouncy Castle security provider.");
+        } catch(Throwable e) {
+            // No BouncyCastle found so we use the default Java Security Provider
+        }
+
+        String localHostName = "localhost";
+        try {
+            localHostName =  InetAddressUtil.getLocalHostName();
+        } catch (UnknownHostException e) {
+            LOG.error("Failed to resolve localhost");
+        }
+        LOCAL_HOST_NAME = localHostName;
+
+        String version = null;
+        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
+            if (in != null) {
+                try(InputStreamReader isr = new InputStreamReader(in);
+                    BufferedReader reader = new BufferedReader(isr)) {
+                    version = reader.readLine();
+                }
+            }
+        } catch (IOException ie) {
+            LOG.warn("Error reading broker version ", ie);
+        }
+        BROKER_VERSION = version;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerService[" + getBrokerName() + "]";
+    }
+
+    private String getBrokerVersion() {
+        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
+        if (version == null) {
+            version = BROKER_VERSION;
+        }
+
+        return version;
+    }
+
+    /**
+     * Adds a new transport connector for the given bind address
+     *
+     * @return the newly created and added transport connector
+     * @throws Exception
+     */
+    public TransportConnector addConnector(String bindAddress) throws Exception {
+        return addConnector(new URI(bindAddress));
+    }
+
+    /**
+     * Adds a new transport connector for the given bind address
+     *
+     * @return the newly created and added transport connector
+     * @throws Exception
+     */
+    public TransportConnector addConnector(URI bindAddress) throws Exception {
+        return addConnector(createTransportConnector(bindAddress));
+    }
+
+    /**
+     * Adds a new transport connector for the given TransportServer transport
+     *
+     * @return the newly created and added transport connector
+     * @throws Exception
+     */
+    public TransportConnector addConnector(TransportServer transport) throws Exception {
+        return addConnector(new TransportConnector(transport));
+    }
+
+    /**
+     * Adds a new transport connector
+     *
+     * @return the transport connector
+     * @throws Exception
+     */
+    public TransportConnector addConnector(TransportConnector connector) throws Exception {
+        transportConnectors.add(connector);
+        return connector;
+    }
+
+    /**
+     * Stops and removes a transport connector from the broker.
+     *
+     * @param connector
+     * @return true if the connector has been previously added to the broker
+     * @throws Exception
+     */
+    public boolean removeConnector(TransportConnector connector) throws Exception {
+        boolean rc = transportConnectors.remove(connector);
+        if (rc) {
+            unregisterConnectorMBean(connector);
+        }
+        return rc;
+    }
+
+    /**
+     * Adds a new network connector using the given discovery address
+     *
+     * @return the newly created and added network connector
+     * @throws Exception
+     */
+    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
+        return addNetworkConnector(new URI(discoveryAddress));
+    }
+
+    /**
+     * Adds a new proxy connector using the given bind address
+     *
+     * @return the newly created and added network connector
+     * @throws Exception
+     */
+    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
+        return addProxyConnector(new URI(bindAddress));
+    }
+
+    /**
+     * Adds a new network connector using the given discovery address
+     *
+     * @return the newly created and added network connector
+     * @throws Exception
+     */
+    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
+        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
+        return addNetworkConnector(connector);
+    }
+
+    /**
+     * Adds a new proxy connector using the given bind address
+     *
+     * @return the newly created and added network connector
+     * @throws Exception
+     */
+    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
+        ProxyConnector connector = new ProxyConnector();
+        connector.setBind(bindAddress);
+        connector.setRemote(new URI("fanout:multicast://default"));
+        return addProxyConnector(connector);
+    }
+
+    /**
+     * Adds a new network connector to connect this broker to a federated
+     * network
+     */
+    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
+        connector.setBrokerService(this);
+        connector.setLocalUri(getVmConnectorURI());
+        // Set a connection filter so that the connector does not establish loop
+        // back connections.
+        connector.setConnectionFilter(new ConnectionFilter() {
+            @Override
+            public boolean connectTo(URI location) {
+                List<TransportConnector> transportConnectors = getTransportConnectors();
+                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
+                    try {
+                        TransportConnector tc = iter.next();
+                        if (location.equals(tc.getConnectUri())) {
+                            return false;
+                        }
+                    } catch (Throwable e) {
+                    }
+                }
+                return true;
+            }
+        });
+        networkConnectors.add(connector);
+        return connector;
+    }
+
+    /**
+     * Removes the given network connector without stopping it. The caller
+     * should call {@link NetworkConnector#stop()} to close the connector
+     */
+    public boolean removeNetworkConnector(NetworkConnector connector) {
+        boolean answer = networkConnectors.remove(connector);
+        if (answer) {
+            unregisterNetworkConnectorMBean(connector);
+        }
+        return answer;
+    }
+
+    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
+        URI uri = getVmConnectorURI();
+        connector.setLocalUri(uri);
+        proxyConnectors.add(connector);
+        if (isUseJmx()) {
+            registerProxyConnectorMBean(connector);
+        }
+        return connector;
+    }
+
+    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
+        connector.setBrokerService(this);
+        jmsConnectors.add(connector);
+        if (isUseJmx()) {
+            registerJmsConnectorMBean(connector);
+        }
+        return connector;
+    }
+
+    public JmsConnector removeJmsConnector(JmsConnector connector) {
+        if (jmsConnectors.remove(connector)) {
+            return connector;
+        }
+        return null;
+    }
+
+    public void masterFailed() {
+        if (shutdownOnMasterFailure) {
+            LOG.error("The Master has failed ... shutting down");
+            try {
+                stop();
+            } catch (Exception e) {
+                LOG.error("Failed to stop for master failure", e);
+            }
+        } else {
+            LOG.warn("Master Failed - starting all connectors");
+            try {
+                startAllConnectors();
+                broker.nowMasterBroker();
+            } catch (Exception e) {
+                LOG.error("Failed to startAllConnectors", e);
+            }
+        }
+    }
+
+    public String getUptime() {
+        long delta = getUptimeMillis();
+
+        if (delta == 0) {
+            return "not started";
+        }
+
+        return TimeUtils.printDuration(delta);
+    }
+
+    public long getUptimeMillis() {
+        if (startDate == null) {
+            return 0;
+        }
+
+        return new Date().getTime() - startDate.getTime();
+    }
+
+    public boolean isStarted() {
+        return started.get() && startedLatch.getCount() == 0;
+    }
+
+    /**
+     * Forces a start of the broker.
+     * By default a BrokerService instance that was
+     * previously stopped using BrokerService.stop() cannot be restarted
+     * using BrokerService.start().
+     * This method enforces a restart.
+     * It is not recommended to force a restart of the broker and will not work
+     * for most but some very trivial broker configurations.
+     * For restarting a broker instance we recommend to first call stop() on
+     * the old instance and then recreate a new BrokerService instance.
+     *
+     * @param force - if true enforces a restart.
+     * @throws Exception
+     */
+    public void start(boolean force) throws Exception {
+        forceStart = force;
+        stopped.set(false);
+        started.set(false);
+        start();
+    }
+
+    // Service interface
+    // -------------------------------------------------------------------------
+
+    protected boolean shouldAutostart() {
+        return true;
+    }
+
+    /**
+     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
+     *
+     * delegates to autoStart, done to prevent backwards incompatible signature change
+     */
+    @PostConstruct
+    private void postConstruct() {
+        try {
+            autoStart();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     *
+     * @throws Exception
+     * @org. apache.xbean.InitMethod
+     */
+    public void autoStart() throws Exception {
+        if(shouldAutostart()) {
+            start();
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        if (stopped.get() || !started.compareAndSet(false, true)) {
+            // lets just ignore redundant start() calls
+            // as its way too easy to not be completely sure if start() has been
+            // called or not with the gazillion of different configuration
+            // mechanisms
+            // throw new IllegalStateException("Already started.");
+            return;
+        }
+
+        stopping.set(false);
+        startDate = new Date();
+        MDC.put("activemq.broker", brokerName);
+
+        try {
+            if (systemExitOnShutdown && useShutdownHook) {
+                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
+            }
+            processHelperProperties();
+            if (isUseJmx()) {
+                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
+                // we cannot cleanup clear that during shutdown of the broker.
+                MDC.remove("activemq.broker");
+                try {
+                    startManagementContext();
+                    for (NetworkConnector connector : getNetworkConnectors()) {
+                        registerNetworkConnectorMBean(connector);
+                    }
+                } finally {
+                    MDC.put("activemq.broker", brokerName);
+                }
+            }
+
+            // in jvm master slave, lets not publish over existing broker till we get the lock
+            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
+            if (brokerRegistry.lookup(getBrokerName()) == null) {
+                brokerRegistry.bind(getBrokerName(), BrokerService.this);
+            }
+            startPersistenceAdapter(startAsync);
+            startBroker(startAsync);
+            brokerRegistry.bind(getBrokerName(), BrokerService.this);
+        } catch (Exception e) {
+            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
+            try {
+                if (!stopped.get()) {
+                    stop();
+                }
+            } catch (Exception ex) {
+                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
+            }
+            throw e;
+        } finally {
+            MDC.remove("activemq.broker");
+        }
+    }
+
+    private void startPersistenceAdapter(boolean async) throws Exception {
+        if (async) {
+            new Thread("Persistence Adapter Starting Thread") {
+                @Override
+                public void run() {
+                    try {
+                        doStartPersistenceAdapter();
+                    } catch (Throwable e) {
+                        startException = e;
+                    } finally {
+                        synchronized (persistenceAdapterStarted) {
+                            persistenceAdapterStarted.set(true);
+                            persistenceAdapterStarted.notifyAll();
+                        }
+                    }
+                }
+            }.start();
+        } else {
+            doStartPersistenceAdapter();
+        }
+    }
+
+    private void doStartPersistenceAdapter() throws Exception {
+        getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
+        getPersistenceAdapter().setBrokerName(getBrokerName());
+        LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter());
+        if (deleteAllMessagesOnStartup) {
+            deleteAllMessages();
+        }
+        getPersistenceAdapter().start();
+
+        getJobSchedulerStore();
+        if (jobSchedulerStore != null) {
+            try {
+                jobSchedulerStore.start();
+            } catch (Exception e) {
+                RuntimeException exception = new RuntimeException(
+                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
+                LOG.error(exception.getLocalizedMessage(), e);
+                throw exception;
+            }
+        }
+    }
+
+    private void startBroker(boolean async) throws Exception {
+        if (async) {
+            new Thread("Broker Starting Thread") {
+                @Override
+                public void run() {
+                    try {
+                        synchronized (persistenceAdapterStarted) {
+                            if (!persistenceAdapterStarted.get()) {
+                                persistenceAdapterStarted.wait();
+                            }
+                        }
+                        doStartBroker();
+                    } catch (Throwable t) {
+                        startException = t;
+                    }
+                }
+            }.start();
+        } else {
+            doStartBroker();
+        }
+    }
+
+    private void doStartBroker() throws Exception {
+        if (startException != null) {
+            return;
+        }
+        startDestinations();
+        addShutdownHook();
+
+        broker = getBroker();
+        brokerId = broker.getBrokerId();
+
+        // need to log this after creating the broker so we have its id and name
+        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
+        broker.start();
+
+        if (isUseJmx()) {
+            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
+                // try to restart management context
+                // typical for slaves that use the same ports as master
+                managementContext.stop();
+                startManagementContext();
+            }
+            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
+            managedBroker.setContextBroker(broker);
+            adminView.setBroker(managedBroker);
+        }
+
+        if (ioExceptionHandler == null) {
+            setIoExceptionHandler(new DefaultIOExceptionHandler());
+        }
+
+        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
+            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
+            Log4JConfigView log4jConfigView = new Log4JConfigView();
+            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
+        }
+
+        startAllConnectors();
+
+        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
+        LOG.info("For help or more information please see: http://activemq.apache.org");
+
+        getBroker().brokerServiceStarted();
+        checkSystemUsageLimits();
+        startedLatch.countDown();
+        getBroker().nowMasterBroker();
+    }
+
+    /**
+     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
+     *
+     * delegates to stop, done to prevent backwards incompatible signature change
+     */
+    @PreDestroy
+    private void preDestroy () {
+        try {
+            stop();
+        } catch (Exception ex) {
+            throw new RuntimeException();
+        }
+    }
+
+    /**
+     *
+     * @throws Exception
+     * @org.apache .xbean.DestroyMethod
+     */
+    @Override
+    public void stop() throws Exception {
+        if (!stopping.compareAndSet(false, true)) {
+            LOG.trace("Broker already stopping/stopped");
+            return;
+        }
+
+        MDC.put("activemq.broker", brokerName);
+
+        if (systemExitOnShutdown) {
+            new Thread() {
+                @Override
+                public void run() {
+                    System.exit(systemExitOnShutdownExitCode);
+                }
+            }.start();
+        }
+
+        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
+
+        removeShutdownHook();
+        if (this.scheduler != null) {
+            this.scheduler.stop();
+            this.scheduler = null;
+        }
+        ServiceStopper stopper = new ServiceStopper();
+        if (services != null) {
+            for (Service service : services) {
+                stopper.stop(service);
+            }
+        }
+        stopAllConnectors(stopper);
+        this.slave = true;
+        // remove any VMTransports connected
+        // this has to be done after services are stopped,
+        // to avoid timing issue with discovery (spinning up a new instance)
+        BrokerRegistry.getInstance().unbind(getBrokerName());
+        VMTransportFactory.stopped(getBrokerName());
+        if (broker != null) {
+            stopper.stop(broker);
+            broker = null;
+        }
+
+        if (jobSchedulerStore != null) {
+            jobSchedulerStore.stop();
+            jobSchedulerStore = null;
+        }
+        if (tempDataStore != null) {
+            tempDataStore.stop();
+            tempDataStore = null;
+        }
+        try {
+            stopper.stop(persistenceAdapter);
+            persistenceAdapter = null;
+            if (isUseJmx()) {
+                stopper.stop(getManagementContext());
+                managementContext = null;
+            }
+            // Clear SelectorParser cache to free memory
+            SelectorParser.clearCache();
+        } finally {
+            started.set(false);
+            stopped.set(true);
+            stoppedLatch.countDown();
+        }
+
+        if (this.taskRunnerFactory != null) {
+            this.taskRunnerFactory.shutdown();
+            this.taskRunnerFactory = null;
+        }
+        if (this.executor != null) {
+            ThreadPoolUtils.shutdownNow(executor);
+            this.executor = null;
+        }
+
+        this.destinationInterceptors = null;
+        this.destinationFactory = null;
+
+        if (startDate != null) {
+            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
+        }
+        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
+
+        synchronized (shutdownHooks) {
+            for (Runnable hook : shutdownHooks) {
+                try {
+                    hook.run();
+                } catch (Throwable e) {
+                    stopper.onException(hook, e);
+                }
+            }
+        }
+
+        MDC.remove("activemq.broker");
+
+        // and clear start date
+        startDate = null;
+
+        stopper.throwFirstException();
+    }
+
+    public boolean checkQueueSize(String queueName) {
+        long count = 0;
+        long queueSize = 0;
+        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
+        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
+            if (entry.getKey().isQueue()) {
+                if (entry.getValue().getName().matches(queueName)) {
+                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
+                    count += queueSize;
+                    if (queueSize > 0) {
+                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
+                    }
+                }
+            }
+        }
+        return count == 0;
+    }
+
+    /**
+     * This method (both connectorName and queueName are using regex to match)
+     * 1. stop the connector (supposed the user input the connector which the
+     * clients connect to) 2. to check whether there is any pending message on
+     * the queues defined by queueName 3. supposedly, after stop the connector,
+     * client should failover to other broker and pending messages should be
+     * forwarded. if no pending messages, the method finally call stop to stop
+     * the broker.
+     *
+     * @param connectorName
+     * @param queueName
+     * @param timeout
+     * @param pollInterval
+     * @throws Exception
+     */
+    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
+        if (isUseJmx()) {
+            if (connectorName == null || queueName == null || timeout <= 0) {
+                throw new Exception(
+                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
+            }
+            if (pollInterval <= 0) {
+                pollInterval = 30;
+            }
+            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
+                    connectorName, queueName, timeout, pollInterval
+            });
+            TransportConnector connector;
+            for (int i = 0; i < transportConnectors.size(); i++) {
+                connector = transportConnectors.get(i);
+                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
+                    connector.stop();
+                }
+            }
+            long start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < timeout * 1000) {
+                // check quesize until it gets zero
+                if (checkQueueSize(queueName)) {
+                    stop();
+                    break;
+                } else {
+                    Thread.sleep(pollInterval * 1000);
+                }
+            }
+            if (stopped.get()) {
+                LOG.info("Successfully stop the broker.");
+            } else {
+                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
+            }
+        }
+    }
+
+    /**
+     * A helper method to block the caller thread until the broker has been
+     * stopped
+     */
+    public void waitUntilStopped() {
+        while (isStarted() && !stopped.get()) {
+            try {
+                stoppedLatch.await();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    public boolean isStopped() {
+        return stopped.get();
+    }
+
+    /**
+     * A helper method to block the caller thread until the broker has fully started
+     * @return boolean true if wait succeeded false if broker was not started or was stopped
+     */
+    public boolean waitUntilStarted() {
+        return waitUntilStarted(DEFAULT_START_TIMEOUT);
+    }
+
+    /**
+     * A helper method to block the caller thread until the broker has fully started
+     *
+     * @param timeout
+     *        the amount of time to wait before giving up and returning false.
+     *
+     * @return boolean true if wait succeeded false if broker was not started or was stopped
+     */
+    public boolean waitUntilStarted(long timeout) {
+        boolean waitSucceeded = isStarted();
+        long expiration = Math.max(0, timeout + System.currentTimeMillis());
+        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
+            try {
+                if (startException != null) {
+                    return waitSucceeded;
+                }
+                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        return waitSucceeded;
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+    /**
+     * Returns the message broker
+     */
+    public Broker getBroker() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        return broker;
+    }
+
+    /**
+     * Returns the administration view of the broker; used to create and destroy
+     * resources such as queues and topics. Note this method returns null if JMX
+     * is disabled.
+     */
+    public BrokerView getAdminView() throws Exception {
+        if (adminView == null) {
+            // force lazy creation
+            getBroker();
+        }
+        return adminView;
+    }
+
+    public void setAdminView(BrokerView adminView) {
+        this.adminView = adminView;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    /**
+     * Sets the name of this broker; which must be unique in the network
+     *
+     * @param brokerName
+     */
+    public void setBrokerName(String brokerName) {
+        if (brokerName == null) {
+            throw new NullPointerException("The broker name cannot be null");
+        }
+        String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
+        if (!str.equals(brokerName)) {
+            LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str);
+        }
+        this.brokerName = str.trim();
+    }
+
+    public PersistenceAdapterFactory getPersistenceFactory() {
+        return persistenceFactory;
+    }
+
+    public File getDataDirectoryFile() {
+        if (dataDirectoryFile == null) {
+            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
+        }
+        return dataDirectoryFile;
+    }
+
+    public File getBrokerDataDirectory() {
+        String brokerDir = getBrokerName();
+        return new File(getDataDirectoryFile(), brokerDir);
+    }
+
+    /**
+     * Sets the directory in which the data files will be stored by default for
+     * the JDBC and Journal persistence adaptors.
+     *
+     * @param dataDirectory
+     *            the directory to store data files
+     */
+    public void setDataDirectory(String dataDirectory) {
+        setDataDirectoryFile(new File(dataDirectory));
+    }
+
+    /**
+     * Sets the directory in which the data files will be stored by default for
+     * the JDBC and Journal persistence adaptors.
+     *
+     * @param dataDirectoryFile
+     *            the directory to store data files
+     */
+    public void setDataDirectoryFile(File dataDirectoryFile) {
+        this.dataDirectoryFile = dataDirectoryFile;
+    }
+
+    /**
+     * @return the tmpDataDirectory
+     */
+    public File getTmpDataDirectory() {
+        if (tmpDataDirectory == null) {
+            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
+        }
+        return tmpDataDirectory;
+    }
+
+    /**
+     * @param tmpDataDirectory
+     *            the tmpDataDirectory to set
+     */
+    public void setTmpDataDirectory(File tmpDataDirectory) {
+        this.tmpDataDirectory = tmpDataDirectory;
+    }
+
+    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
+        this.persistenceFactory = persistenceFactory;
+    }
+
+    public void setDestinationFactory(DestinationFactory destinationFactory) {
+        this.destinationFactory = destinationFactory;
+    }
+
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    /**
+     * Sets whether or not persistence is enabled or disabled.
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
+     */
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public boolean isPopulateJMSXUserID() {
+        return populateJMSXUserID;
+    }
+
+    /**
+     * Sets whether or not the broker should populate the JMSXUserID header.
+     */
+    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
+        this.populateJMSXUserID = populateJMSXUserID;
+    }
+
+    public SystemUsage getSystemUsage() {
+        try {
+            if (systemUsage == null) {
+
+                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
+                systemUsage.setExecutor(getExecutor());
+                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
+                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
+                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
+                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
+                addService(this.systemUsage);
+            }
+            return systemUsage;
+        } catch (IOException e) {
+            LOG.error("Cannot create SystemUsage", e);
+            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
+        }
+    }
+
+    public void setSystemUsage(SystemUsage memoryManager) {
+        if (this.systemUsage != null) {
+            removeService(this.systemUsage);
+        }
+        this.systemUsage = memoryManager;
+        if (this.systemUsage.getExecutor()==null) {
+            this.systemUsage.setExecutor(getExecutor());
+        }
+        addService(this.systemUsage);
+    }
+
+    /**
+     * @return the consumerUsageManager
+     * @throws IOException
+     */
+    public SystemUsage getConsumerSystemUsage() throws IOException {
+        if (this.consumerSystemUsaage == null) {
+            if (splitSystemUsageForProducersConsumers) {
+                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
+                float portion = consumerSystemUsagePortion / 100f;
+                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
+                addService(this.consumerSystemUsaage);
+            } else {
+                consumerSystemUsaage = getSystemUsage();
+            }
+        }
+        return this.consumerSystemUsaage;
+    }
+
+    /**
+     * @param consumerSystemUsaage
+     *            the storeSystemUsage to set
+     */
+    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
+        if (this.consumerSystemUsaage != null) {
+            removeService(this.consumerSystemUsaage);
+        }
+        this.consumerSystemUsaage = consumerSystemUsaage;
+        addService(this.consumerSystemUsaage);
+    }
+
+    /**
+     * @return the producerUsageManager
+     * @throws IOException
+     */
+    public SystemUsage getProducerSystemUsage() throws IOException {
+        if (producerSystemUsage == null) {
+            if (splitSystemUsageForProducersConsumers) {
+                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
+                float portion = producerSystemUsagePortion / 100f;
+                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
+                addService(producerSystemUsage);
+            } else {
+                producerSystemUsage = getSystemUsage();
+            }
+        }
+        return producerSystemUsage;
+    }
+
+    /**
+     * @param producerUsageManager
+     *            the producerUsageManager to set
+     */
+    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
+        if (this.producerSystemUsage != null) {
+            removeService(this.producerSystemUsage);
+        }
+        this.producerSystemUsage = producerUsageManager;
+        addService(this.producerSystemUsage);
+    }
+
+    public PersistenceAdapter getPersistenceAdapter() throws IOException {
+        if (persistenceAdapter == null) {
+            persistenceAdapter = createPersistenceAdapter();
+            configureService(persistenceAdapter);
+            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
+        }
+        return persistenceAdapter;
+    }
+
+    /**
+     * Sets the persistence adaptor implementation to use for this broker
+     *
+     * @throws IOException
+     */
+    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
+        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
+            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
+            return;
+        }
+        this.persistenceAdapter = persistenceAdapter;
+        configureService(this.persistenceAdapter);
+        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
+    }
+
+    public TaskRunnerFactory getTaskRunnerFactory() {
+        if (this.taskRunnerFactory == null) {
+            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
+                    isDedicatedTaskRunner());
+            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
+        }
+        return this.taskRunnerFactory;
+    }
+
+    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+        this.taskRunnerFactory = taskRunnerFactory;
+    }
+
+    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
+        if (taskRunnerFactory == null) {
+            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
+                    true, 1000, isDedicatedTaskRunner());
+        }
+        return persistenceTaskRunnerFactory;
+    }
+
+    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
+        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
+    }
+
+    public boolean isUseJmx() {
+        return useJmx;
+    }
+
+    public boolean isEnableStatistics() {
+        return enableStatistics;
+    }
+
+    /**
+     * Sets whether or not the Broker's services enable statistics or not.
+     */
+    public void setEnableStatistics(boolean enableStatistics) {
+        this.enableStatistics = enableStatistics;
+    }
+
+    /**
+     * Sets whether or not the Broker's services should be exposed into JMX or
+     * not.
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
+     */
+    public void setUseJmx(boolean useJmx) {
+        this.useJmx = useJmx;
+    }
+
+    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
+        if (brokerObjectName == null) {
+            brokerObjectName = createBrokerObjectName();
+        }
+        return brokerObjectName;
+    }
+
+    /**
+     * Sets the JMX ObjectName for this broker
+     */
+    public void setBrokerObjectName(ObjectName brokerObjectName) {
+        this.brokerObjectName = brokerObjectName;
+    }
+
+    public ManagementContext getManagementContext() {
+        if (managementContext == null) {
+            managementContext = new ManagementContext();
+        }
+        return managementContext;
+    }
+
+    public void setManagementContext(ManagementContext managementContext) {
+        this.managementContext = managementContext;
+    }
+
+    public NetworkConnector getNetworkConnectorByName(String connectorName) {
+        for (NetworkConnector connector : networkConnectors) {
+            if (connector.getName().equals(connectorName)) {
+                return connector;
+            }
+        }
+        return null;
+    }
+
+    public String[] getNetworkConnectorURIs() {
+        return networkConnectorURIs;
+    }
+
+    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
+        this.networkConnectorURIs = networkConnectorURIs;
+    }
+
+    public TransportConnector getConnectorByName(String connectorName) {
+        for (TransportConnector connector : transportConnectors) {
+            if (connector.getName().equals(connectorName)) {
+                return connector;
+            }
+        }
+        return null;
+    }
+
+    public Map<String, String> getTransportConnectorURIsAsMap() {
+        Map<String, String> answer = new HashMap<String, String>();
+        for (TransportConnector connector : transportConnectors) {
+            try {
+                URI uri = connector.getConnectUri();
+                if (uri != null) {
+                    String scheme = uri.getScheme();
+                    if (scheme != null) {
+                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
+                    }
+                }
+            } catch (Exception e) {
+                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
+            }
+        }
+        return answer;
+    }
+
+    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
+        ProducerBrokerExchange result = null;
+
+        for (TransportConnector connector : transportConnectors) {
+            for (TransportConnection tc: connector.getConnections()){
+                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
+                if (result !=null){
+                    return result;
+                }
+            }
+        }
+        return result;
+    }
+
+    public String[] getTransportConnectorURIs() {
+        return transportConnectorURIs;
+    }
+
+    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
+        this.transportConnectorURIs = transportConnectorURIs;
+    }
+
+    /**
+     * @return Returns the jmsBridgeConnectors.
+     */
+    public JmsConnector[] getJmsBridgeConnectors() {
+        return jmsBridgeConnectors;
+    }
+
+    /**
+     * @param jmsConnectors
+     *            The jmsBridgeConnectors to set.
+     */
+    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
+        this.jmsBridgeConnectors = jmsConnectors;
+    }
+
+    public Service[] getServices() {
+        return services.toArray(new Service[0]);
+    }
+
+    /**
+     * Sets the services associated with this broker.
+     */
+    public void setServices(Service[] services) {
+        this.services.clear();
+        if (services != null) {
+            for (int i = 0; i < services.length; i++) {
+                this.services.add(services[i]);
+            }
+        }
+    }
+
+    /**
+     * Adds a new service so that it will be started as part of the broker
+     * lifecycle
+     */
+    public void addService(Service service) {
+        services.add(service);
+    }
+
+    public void removeService(Service service) {
+        services.remove(service);
+    }
+
+    public boolean isUseLoggingForShutdownErrors() {
+        return useLoggingForShutdownErrors;
+    }
+
+    /**
+     * Sets whether or not we should use commons-logging when reporting errors
+     * when shutting down the broker
+     */
+    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
+        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
+    }
+
+    public boolean isUseShutdownHook() {
+        return useShutdownHook;
+    }
+
+    /**
+     * Sets whether or not we should use a shutdown handler to close down the
+     * broker cleanly if the JVM is terminated. It is recommended you leave this
+     * enabled.
+     */
+    public void setUseShutdownHook(boolean useShutdownHook) {
+        this.useShutdownHook = useShutdownHook;
+    }
+
+    public boolean isAdvisorySupport() {
+        return advisorySupport;
+    }
+
+    /**
+     * Allows the support of advisory messages to be disabled for performance
+     * reasons.
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
+     */
+    public void setAdvisorySupport(boolean advisorySupport) {
+        this.advisorySupport = advisorySupport;
+    }
+
+    public List<TransportConnector> getTransportConnectors() {
+        return new ArrayList<TransportConnector>(transportConnectors);
+    }
+
+    /**
+     * Sets the transport connectors which this broker will listen on for new
+     * clients
+     *
+     * @org.apache.xbean.Property
+     *                            nestedType="org.apache.activemq.broker.TransportConnector"
+     */
+    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
+        for (TransportConnector connector : transportConnectors) {
+            addConnector(connector);
+        }
+    }
+
+    public TransportConnector getTransportConnectorByName(String name){
+        for (TransportConnector transportConnector : transportConnectors){
+           if (name.equals(transportConnector.getName())){
+               return transportConnector;
+           }
+        }
+        return null;
+    }
+
+    public TransportConnector getTransportConnectorByScheme(String scheme){
+        for (TransportConnector transportConnector : transportConnectors){
+            if (scheme.equals(transportConnector.getUri().getScheme())){
+                return transportConnector;
+            }
+        }
+        return null;
+    }
+
+    public List<NetworkConnector> getNetworkConnectors() {
+        return new ArrayList<NetworkConnector>(networkConnectors);
+    }
+
+    public List<ProxyConnector> getProxyConnectors() {
+        return new ArrayList<ProxyConnector>(proxyConnectors);
+    }
+
+    /**
+     * Sets the network connectors which this broker will use to connect to
+     * other brokers in a federated network
+     *
+     * @org.apache.xbean.Property
+     *                            nestedType="org.apache.activemq.network.NetworkConnector"
+     */
+    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
+        for (Object connector : networkConnectors) {
+            addNetworkConnector((NetworkConnector) connector);
+        }
+    }
+
+    /**
+     * Sets the network connectors which this broker will use to connect to
+     * other brokers in a federated network
+     */
+    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
+        for (Object connector : proxyConnectors) {
+            addProxyConnector((ProxyConnector) connector);
+        }
+    }
+
+    public PolicyMap getDestinationPolicy() {
+        return destinationPolicy;
+    }
+
+    /**
+     * Sets the destination specific policies available either for exact
+     * destinations or for wildcard areas of destinations.
+     */
+    public void setDestinationPolicy(PolicyMap policyMap) {
+        this.destinationPolicy = policyMap;
+    }
+
+    public BrokerPlugin[] getPlugins() {
+        return plugins;
+    }
+
+    /**
+     * Sets a number of broker plugins to install such as for security
+     * authentication or authorization
+     */
+    public void setPlugins(BrokerPlugin[] plugins) {
+        this.plugins = plugins;
+    }
+
+    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
+        return messageAuthorizationPolicy;
+    }
+
+    /**
+     * Sets the policy used to decide if the current connection is authorized to
+     * consume a given message
+     */
+    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
+        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
+    }
+
+    /**
+     * Delete all messages from the persistent store
+     *
+     * @throws IOException
+     */
+    public void deleteAllMessages() throws IOException {
+        getPersistenceAdapter().deleteAllMessages();
+    }
+
+    public boolean isDeleteAllMessagesOnStartup() {
+        return deleteAllMessagesOnStartup;
+    }
+
+    /**
+     * Sets whether or not all messages are deleted on startup - mostly only
+     * useful for testing.
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
+     */
+    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
+        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
+    }
+
+    public URI getVmConnectorURI() {
+        if (vmConnectorURI == null) {
+            try {
+                vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
+            } catch (URISyntaxException e) {
+                LOG.error("Badly formed URI from {}", getBrokerName(), e);
+            }
+        }
+        return vmConnectorURI;
+    }
+
+    public void setVmConnectorURI(URI vmConnectorURI) {
+        this.vmConnectorURI = vmConnectorURI;
+    }
+
+    public String getDefaultSocketURIString() {
+        if (started.get()) {
+            if (this.defaultSocketURIString == null) {
+                for (TransportConnector tc:this.transportConnectors) {
+                    String result = null;
+                    try {
+                        result = tc.getPublishableConnectString();
+                    } catch (Exception e) {
+                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
+                    }
+                    if (result != null) {
+                        // find first publishable uri
+                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+                            this.defaultSocketURIString = result;
+                            break;
+                        } else {
+                        // or use the first defined
+                            if (this.defaultSocketURIString == null) {
+                                this.defaultSocketURIString = result;
+                            }
+                        }
+                    }
+                }
+
+            }
+            return this.defaultSocketURIString;
+        }
+       return null;
+    }
+
+    /**
+     * @return Returns the shutdownOnMasterFailure.
+     */
+    public boolean isShutdownOnMasterFailure() {
+        return shutdownOnMasterFailure;
+    }
+
+    /**
+     * @param shutdownOnMasterFailure
+     *            The shutdownOnMasterFailure to set.
+     */
+    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
+        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
+    }
+
+    public boolean isKeepDurableSubsActive() {
+        return keepDurableSubsActive;
+    }
+
+    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
+    }
+
+    public boolean isUseVirtualTopics() {
+        return useVirtualTopics;
+    }
+
+    /**
+     * Sets whether or not <a
+     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
+     * Topics</a> should be supported by default if they have not been
+     * explicitly configured.
+     */
+    public void setUseVirtualTopics(boolean useVirtualTopics) {
+        this.useVirtualTopics = useVirtualTopics;
+    }
+
+    public DestinationInterceptor[] getDestinationInterceptors() {
+        return destinationInterceptors;
+    }
+
+    public boolean isUseMirroredQueues() {
+        return useMirroredQueues;
+    }
+
+    /**
+     * Sets whether or not <a
+     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
+     * Queues</a> should be supported by default if they have not been
+     * explicitly configured.
+     */
+    public void setUseMirroredQueues(boolean useMirroredQueues) {
+        this.useMirroredQueues = useMirroredQueues;
+    }
+
+    /**
+     * Sets the destination interceptors to use
+     */
+    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
+        this.destinationInterceptors = destinationInterceptors;
+    }
+
+    public ActiveMQDestination[] getDestinations() {
+        return destinations;
+    }
+
+    /**
+     * Sets the destinations which should be loaded/created on startup
+     */
+    public void setDestinations(ActiveMQDestination[] destinations) {
+        this.destinations = destinations;
+    }
+
+    /**
+     * @return the tempDataStore
+     */
+    public synchronized PListStore getTempDataStore() {
+        if (tempDataStore == null) {
+            if (!isPersistent()) {
+                return null;
+            }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
+                if( pa!=null && pa instanceof PListStore) {
+                    return (PListStore) pa;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            boolean result = true;
+            boolean empty = true;
+            try {
+                File directory = getTmpDataDirectory();
+                if (directory.exists() && directory.isDirectory()) {
+                    File[] files = directory.listFiles();
+                    if (files != null && files.length > 0) {
+                        empty = false;
+                        for (int i = 0; i < files.length; i++) {
+                            File file = files[i];
+                            if (!file.isDirectory()) {
+                                result &= file.delete();
+                            }
+                        }
+                    }
+                }
+                if (!empty) {
+                    String str = result ? "Successfully deleted" : "Failed to delete";
+                    LOG.info("{} temporary storage", str);
+                }
+
+                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
+                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
+                this.tempDataStore.setDirectory(getTmpDataDirectory());
+                configureService(tempDataStore);
+                this.tempDataStore.start();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return tempDataStore;
+    }
+
+    /**
+     * @param tempDataStore
+     *            the tempDataStore to set
+     */
+    public void setTempDataStore(PListStore tempDataStore) {
+        this.tempDataStore = tempDataStore;
+        configureService(tempDataStore);
+        try {
+            tempDataStore.start();
+        } catch (Exception e) {
+            RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
+            LOG.error(exception.getLocalizedMessage(), e);
+            throw exception;
+        }
+    }
+
+    public int getPersistenceThreadPriority() {
+        return persistenceThreadPriority;
+    }
+
+    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
+        this.persistenceThreadPriority = persistenceThreadPriority;
+    }
+
+    /**
+     * @return the useLocalHostBrokerName
+     */
+    public boolean isUseLocalHostBrokerName() {
+        return this.useLocalHostBrokerName;
+    }
+
+    /**
+     * @param useLocalHostBrokerName
+     *            the useLocalHostBrokerName to set
+     */
+    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
+        this.useLocalHostBrokerName = useLocalHostBrokerName;
+        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
+            brokerName = LOCAL_HOST_NAME;
+        }
+    }
+
+    /**
+     * Looks up and lazily creates if necessary the destination for the given
+     * JMS name
+     */
+    public Destination getDestination(ActiveMQDestination destination) throws Exception {
+        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
+    }
+
+    public void removeDestination(ActiveMQDestination destination) throws Exception {
+        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
+    }
+
+    public int getProducerSystemUsagePortion() {
+        return producerSystemUsagePortion;
+    }
+
+    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
+        this.producerSystemUsagePortion = producerSystemUsagePortion;
+    }
+
+    public int getConsumerSystemUsagePortion() {
+        return consumerSystemUsagePortion;
+    }
+
+    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
+        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
+    }
+
+    public boolean isSplitSystemUsageForProducersConsumers() {
+        return splitSystemUsageForProducersConsumers;
+    }
+
+    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
+        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
+    }
+
+    public boolean isMonitorConnectionSplits() {
+        return monitorConnectionSplits;
+    }
+
+    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
+        this.monitorConnectionSplits = monitorConnectionSplits;
+    }
+
+    public int getTaskRunnerPriority() {
+        return taskRunnerPriority;
+    }
+
+    public void setTaskRunnerPriority(int taskRunnerPriority) {
+        this.taskRunnerPriority = taskRunnerPriority;
+    }
+
+    public boolean isDedicatedTaskRunner() {
+        return dedicatedTaskRunner;
+    }
+
+    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
+        this.dedicatedTaskRunner = dedicatedTaskRunner;
+    }
+
+    public boolean isCacheTempDestinations() {
+        return cacheTempDestinations;
+    }
+
+    public void setCacheTempDestinations(boolean cacheTempDestinations) {
+        this.cacheTempDestinations = cacheTempDestinations;
+    }
+
+    public int getTimeBeforePurgeTempDestinations() {
+        return timeBeforePurgeTempDestinations;
+    }
+
+    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
+        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
+    }
+
+    public boolean isUseTempMirroredQueues() {
+        return useTempMirroredQueues;
+    }
+
+    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
+        this.useTempMirroredQueues = useTempMirroredQueues;
+    }
+
+    public synchronized JobSchedulerStore getJobSchedulerStore() {
+
+        // If support is off don't allow any scheduler even is user configured their own.
+        if (!isSchedulerSupport()) {
+            return null;
+        }
+
+        // If the user configured their own we use it even if persistence is disabled since
+        // we don't know anything about their implementation.
+        if (jobSchedulerStore == null) {
+
+            if (!isPersistent()) {
+                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
+                configureService(jobSchedulerStore);
+                return this.jobSchedulerStore;
+            }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
+                if (pa != null) {
+                    this.jobSchedulerStore = pa.createJobSchedulerStore();
+                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+                    configureService(jobSchedulerStore);
+                    return this.jobSchedulerStore;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (UnsupportedOperationException ex) {
+                // It's ok if the store doesn't implement a scheduler.
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
+                if (pa != null && pa instanceof JobSchedulerStore) {
+                    this.jobSchedulerStore = (JobSchedulerStore) pa;
+                    configureService(jobSchedulerStore);
+                    return this.jobSchedulerStore;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            // Load the KahaDB store as a last resort, this only works if KahaDB is
+            // included at runtime, otherwise this will fail.  User should disable
+            // scheduler support if this fails.
+            try {
+                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+                jobSchedulerStore = adaptor.createJobSchedulerStore();
+                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+                configureService(jobSchedulerStore);
+                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return jobSchedulerStore;
+    }
+
+    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
+        this.jobSchedulerStore = jobSchedulerStore;
+        configureService(jobSchedulerStore);
+    }
+
+    //
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    /**
+     * Handles any lazy-creation helper properties which are added to make
+     * things easier to configure inside environments such as Spring
+     *
+     * @throws Exception
+     */
+    protected void processHelperProperties() throws Exception {
+        if (transportConnectorURIs != null) {
+            for (int i = 0; i < transportConnectorURIs.length; i++) {
+                String uri = transportConnectorURIs[i];
+                addConnector(uri);
+            }
+        }
+        if (networkConnectorURIs != null) {
+            for (int i = 0; i < networkConnectorURIs.length; i++) {
+                String uri = networkConnectorURIs[i];
+                addNetworkConnector(uri);
+            }
+        }
+        if (jmsBridgeConnectors != null) {
+            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
+                addJmsConnector(jmsBridgeConnectors[i]);
+            }
+        }
+    }
+
+    public static int t = 0;
+    /**
+     * Check that the store usage limit is not greater than max usable
+     * space and adjust if it is
+     */
+    protected void checkStoreUsageLimits() throws IOException {
+        final SystemUsage usage = getSystemUsage();
+
+        if (getPersistenceAdapter() != null) {
+            PersistenceAdapter adapter = getPersistenceAdapter();
+            File dir = adapter.getDirectory();
+
+            if (dir != null) {
+                dir = StoreUtil.findParentDirectory(dir);
+
+                int usagePercent = usage.getStoreUsage().getPercentLimit();
+                long storeLimit = usage.getStoreUsage().getLimit();
+                long storeCurrent = usage.getStoreUsage().getUsage();
+                long dirFreeSpace = dir.getUsableSpace();
+
+                if (diskUsageCheckRegrowPercentChange && usagePercent > 0 &&
+                        storeLimit < (dirFreeSpace + storeCurrent)) {
+                    LOG.info("Usable disk space has been increased, attempting to regrow store limit to " +
+                            + usagePercent + "% of the partition size.");
+
+                    //trigger the recomputation of the absolute value by
+                    //calling setPercentLimit again
+                    usage.getStoreUsage().setPercentLimit(usagePercent);
+                    storeLimit = usage.getStoreUsage().getLimit();
+                    storeCurrent = usage.getStoreUsage().getUsage();
+                }
+
+                if (storeLimit > (dirFreeSpace + storeCurrent)) {
+                    if (usagePercent > 0) {
+                        LOG.warn("Store limit has been set to "
+                                + usagePercent + "% of the partition size but "
+                                + "there is not enough usable space.");
+                    }
+
+                    LOG.warn("Store limit is " +  storeLimit / (1024 * 1024) +
+                             " mb (current store usage is " + storeCurrent / (1024 * 1024) +
+                             " mb). The data directory: " + dir.getAbsolutePath() +
+                             " only has " + dirFreeSpace / (1024 * 1024) +
+                             " mb of usable space - resetting to maximum available disk space: " +
+                            (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
+                    usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent);
+                }
+            }
+
+            long maxJournalFileSize = 0;
+            long storeLimit = usage.getStoreUsage().getLimit();
+
+            if (adapter instanceof JournaledStore) {
+                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
+            }
+
+            if (storeLimit < maxJournalFileSize) {
+                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
+                          " mb, whilst the max journal file size for the store is: " +
+                          maxJournalFileSize / (1024 * 1024) + " mb, " +
+                          "the store will not accept any data when used.");
+
+            }
+        }
+    }
+
+    /**
+     * Check that temporary usage limit is not greater than max usable
+     * space and adjust if it is
+     */
+    protected void checkTmpStoreUsageLimits() throws IOException {
+        final SystemUsage usage = getSystemUsage();
+
+        File tmpDir = getTmpDataDirectory();
+
+        if (tmpDir != null) {
+            tmpDir = StoreUtil.findParentDirectory(tmpDir);
+
+            long storeLimit = usage.getTempUsage().getLimit();
+<<<<<<< 4f8d56aaf60f99abe643e79c6c4940a571289a86
+            long storeCurrent = usage.getTempUsage().getUsage();
+            while (tmpDir != null && !tmpDir.isDirectory()) {
+                tmpDir = tmpDir.getParentFile();
+            }
+            long dirFreeSpace = tmpDir.getUsableSpace();
+            if (storeLimit > (dirFreeSpace + storeCurrent)) {
+                LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
+                        " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) +
+                        " mb). The temporary data directory: " + tmpDir.getAbsolutePath() +
+                        " only has " + dirFreeSpace / (1024 * 1024) +
+                        " mb of usable space - resetting to maximum available disk space: " +
+                       (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
+               usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent);
+=======
+            long dirFreeSpace = tmpDir.getUsableSpace();
+            int usagePercent = usage.getTempUsage().getPercentLimit();
+
+//            if (diskUsageCheckRegrowPercentChange && usagePercent > 0 &&
+//                    storeLimit < (dirFreeSpace + storeCurrent)) {
+//                LOG.info("Usable disk space has been increased, attempting to regrow store limit to " +
+//                        + usagePercent + "% of the partition size.");
+//
+//                //trigger the recomputation of the absolute value by
+//                //calling setPercentLimit again
+//                usage.getStoreUsage().setPercentLimit(usagePercent);
+//                storeLimit = usage.getStoreUsage().getLimit();
+//                storeCurrent = usage.getStoreUsage().getUsage();
+//            }
+
+            if (storeLimit > dirFreeSpace) {
+                if (usagePercent > 0) {
+                    LOG.warn("Temporary Store limit has been set to "
+                            + usagePercent + "% of the partition size but "
+                            + "there is not enough usable space.");
+                }
+
+                LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
+                        " mb, whilst the temporary data directory: " + tmpDir.getAbsolutePath() +
+                        " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " +
+                        dirFreeSpace / (1024 * 1024) + " mb.");
+                usage.getTempUsage().setLimit(dirFreeSpace);
+>>>>>>> https://issues.apache.org/jira/browse/AMQ-5965
+            }
+
+            if (isPersistent()) {
+                long maxJournalFileSize;
+
+                PListStore store = usage.getTempUsage().getStore();
+                if (store != null && store instanceof JournaledStore) {
+                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
+                } else {
+                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
+                }
+
+                if (storeLimit < maxJournalFileSize) {
+                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
+                              " mb, whilst the max journal file size for the temporary store is: " +
+                              maxJournalFileSize / (1024 * 1024) + " mb, " +
+                              "the temp store will not accept any data when used.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
+     * update store and temporary store limits if the amount of available space
+     * plus current store size is less than the existin configured limit
+     */
+    protected void scheduleDiskUsageLimitsCheck() throws IOException {
+        if (schedulePeriodForDiskUsageCheck > 0 &&
+                (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
+            Runnable diskLimitCheckTask = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        checkStoreUsageLimits();
+                    } catch (IOException e) {
+                        LOG.error("Failed to check persistent disk usage limits", e);
+                    }
+
+                    try {
+                        checkTmpStoreUsageLimits();
+                    } catch (IOException e) {
+                        LOG.error("Failed to check temporary store usage limits", e);
+                    }
+                }
+            };
+            scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
+        }
+    }
+
+    protected void checkSystemUsageLimits() throws IOException {
+        final SystemUsage usage = getSystemUsage();
+        long memLimit = usage.getMemoryUsage().getLimit();
+        long jvmLimit = Runtime.getRuntime().maxMemory();
+
+        if (memLimit > jvmLimit) {
+            usage.getMemoryUsage().setPercentOfJvmHeap(70);
+            LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
+                    " mb) is more than the maximum available for the JVM: " +
+                    jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
+        }
+
+        //Check the persistent store and temp store limits if they exist
+        //and schedule a periodic check to update disk limits if
+        //schedulePeriodForDiskLimitCheck is set
+        checkStoreUsageLimits();
+        checkTmpStoreUsageLimits();
+        scheduleDiskUsageLimitsCheck();
+
+        if (getJobSchedulerStore() != null) {
+            JobSchedulerStore scheduler = getJobSchedulerStore();
+            File schedulerDir = scheduler.getDirectory();
+            if (schedulerDir != null) {
+
+                String schedulerDirPath = schedulerDir.getAbsolutePath();
+                if (!schedulerDir.isAbsolute()) {
+                    schedulerDir = new File(schedulerDirPath);
+                }
+
+                while (schedulerDir != null && !schedulerDir.isDirectory()) {
+                    schedulerDir = schedulerDir.getParentFile();
+                }
+                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
+                long dirFreeSpace = schedulerDir.getUsableSpace();
+                if (schedulerLimit > dirFreeSpace) {
+                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
+                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
+                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
+                            dirFreeSpace / (1024 * 1024) + " mb.");
+                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
+                }
+            }
+        }
+    }
+
+    public void stopAllConnectors(ServiceStopper stopper) {
+        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
+            NetworkConnector connector = iter.next();
+            unregisterNetworkConnectorMBean(connector);
+            stopper.stop(connector);
+        }
+        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
+            ProxyConnector connector = iter.next();
+            stopper.stop(connector);
+        }
+        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
+            JmsConnector connector = iter.next();
+            stopper.stop(connector);
+        }
+        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
+            TransportConnector connector = iter.next();
+            try {
+                unregisterConnectorMBean(connector);
+            } catch (IOException e) {
+            }
+            stopper.stop(connector);
+        }
+    }
+
+    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
+        try {
+            ObjectName objectName = createConnectorObjectName(connector);
+            connector = connector.asManagedConnector(getManagementContext(), objectName);
+            ConnectorViewMBean view = new ConnectorView(connector);
+            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
+            return connector;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
+        }
+    }
+
+    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
+        if (isUseJmx()) {
+            try {
+                ObjectName objectName = createConnectorObjectName(connector);
+                getManagementContext().unregisterMBean(objectName);
+            } catch (Throwable e) {
+                throw IOExceptionSupport.create(
+                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+        return adaptor;
+    }
+
+    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+        if (isUseJmx()) {}
+    }
+
+    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
+        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
+    }
+
+    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
+        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
+        try {
+            ObjectName objectName = createNetworkConnectorObjectName(connector);
+            connector.setObjectName(objectName);
+            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
+        }
+    }
+
+    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
+        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
+    }
+
+    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
+        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
+    }
+
+    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
+        if (isUseJmx()) {
+            try {
+                ObjectName objectName = createNetworkConnectorObjectName(connector);
+                getManagementContext().unregisterMBean(objectName);
+            } catch (Exception e) {
+                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
+            }
+        }
+    }
+
+    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
+        ProxyConnectorView view = new ProxyConnectorView(connector);
+        try {
+            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
+            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
+        }
+    }
+
+    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
+        JmsConnectorView view = new JmsConnectorView(connector);
+        try {
+            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
+            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Factory method to create a new broker
+     *
+     * @thro

<TRUNCATED>

Mime
View raw message