activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/2] activemq git commit: Removing inadvertently checked in file
Date Mon, 21 Sep 2015 17:20:11 GMT
Repository: activemq
Updated Branches:
  refs/heads/master cc9b9b084 -> 00d19e7b6


http://git-wip-us.apache.org/repos/asf/activemq/blob/00d19e7b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
deleted file mode 100644
index e27279c..0000000
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
+++ /dev/null
@@ -1,3147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.security.Provider;
-import java.security.Security;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionMetaData;
-import org.apache.activemq.ConfigurationException;
-import org.apache.activemq.Service;
-import org.apache.activemq.advisory.AdvisoryBroker;
-import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.HealthView;
-import org.apache.activemq.broker.jmx.HealthViewMBean;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.Log4JConfigView;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.broker.scheduler.SchedulerBroker;
-import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.network.ConnectionFilter;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.network.jms.JmsConnector;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.proxy.ProxyConnector;
-import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.store.JournaledStore;
-import org.apache.activemq.store.PListStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.apache.activemq.thread.Scheduler;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.TransportFactorySupport;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.vm.VMTransportFactory;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.StoreUtil;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-/**
- * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
- * number of transport connectors, network connectors and a bunch of properties
- * which can be used to configure the broker as its lazily created.
- *
- * @org.apache.xbean.XBean
- */
-public class BrokerService implements Service {
-    public static final String DEFAULT_PORT = "61616";
-    public static final String LOCAL_HOST_NAME;
-    public static final String BROKER_VERSION;
-    public static final String DEFAULT_BROKER_NAME = "localhost";
-    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
-    public static final long DEFAULT_START_TIMEOUT = 600000L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
-
-    @SuppressWarnings("unused")
-    private static final long serialVersionUID = 7353129142305630237L;
-
-    private boolean useJmx = true;
-    private boolean enableStatistics = true;
-    private boolean persistent = true;
-    private boolean populateJMSXUserID;
-    private boolean useAuthenticatedPrincipalForJMSXUserID;
-    private boolean populateUserNameInMBeans;
-    private long mbeanInvocationTimeout = 0;
-
-    private boolean useShutdownHook = true;
-    private boolean useLoggingForShutdownErrors;
-    private boolean shutdownOnMasterFailure;
-    private boolean shutdownOnSlaveFailure;
-    private boolean waitForSlave;
-    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
-    private boolean passiveSlave;
-    private String brokerName = DEFAULT_BROKER_NAME;
-    private File dataDirectoryFile;
-    private File tmpDataDirectory;
-    private Broker broker;
-    private BrokerView adminView;
-    private ManagementContext managementContext;
-    private ObjectName brokerObjectName;
-    private TaskRunnerFactory taskRunnerFactory;
-    private TaskRunnerFactory persistenceTaskRunnerFactory;
-    private SystemUsage systemUsage;
-    private SystemUsage producerSystemUsage;
-    private SystemUsage consumerSystemUsaage;
-    private PersistenceAdapter persistenceAdapter;
-    private PersistenceAdapterFactory persistenceFactory;
-    protected DestinationFactory destinationFactory;
-    private MessageAuthorizationPolicy messageAuthorizationPolicy;
-    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
-    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
-    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
-    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
-    private final List<Service> services = new ArrayList<Service>();
-    private transient Thread shutdownHook;
-    private String[] transportConnectorURIs;
-    private String[] networkConnectorURIs;
-    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
-    // to other jms messaging systems
-    private boolean deleteAllMessagesOnStartup;
-    private boolean advisorySupport = true;
-    private URI vmConnectorURI;
-    private String defaultSocketURIString;
-    private PolicyMap destinationPolicy;
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private final AtomicBoolean stopped = new AtomicBoolean(false);
-    private final AtomicBoolean stopping = new AtomicBoolean(false);
-    private BrokerPlugin[] plugins;
-    private boolean keepDurableSubsActive = true;
-    private boolean useVirtualTopics = true;
-    private boolean useMirroredQueues = false;
-    private boolean useTempMirroredQueues = true;
-    private BrokerId brokerId;
-    private volatile DestinationInterceptor[] destinationInterceptors;
-    private ActiveMQDestination[] destinations;
-    private PListStore tempDataStore;
-    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
-    private boolean useLocalHostBrokerName;
-    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
-    private final CountDownLatch startedLatch = new CountDownLatch(1);
-    private Broker regionBroker;
-    private int producerSystemUsagePortion = 60;
-    private int consumerSystemUsagePortion = 40;
-    private boolean splitSystemUsageForProducersConsumers;
-    private boolean monitorConnectionSplits = false;
-    private int taskRunnerPriority = Thread.NORM_PRIORITY;
-    private boolean dedicatedTaskRunner;
-    private boolean cacheTempDestinations = false;// useful for failover
-    private int timeBeforePurgeTempDestinations = 5000;
-    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
-    private boolean systemExitOnShutdown;
-    private int systemExitOnShutdownExitCode;
-    private SslContext sslContext;
-    private boolean forceStart = false;
-    private IOExceptionHandler ioExceptionHandler;
-    private boolean schedulerSupport = false;
-    private File schedulerDirectoryFile;
-    private Scheduler scheduler;
-    private ThreadPoolExecutor executor;
-    private int schedulePeriodForDestinationPurge= 0;
-    private int maxPurgedDestinationsPerSweep = 0;
-    private int schedulePeriodForDiskUsageCheck = 0;
-    private boolean diskUsageCheckRegrowPercentChange = true;
-    private BrokerContext brokerContext;
-    private boolean networkConnectorStartAsync = false;
-    private boolean allowTempAutoCreationOnSend;
-    private JobSchedulerStore jobSchedulerStore;
-    private final AtomicLong totalConnections = new AtomicLong();
-    private final AtomicInteger currentConnections = new AtomicInteger();
-
-    private long offlineDurableSubscriberTimeout = -1;
-    private long offlineDurableSubscriberTaskSchedule = 300000;
-    private DestinationFilter virtualConsumerDestinationFilter;
-
-    private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false);
-    private Throwable startException = null;
-    private boolean startAsync = false;
-    private Date startDate;
-    private boolean slave = true;
-
-    private boolean restartAllowed = true;
-    private boolean restartRequested = false;
-    private boolean rejectDurableConsumers = false;
-
-    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
-
-    static {
-
-        try {
-            ClassLoader loader = BrokerService.class.getClassLoader();
-            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
-            Provider bouncycastle = (Provider) clazz.newInstance();
-            Security.insertProviderAt(bouncycastle, 2);
-            LOG.info("Loaded the Bouncy Castle security provider.");
-        } catch(Throwable e) {
-            // No BouncyCastle found so we use the default Java Security Provider
-        }
-
-        String localHostName = "localhost";
-        try {
-            localHostName =  InetAddressUtil.getLocalHostName();
-        } catch (UnknownHostException e) {
-            LOG.error("Failed to resolve localhost");
-        }
-        LOCAL_HOST_NAME = localHostName;
-
-        String version = null;
-        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
-            if (in != null) {
-                try(InputStreamReader isr = new InputStreamReader(in);
-                    BufferedReader reader = new BufferedReader(isr)) {
-                    version = reader.readLine();
-                }
-            }
-        } catch (IOException ie) {
-            LOG.warn("Error reading broker version ", ie);
-        }
-        BROKER_VERSION = version;
-    }
-
-    @Override
-    public String toString() {
-        return "BrokerService[" + getBrokerName() + "]";
-    }
-
-    private String getBrokerVersion() {
-        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
-        if (version == null) {
-            version = BROKER_VERSION;
-        }
-
-        return version;
-    }
-
-    /**
-     * Adds a new transport connector for the given bind address
-     *
-     * @return the newly created and added transport connector
-     * @throws Exception
-     */
-    public TransportConnector addConnector(String bindAddress) throws Exception {
-        return addConnector(new URI(bindAddress));
-    }
-
-    /**
-     * Adds a new transport connector for the given bind address
-     *
-     * @return the newly created and added transport connector
-     * @throws Exception
-     */
-    public TransportConnector addConnector(URI bindAddress) throws Exception {
-        return addConnector(createTransportConnector(bindAddress));
-    }
-
-    /**
-     * Adds a new transport connector for the given TransportServer transport
-     *
-     * @return the newly created and added transport connector
-     * @throws Exception
-     */
-    public TransportConnector addConnector(TransportServer transport) throws Exception {
-        return addConnector(new TransportConnector(transport));
-    }
-
-    /**
-     * Adds a new transport connector
-     *
-     * @return the transport connector
-     * @throws Exception
-     */
-    public TransportConnector addConnector(TransportConnector connector) throws Exception {
-        transportConnectors.add(connector);
-        return connector;
-    }
-
-    /**
-     * Stops and removes a transport connector from the broker.
-     *
-     * @param connector
-     * @return true if the connector has been previously added to the broker
-     * @throws Exception
-     */
-    public boolean removeConnector(TransportConnector connector) throws Exception {
-        boolean rc = transportConnectors.remove(connector);
-        if (rc) {
-            unregisterConnectorMBean(connector);
-        }
-        return rc;
-    }
-
-    /**
-     * Adds a new network connector using the given discovery address
-     *
-     * @return the newly created and added network connector
-     * @throws Exception
-     */
-    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
-        return addNetworkConnector(new URI(discoveryAddress));
-    }
-
-    /**
-     * Adds a new proxy connector using the given bind address
-     *
-     * @return the newly created and added network connector
-     * @throws Exception
-     */
-    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
-        return addProxyConnector(new URI(bindAddress));
-    }
-
-    /**
-     * Adds a new network connector using the given discovery address
-     *
-     * @return the newly created and added network connector
-     * @throws Exception
-     */
-    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
-        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
-        return addNetworkConnector(connector);
-    }
-
-    /**
-     * Adds a new proxy connector using the given bind address
-     *
-     * @return the newly created and added network connector
-     * @throws Exception
-     */
-    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
-        ProxyConnector connector = new ProxyConnector();
-        connector.setBind(bindAddress);
-        connector.setRemote(new URI("fanout:multicast://default"));
-        return addProxyConnector(connector);
-    }
-
-    /**
-     * Adds a new network connector to connect this broker to a federated
-     * network
-     */
-    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
-        connector.setBrokerService(this);
-        connector.setLocalUri(getVmConnectorURI());
-        // Set a connection filter so that the connector does not establish loop
-        // back connections.
-        connector.setConnectionFilter(new ConnectionFilter() {
-            @Override
-            public boolean connectTo(URI location) {
-                List<TransportConnector> transportConnectors = getTransportConnectors();
-                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
-                    try {
-                        TransportConnector tc = iter.next();
-                        if (location.equals(tc.getConnectUri())) {
-                            return false;
-                        }
-                    } catch (Throwable e) {
-                    }
-                }
-                return true;
-            }
-        });
-        networkConnectors.add(connector);
-        return connector;
-    }
-
-    /**
-     * Removes the given network connector without stopping it. The caller
-     * should call {@link NetworkConnector#stop()} to close the connector
-     */
-    public boolean removeNetworkConnector(NetworkConnector connector) {
-        boolean answer = networkConnectors.remove(connector);
-        if (answer) {
-            unregisterNetworkConnectorMBean(connector);
-        }
-        return answer;
-    }
-
-    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
-        URI uri = getVmConnectorURI();
-        connector.setLocalUri(uri);
-        proxyConnectors.add(connector);
-        if (isUseJmx()) {
-            registerProxyConnectorMBean(connector);
-        }
-        return connector;
-    }
-
-    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
-        connector.setBrokerService(this);
-        jmsConnectors.add(connector);
-        if (isUseJmx()) {
-            registerJmsConnectorMBean(connector);
-        }
-        return connector;
-    }
-
-    public JmsConnector removeJmsConnector(JmsConnector connector) {
-        if (jmsConnectors.remove(connector)) {
-            return connector;
-        }
-        return null;
-    }
-
-    public void masterFailed() {
-        if (shutdownOnMasterFailure) {
-            LOG.error("The Master has failed ... shutting down");
-            try {
-                stop();
-            } catch (Exception e) {
-                LOG.error("Failed to stop for master failure", e);
-            }
-        } else {
-            LOG.warn("Master Failed - starting all connectors");
-            try {
-                startAllConnectors();
-                broker.nowMasterBroker();
-            } catch (Exception e) {
-                LOG.error("Failed to startAllConnectors", e);
-            }
-        }
-    }
-
-    public String getUptime() {
-        long delta = getUptimeMillis();
-
-        if (delta == 0) {
-            return "not started";
-        }
-
-        return TimeUtils.printDuration(delta);
-    }
-
-    public long getUptimeMillis() {
-        if (startDate == null) {
-            return 0;
-        }
-
-        return new Date().getTime() - startDate.getTime();
-    }
-
-    public boolean isStarted() {
-        return started.get() && startedLatch.getCount() == 0;
-    }
-
-    /**
-     * Forces a start of the broker.
-     * By default a BrokerService instance that was
-     * previously stopped using BrokerService.stop() cannot be restarted
-     * using BrokerService.start().
-     * This method enforces a restart.
-     * It is not recommended to force a restart of the broker and will not work
-     * for most but some very trivial broker configurations.
-     * For restarting a broker instance we recommend to first call stop() on
-     * the old instance and then recreate a new BrokerService instance.
-     *
-     * @param force - if true enforces a restart.
-     * @throws Exception
-     */
-    public void start(boolean force) throws Exception {
-        forceStart = force;
-        stopped.set(false);
-        started.set(false);
-        start();
-    }
-
-    // Service interface
-    // -------------------------------------------------------------------------
-
-    protected boolean shouldAutostart() {
-        return true;
-    }
-
-    /**
-     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
-     *
-     * delegates to autoStart, done to prevent backwards incompatible signature change
-     */
-    @PostConstruct
-    private void postConstruct() {
-        try {
-            autoStart();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-    }
-
-    /**
-     *
-     * @throws Exception
-     * @org. apache.xbean.InitMethod
-     */
-    public void autoStart() throws Exception {
-        if(shouldAutostart()) {
-            start();
-        }
-    }
-
-    @Override
-    public void start() throws Exception {
-        if (stopped.get() || !started.compareAndSet(false, true)) {
-            // lets just ignore redundant start() calls
-            // as its way too easy to not be completely sure if start() has been
-            // called or not with the gazillion of different configuration
-            // mechanisms
-            // throw new IllegalStateException("Already started.");
-            return;
-        }
-
-        stopping.set(false);
-        startDate = new Date();
-        MDC.put("activemq.broker", brokerName);
-
-        try {
-            if (systemExitOnShutdown && useShutdownHook) {
-                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
-            }
-            processHelperProperties();
-            if (isUseJmx()) {
-                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
-                // we cannot cleanup clear that during shutdown of the broker.
-                MDC.remove("activemq.broker");
-                try {
-                    startManagementContext();
-                    for (NetworkConnector connector : getNetworkConnectors()) {
-                        registerNetworkConnectorMBean(connector);
-                    }
-                } finally {
-                    MDC.put("activemq.broker", brokerName);
-                }
-            }
-
-            // in jvm master slave, lets not publish over existing broker till we get the lock
-            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
-            if (brokerRegistry.lookup(getBrokerName()) == null) {
-                brokerRegistry.bind(getBrokerName(), BrokerService.this);
-            }
-            startPersistenceAdapter(startAsync);
-            startBroker(startAsync);
-            brokerRegistry.bind(getBrokerName(), BrokerService.this);
-        } catch (Exception e) {
-            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
-            try {
-                if (!stopped.get()) {
-                    stop();
-                }
-            } catch (Exception ex) {
-                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
-            }
-            throw e;
-        } finally {
-            MDC.remove("activemq.broker");
-        }
-    }
-
-    private void startPersistenceAdapter(boolean async) throws Exception {
-        if (async) {
-            new Thread("Persistence Adapter Starting Thread") {
-                @Override
-                public void run() {
-                    try {
-                        doStartPersistenceAdapter();
-                    } catch (Throwable e) {
-                        startException = e;
-                    } finally {
-                        synchronized (persistenceAdapterStarted) {
-                            persistenceAdapterStarted.set(true);
-                            persistenceAdapterStarted.notifyAll();
-                        }
-                    }
-                }
-            }.start();
-        } else {
-            doStartPersistenceAdapter();
-        }
-    }
-
-    private void doStartPersistenceAdapter() throws Exception {
-        getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
-        getPersistenceAdapter().setBrokerName(getBrokerName());
-        LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter());
-        if (deleteAllMessagesOnStartup) {
-            deleteAllMessages();
-        }
-        getPersistenceAdapter().start();
-
-        getJobSchedulerStore();
-        if (jobSchedulerStore != null) {
-            try {
-                jobSchedulerStore.start();
-            } catch (Exception e) {
-                RuntimeException exception = new RuntimeException(
-                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
-                LOG.error(exception.getLocalizedMessage(), e);
-                throw exception;
-            }
-        }
-    }
-
-    private void startBroker(boolean async) throws Exception {
-        if (async) {
-            new Thread("Broker Starting Thread") {
-                @Override
-                public void run() {
-                    try {
-                        synchronized (persistenceAdapterStarted) {
-                            if (!persistenceAdapterStarted.get()) {
-                                persistenceAdapterStarted.wait();
-                            }
-                        }
-                        doStartBroker();
-                    } catch (Throwable t) {
-                        startException = t;
-                    }
-                }
-            }.start();
-        } else {
-            doStartBroker();
-        }
-    }
-
-    private void doStartBroker() throws Exception {
-        if (startException != null) {
-            return;
-        }
-        startDestinations();
-        addShutdownHook();
-
-        broker = getBroker();
-        brokerId = broker.getBrokerId();
-
-        // need to log this after creating the broker so we have its id and name
-        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
-        broker.start();
-
-        if (isUseJmx()) {
-            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
-                // try to restart management context
-                // typical for slaves that use the same ports as master
-                managementContext.stop();
-                startManagementContext();
-            }
-            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
-            managedBroker.setContextBroker(broker);
-            adminView.setBroker(managedBroker);
-        }
-
-        if (ioExceptionHandler == null) {
-            setIoExceptionHandler(new DefaultIOExceptionHandler());
-        }
-
-        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
-            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
-            Log4JConfigView log4jConfigView = new Log4JConfigView();
-            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
-        }
-
-        startAllConnectors();
-
-        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
-        LOG.info("For help or more information please see: http://activemq.apache.org");
-
-        getBroker().brokerServiceStarted();
-        checkSystemUsageLimits();
-        startedLatch.countDown();
-        getBroker().nowMasterBroker();
-    }
-
-    /**
-     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
-     *
-     * delegates to stop, done to prevent backwards incompatible signature change
-     */
-    @PreDestroy
-    private void preDestroy () {
-        try {
-            stop();
-        } catch (Exception ex) {
-            throw new RuntimeException();
-        }
-    }
-
-    /**
-     * Stops this broker: stops the scheduler, the registered services, the
-     * connectors, the broker itself, the stores and the persistence adapter,
-     * then runs the registered shutdown hooks. Only the first call does any
-     * work; later calls return immediately.
-     *
-     * @throws Exception the first exception collected by the
-     *         {@link ServiceStopper}, or any exception thrown directly by one
-     *         of the stop calls
-     * @org.apache .xbean.DestroyMethod
-     */
-    @Override
-    public void stop() throws Exception {
-        // only the caller that flips the flag performs the shutdown
-        if (!stopping.compareAndSet(false, true)) {
-            LOG.trace("Broker already stopping/stopped");
-            return;
-        }
-
-        MDC.put("activemq.broker", brokerName);
-
-        if (systemExitOnShutdown) {
-            // System.exit is issued from a separate thread, started before the shutdown steps below run
-            new Thread() {
-                @Override
-                public void run() {
-                    System.exit(systemExitOnShutdownExitCode);
-                }
-            }.start();
-        }
-
-        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
-
-        removeShutdownHook();
-        if (this.scheduler != null) {
-            this.scheduler.stop();
-            this.scheduler = null;
-        }
-        // the stopper is used for every stop call routed through it below;
-        // its first recorded exception is rethrown at the very end
-        ServiceStopper stopper = new ServiceStopper();
-        if (services != null) {
-            for (Service service : services) {
-                stopper.stop(service);
-            }
-        }
-        stopAllConnectors(stopper);
-        this.slave = true;
-        // remove any VMTransports connected
-        // this has to be done after services are stopped,
-        // to avoid timing issue with discovery (spinning up a new instance)
-        BrokerRegistry.getInstance().unbind(getBrokerName());
-        VMTransportFactory.stopped(getBrokerName());
-        if (broker != null) {
-            stopper.stop(broker);
-            broker = null;
-        }
-
-        // these two stores are stopped directly, not through the stopper
-        if (jobSchedulerStore != null) {
-            jobSchedulerStore.stop();
-            jobSchedulerStore = null;
-        }
-        if (tempDataStore != null) {
-            tempDataStore.stop();
-            tempDataStore = null;
-        }
-        try {
-            stopper.stop(persistenceAdapter);
-            persistenceAdapter = null;
-            if (isUseJmx()) {
-                stopper.stop(getManagementContext());
-                managementContext = null;
-            }
-            // Clear SelectorParser cache to free memory
-            SelectorParser.clearCache();
-        } finally {
-            // state flags and the latch are updated even if the calls above throw
-            started.set(false);
-            stopped.set(true);
-            stoppedLatch.countDown();
-        }
-
-        if (this.taskRunnerFactory != null) {
-            this.taskRunnerFactory.shutdown();
-            this.taskRunnerFactory = null;
-        }
-        if (this.executor != null) {
-            ThreadPoolUtils.shutdownNow(executor);
-            this.executor = null;
-        }
-
-        this.destinationInterceptors = null;
-        this.destinationFactory = null;
-
-        if (startDate != null) {
-            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
-        }
-        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
-
-        // a failing hook is reported to the stopper and does not prevent the remaining hooks from running
-        synchronized (shutdownHooks) {
-            for (Runnable hook : shutdownHooks) {
-                try {
-                    hook.run();
-                } catch (Throwable e) {
-                    stopper.onException(hook, e);
-                }
-            }
-        }
-
-        MDC.remove("activemq.broker");
-
-        // and clear start date
-        startDate = null;
-
-        stopper.throwFirstException();
-    }
-
-    /**
-     * Sums the message counts of every queue whose name matches the given
-     * regular expression, logging each matching queue that still holds
-     * messages.
-     *
-     * @param queueName regular expression matched against destination names
-     * @return true if the summed message count of all matching queues is zero
-     */
-    public boolean checkQueueSize(String queueName) {
-        long pendingTotal = 0;
-        for (Map.Entry<ActiveMQDestination, Destination> candidate : regionBroker.getDestinationMap().entrySet()) {
-            if (!candidate.getKey().isQueue()) {
-                continue;
-            }
-            Destination destination = candidate.getValue();
-            if (!destination.getName().matches(queueName)) {
-                continue;
-            }
-            long pending = destination.getDestinationStatistics().getMessages().getCount();
-            pendingTotal += pending;
-            if (pending > 0) {
-                LOG.info("Queue has pending message: {} queueSize is: {}", destination.getName(), pending);
-            }
-        }
-        return pendingTotal == 0;
-    }
-
-    /**
-     * Attempts an orderly shutdown; does nothing unless JMX is enabled.
-     * Both connectorName and queueName are treated as regular expressions.
-     * <ol>
-     * <li>Stops every transport connector whose name matches connectorName
-     * (supposedly the connector the clients are connected to).</li>
-     * <li>Polls the queues matching queueName until none of them has pending
-     * messages or the timeout elapses; supposedly clients fail over to
-     * another broker and pending messages get forwarded meanwhile.</li>
-     * <li>If the queues drained in time, calls {@link #stop()}; otherwise
-     * only logs that the broker must be stopped manually.</li>
-     * </ol>
-     *
-     * @param connectorName regex selecting the connectors to stop; must not be null
-     * @param queueName regex selecting the queues to watch; must not be null
-     * @param timeout maximum time to wait, in seconds; must be &gt; 0
-     * @param pollInterval seconds between queue checks; values &lt;= 0 fall back to 30
-     * @throws Exception if an argument is invalid, or if stopping a connector
-     *         or the broker fails
-     */
-    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
-        if (!isUseJmx()) {
-            return;
-        }
-        if (connectorName == null || queueName == null || timeout <= 0) {
-            throw new Exception(
-                    "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
-        }
-        if (pollInterval <= 0) {
-            pollInterval = 30;
-        }
-        LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
-                connectorName, queueName, timeout, pollInterval
-        });
-        for (int index = 0; index < transportConnectors.size(); index++) {
-            TransportConnector candidate = transportConnectors.get(index);
-            if (candidate == null || candidate.getName() == null) {
-                continue;
-            }
-            if (candidate.getName().matches(connectorName)) {
-                candidate.stop();
-            }
-        }
-        final long begin = System.currentTimeMillis();
-        final long timeoutMillis = timeout * 1000;
-        while (System.currentTimeMillis() - begin < timeoutMillis) {
-            // check queue size until it reaches zero
-            if (checkQueueSize(queueName)) {
-                stop();
-                break;
-            }
-            Thread.sleep(pollInterval * 1000);
-        }
-        if (stopped.get()) {
-            LOG.info("Successfully stop the broker.");
-        } else {
-            LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
-        }
-    }
-
-    /**
-     * A helper method to block the caller thread until the broker has been
-     * stopped
-     */
-    public void waitUntilStopped() {
-        while (isStarted() && !stopped.get()) {
-            try {
-                stoppedLatch.await();
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-    }
-
-    /**
-     * @return the current value of the stopped flag
-     */
-    public boolean isStopped() {
-        return this.stopped.get();
-    }
-
-    /**
-     * Blocks the calling thread until the broker has fully started, waiting
-     * at most DEFAULT_START_TIMEOUT.
-     *
-     * @return boolean true if wait succeeded false if broker was not started or was stopped
-     */
-    public boolean waitUntilStarted() {
-        final long limit = DEFAULT_START_TIMEOUT;
-        return waitUntilStarted(limit);
-    }
-
-    /**
-     * A helper method to block the caller thread until the broker has fully started
-     *
-     * The wait is re-checked in 100 ms slices and ends early when the broker
-     * is stopped or a start exception has been recorded. An interrupt does
-     * not end the wait, but the thread's interrupt status is restored before
-     * returning.
-     *
-     * @param timeout
-     *        the amount of time, in milliseconds, to wait before giving up
-     *        and returning false.
-     *
-     * @return boolean true if wait succeeded false if broker was not started or was stopped
-     */
-    public boolean waitUntilStarted(long timeout) {
-        boolean waitSucceeded = isStarted();
-        long now = System.currentTimeMillis();
-        // saturate instead of overflowing: a very large timeout previously
-        // wrapped negative and was clamped to 0, i.e. no wait at all
-        long expiration = (timeout > Long.MAX_VALUE - now) ? Long.MAX_VALUE : Math.max(0, timeout + now);
-        boolean interrupted = false;
-        try {
-            while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
-                try {
-                    if (startException != null) {
-                        return waitSucceeded;
-                    }
-                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException ignored) {
-                    // keep waiting, but remember the interrupt for the caller
-                    interrupted = true;
-                }
-            }
-            return waitSucceeded;
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    // Properties
-    // -------------------------------------------------------------------------
-    /**
-     * Returns the message broker, creating it via createBroker() on first use.
-     *
-     * @throws Exception if the broker has to be created and creation fails
-     */
-    public Broker getBroker() throws Exception {
-        if (this.broker != null) {
-            return this.broker;
-        }
-        this.broker = createBroker();
-        return this.broker;
-    }
-
-    /**
-     * Returns the administration view of the broker; used to create and destroy
-     * resources such as queues and topics. Note this method returns null if JMX
-     * is disabled.
-     *
-     * @throws Exception if the lazily triggered broker creation fails
-     */
-    public BrokerView getAdminView() throws Exception {
-        final boolean viewMissing = (this.adminView == null);
-        if (viewMissing) {
-            // force lazy creation
-            getBroker();
-        }
-        return this.adminView;
-    }
-
-    /**
-     * @param adminView the administration view to return from getAdminView()
-     */
-    public void setAdminView(BrokerView adminView) {
-        this.adminView = adminView;
-    }
-
-    /**
-     * @return the name of this broker
-     */
-    public String getBrokerName() {
-        return this.brokerName;
-    }
-
-    /**
-     * Sets the name of this broker; which must be unique in the network
-     *
-     * @param brokerName
-     */
-    public void setBrokerName(String brokerName) {
-        if (brokerName == null) {
-            throw new NullPointerException("The broker name cannot be null");
-        }
-        String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
-        if (!str.equals(brokerName)) {
-            LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str);
-        }
-        this.brokerName = str.trim();
-    }
-
-    /**
-     * @return the configured persistence adapter factory, possibly null
-     */
-    public PersistenceAdapterFactory getPersistenceFactory() {
-        return this.persistenceFactory;
-    }
-
-    /**
-     * @return the data directory; when none has been set it is initialised
-     *         from IOHelper.getDefaultDataDirectory()
-     */
-    public File getDataDirectoryFile() {
-        if (this.dataDirectoryFile != null) {
-            return this.dataDirectoryFile;
-        }
-        this.dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
-        return this.dataDirectoryFile;
-    }
-
-    /**
-     * @return the sub-directory of the data directory named after this broker
-     */
-    public File getBrokerDataDirectory() {
-        final File parent = getDataDirectoryFile();
-        return new File(parent, getBrokerName());
-    }
-
-    /**
-     * Sets the directory in which the data files will be stored by default for
-     * the JDBC and Journal persistence adaptors. Delegates to
-     * {@link #setDataDirectoryFile(File)}.
-     *
-     * @param dataDirectory
-     *            path of the directory to store data files
-     */
-    public void setDataDirectory(String dataDirectory) {
-        setDataDirectoryFile(new File(dataDirectory));
-    }
-
-    /**
-     * Sets the directory in which the data files will be stored by default for
-     * the JDBC and Journal persistence adaptors.
-     *
-     * @param dataDirectoryFile
-     *            the directory to store data files
-     */
-    public void setDataDirectoryFile(File dataDirectoryFile) {
-        this.dataDirectoryFile = dataDirectoryFile;
-    }
-
-    /**
-     * @return the tmpDataDirectory; defaults to "tmp_storage" below the
-     *         broker data directory when none has been set
-     */
-    public File getTmpDataDirectory() {
-        if (this.tmpDataDirectory != null) {
-            return this.tmpDataDirectory;
-        }
-        this.tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
-        return this.tmpDataDirectory;
-    }
-
-    /**
-     * @param tmpDataDirectory
-     *            the tmpDataDirectory to set; replaces the default otherwise
-     *            derived by getTmpDataDirectory()
-     */
-    public void setTmpDataDirectory(File tmpDataDirectory) {
-        this.tmpDataDirectory = tmpDataDirectory;
-    }
-
-    /**
-     * @param persistenceFactory the persistence adapter factory to set
-     */
-    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
-        this.persistenceFactory = persistenceFactory;
-    }
-
-    /**
-     * @param destinationFactory the destination factory to set
-     */
-    public void setDestinationFactory(DestinationFactory destinationFactory) {
-        this.destinationFactory = destinationFactory;
-    }
-
-    /**
-     * @return whether persistence is enabled
-     */
-    public boolean isPersistent() {
-        return this.persistent;
-    }
-
-    /**
-     * Sets whether or not persistence is enabled or disabled.
-     *
-     * @param persistent true to enable persistence
-     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
-     */
-    public void setPersistent(boolean persistent) {
-        this.persistent = persistent;
-    }
-
-    /**
-     * @return whether the broker should populate the JMSXUserID header
-     */
-    public boolean isPopulateJMSXUserID() {
-        return this.populateJMSXUserID;
-    }
-
-    /**
-     * Sets whether or not the broker should populate the JMSXUserID header.
-     *
-     * @param populateJMSXUserID true to populate the header
-     */
-    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
-        this.populateJMSXUserID = populateJMSXUserID;
-    }
-
-    /**
-     * Returns the broker's SystemUsage, creating it on first use with default
-     * limits (memory 1 GB, temp 50 GB, store 100 GB, job scheduler 50 GB)
-     * and registering it as a service.
-     *
-     * @return the system usage, never null
-     * @throws RuntimeException if the SystemUsage cannot be created because
-     *         of an IOException
-     */
-    public SystemUsage getSystemUsage() {
-        try {
-            if (systemUsage == null) {
-
-                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
-                systemUsage.setExecutor(getExecutor());
-                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
-                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
-                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
-                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
-                addService(this.systemUsage);
-            }
-            return systemUsage;
-        } catch (IOException e) {
-            LOG.error("Cannot create SystemUsage", e);
-            // separator added: the cause message used to be glued onto "SystemUsage"
-            throw new RuntimeException("Fatally failed to create SystemUsage: " + e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Replaces the broker's SystemUsage: the previous instance, if any, is
-     * removed from the services, the new one gets the broker's executor when
-     * it has none, and is then added as a service.
-     *
-     * @param memoryManager the system usage to install
-     */
-    public void setSystemUsage(SystemUsage memoryManager) {
-        final SystemUsage previous = this.systemUsage;
-        if (previous != null) {
-            removeService(previous);
-        }
-        this.systemUsage = memoryManager;
-        final boolean lacksExecutor = (this.systemUsage.getExecutor() == null);
-        if (lacksExecutor) {
-            this.systemUsage.setExecutor(getExecutor());
-        }
-        addService(this.systemUsage);
-    }
-
-    /**
-     * Returns the SystemUsage applied to consumers. When system usage is
-     * split between producers and consumers, a child usage named "Consumer"
-     * is created with its memory portion set to consumerSystemUsagePortion
-     * percent and added as a service; otherwise the shared system usage is
-     * used.
-     *
-     * @return the consumer system usage
-     * @throws IOException
-     */
-    public SystemUsage getConsumerSystemUsage() throws IOException {
-        if (this.consumerSystemUsaage != null) {
-            return this.consumerSystemUsaage;
-        }
-        if (!splitSystemUsageForProducersConsumers) {
-            this.consumerSystemUsaage = getSystemUsage();
-            return this.consumerSystemUsaage;
-        }
-        this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
-        final float share = consumerSystemUsagePortion / 100f;
-        this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(share);
-        addService(this.consumerSystemUsaage);
-        return this.consumerSystemUsaage;
-    }
-
-    /**
-     * Replaces the consumer SystemUsage; the previous instance, if any, is
-     * removed from the services and the new one is added.
-     *
-     * @param consumerSystemUsaage
-     *            the consumer system usage to set
-     */
-    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
-        final SystemUsage previous = this.consumerSystemUsaage;
-        if (previous != null) {
-            removeService(previous);
-        }
-        this.consumerSystemUsaage = consumerSystemUsaage;
-        addService(this.consumerSystemUsaage);
-    }
-
-    /**
-     * Returns the SystemUsage applied to producers. When system usage is
-     * split between producers and consumers, a child usage named "Producer"
-     * is created with its memory portion set to producerSystemUsagePortion
-     * percent and added as a service; otherwise the shared system usage is
-     * used.
-     *
-     * @return the producer system usage
-     * @throws IOException
-     */
-    public SystemUsage getProducerSystemUsage() throws IOException {
-        if (this.producerSystemUsage != null) {
-            return this.producerSystemUsage;
-        }
-        if (!splitSystemUsageForProducersConsumers) {
-            this.producerSystemUsage = getSystemUsage();
-            return this.producerSystemUsage;
-        }
-        this.producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
-        final float share = producerSystemUsagePortion / 100f;
-        this.producerSystemUsage.getMemoryUsage().setUsagePortion(share);
-        addService(this.producerSystemUsage);
-        return this.producerSystemUsage;
-    }
-
-    /**
-     * Replaces the producer SystemUsage; the previous instance, if any, is
-     * removed from the services and the new one is added.
-     *
-     * @param producerUsageManager
-     *            the producer system usage to set
-     */
-    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
-        final SystemUsage previous = this.producerSystemUsage;
-        if (previous != null) {
-            removeService(previous);
-        }
-        this.producerSystemUsage = producerUsageManager;
-        addService(this.producerSystemUsage);
-    }
-
-    /**
-     * Returns the persistence adapter, creating, configuring and registering
-     * its MBean on first use.
-     *
-     * @return the persistence adapter in use
-     * @throws IOException if the adapter has to be created and that fails
-     */
-    public PersistenceAdapter getPersistenceAdapter() throws IOException {
-        if (this.persistenceAdapter != null) {
-            return this.persistenceAdapter;
-        }
-        this.persistenceAdapter = createPersistenceAdapter();
-        configureService(this.persistenceAdapter);
-        this.persistenceAdapter = registerPersistenceAdapterMBean(this.persistenceAdapter);
-        return this.persistenceAdapter;
-    }
-
-    /**
-     * Sets the persistence adaptor implementation to use for this broker.
-     * When persistence is disabled, any adapter other than a
-     * MemoryPersistenceAdapter is ignored with a warning.
-     *
-     * @param persistenceAdapter the adapter to install
-     * @throws IOException
-     */
-    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
-        final boolean inMemory = persistenceAdapter instanceof MemoryPersistenceAdapter;
-        if (!(isPersistent() || inMemory)) {
-            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
-            return;
-        }
-        this.persistenceAdapter = persistenceAdapter;
-        configureService(this.persistenceAdapter);
-        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
-    }
-
-    /**
-     * @return the broker's task runner factory; created on first use with a
-     *         name containing the broker name and this class's class loader
-     *         set as thread class loader
-     */
-    public TaskRunnerFactory getTaskRunnerFactory() {
-        if (this.taskRunnerFactory != null) {
-            return this.taskRunnerFactory;
-        }
-        final String factoryName = "ActiveMQ BrokerService["+getBrokerName()+"] Task";
-        this.taskRunnerFactory = new TaskRunnerFactory(factoryName, getTaskRunnerPriority(), true, 1000,
-                isDedicatedTaskRunner());
-        this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
-        return this.taskRunnerFactory;
-    }
-
-    /**
-     * @param taskRunnerFactory the task runner factory to use instead of the
-     *            lazily created default
-     */
-    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
-        this.taskRunnerFactory = taskRunnerFactory;
-    }
-
-    /**
-     * Returns the task runner factory used for persistence work, creating it
-     * on first use with the configured persistence thread priority.
-     *
-     * @return the persistence task runner factory, never null
-     */
-    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
-        // guard on the field that is actually lazily initialised here; the
-        // previous check on taskRunnerFactory could return null or replace a
-        // factory installed through the setter on every call
-        if (persistenceTaskRunnerFactory == null) {
-            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
-                    true, 1000, isDedicatedTaskRunner());
-        }
-        return persistenceTaskRunnerFactory;
-    }
-
-    /**
-     * @param persistenceTaskRunnerFactory the persistence task runner factory to set
-     */
-    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
-        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
-    }
-
-    /**
-     * @return whether the broker's services are exposed into JMX
-     */
-    public boolean isUseJmx() {
-        return this.useJmx;
-    }
-
-    /**
-     * @return whether the broker's services have statistics enabled
-     */
-    public boolean isEnableStatistics() {
-        return this.enableStatistics;
-    }
-
-    /**
-     * Sets whether or not the Broker's services enable statistics or not.
-     *
-     * @param enableStatistics true to enable statistics
-     */
-    public void setEnableStatistics(boolean enableStatistics) {
-        this.enableStatistics = enableStatistics;
-    }
-
-    /**
-     * Sets whether or not the Broker's services should be exposed into JMX or
-     * not.
-     *
-     * @param useJmx true to expose the services into JMX
-     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
-     */
-    public void setUseJmx(boolean useJmx) {
-        this.useJmx = useJmx;
-    }
-
-    /**
-     * @return the JMX ObjectName of this broker, created on first use
-     * @throws MalformedObjectNameException if the name has to be created and
-     *         is not well formed
-     */
-    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
-        if (this.brokerObjectName != null) {
-            return this.brokerObjectName;
-        }
-        this.brokerObjectName = createBrokerObjectName();
-        return this.brokerObjectName;
-    }
-
-    /**
-     * Sets the JMX ObjectName for this broker
-     *
-     * @param brokerObjectName the object name to use
-     */
-    public void setBrokerObjectName(ObjectName brokerObjectName) {
-        this.brokerObjectName = brokerObjectName;
-    }
-
-    /**
-     * @return the management context; a default ManagementContext is created
-     *         when none has been set
-     */
-    public ManagementContext getManagementContext() {
-        if (this.managementContext != null) {
-            return this.managementContext;
-        }
-        this.managementContext = new ManagementContext();
-        return this.managementContext;
-    }
-
-    /**
-     * @param managementContext the management context to use
-     */
-    public void setManagementContext(ManagementContext managementContext) {
-        this.managementContext = managementContext;
-    }
-
-    /**
-     * Looks up a network connector by exact name.
-     *
-     * @param connectorName the name to look for
-     * @return the first network connector with that name, or null if there
-     *         is none (also when connectorName is null)
-     */
-    public NetworkConnector getNetworkConnectorByName(String connectorName) {
-        if (connectorName == null) {
-            return null;
-        }
-        for (NetworkConnector connector : networkConnectors) {
-            // compare from the non-null argument so an unnamed connector
-            // is skipped instead of raising a NullPointerException
-            if (connectorName.equals(connector.getName())) {
-                return connector;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * @return the configured network connector URIs, possibly null
-     */
-    public String[] getNetworkConnectorURIs() {
-        return this.networkConnectorURIs;
-    }
-
-    /**
-     * @param networkConnectorURIs the network connector URIs to set
-     */
-    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
-        this.networkConnectorURIs = networkConnectorURIs;
-    }
-
-    /**
-     * Looks up a transport connector by exact name.
-     *
-     * @param connectorName the name to look for
-     * @return the first transport connector with that name, or null if there
-     *         is none (also when connectorName is null)
-     */
-    public TransportConnector getConnectorByName(String connectorName) {
-        if (connectorName == null) {
-            return null;
-        }
-        for (TransportConnector connector : transportConnectors) {
-            // compare from the non-null argument so an unnamed connector
-            // is skipped instead of raising a NullPointerException
-            if (connectorName.equals(connector.getName())) {
-                return connector;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Builds a map from lower-cased URI scheme to connect URI string for the
-     * transport connectors. Connectors without a connect URI or scheme are
-     * skipped, as are connectors whose URI cannot be read (logged at debug
-     * level); a later connector with the same scheme replaces an earlier one.
-     *
-     * @return a new map, possibly empty
-     */
-    public Map<String, String> getTransportConnectorURIsAsMap() {
-        Map<String, String> urisByScheme = new HashMap<String, String>();
-        for (TransportConnector connector : transportConnectors) {
-            try {
-                URI connectUri = connector.getConnectUri();
-                if (connectUri == null) {
-                    continue;
-                }
-                String scheme = connectUri.getScheme();
-                if (scheme == null) {
-                    continue;
-                }
-                urisByScheme.put(scheme.toLowerCase(Locale.ENGLISH), connectUri.toString());
-            } catch (Exception e) {
-                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
-            }
-        }
-        return urisByScheme;
-    }
-
-    /**
-     * Searches the connections of all transport connectors for an existing
-     * producer exchange belonging to the given producer.
-     *
-     * @param producerInfo the producer to look up
-     * @return the first exchange found, or null if no connection has one
-     */
-    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
-        for (TransportConnector connector : transportConnectors) {
-            for (TransportConnection connection : connector.getConnections()) {
-                ProducerBrokerExchange exchange = connection.getProducerBrokerExchangeIfExists(producerInfo);
-                if (exchange != null) {
-                    return exchange;
-                }
-            }
-        }
-        return null;
-    }
-
-    /**
-     * @return the configured transport connector URIs, possibly null
-     */
-    public String[] getTransportConnectorURIs() {
-        return this.transportConnectorURIs;
-    }
-
-    /**
-     * @param transportConnectorURIs the transport connector URIs to set
-     */
-    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
-        this.transportConnectorURIs = transportConnectorURIs;
-    }
-
-    /**
-     * @return the configured JMS bridge connectors, possibly null
-     */
-    public JmsConnector[] getJmsBridgeConnectors() {
-        return this.jmsBridgeConnectors;
-    }
-
-    /**
-     * @param jmsConnectors
-     *            the JMS bridge connectors to set
-     */
-    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
-        this.jmsBridgeConnectors = jmsConnectors;
-    }
-
-    /**
-     * @return a new array holding the services currently associated with
-     *         this broker
-     */
-    public Service[] getServices() {
-        final Service[] snapshot = this.services.toArray(new Service[0]);
-        return snapshot;
-    }
-
-    /**
-     * Sets the services associated with this broker, discarding any that
-     * were registered before.
-     *
-     * @param services the new services; null leaves the broker without services
-     */
-    public void setServices(Service[] services) {
-        this.services.clear();
-        if (services == null) {
-            return;
-        }
-        for (Service service : services) {
-            this.services.add(service);
-        }
-    }
-
-    /**
-     * Adds a new service so that it will be started as part of the broker
-     * lifecycle
-     *
-     * @param service the service to add
-     */
-    public void addService(Service service) {
-        this.services.add(service);
-    }
-
-    /**
-     * Removes a service from the ones associated with this broker.
-     *
-     * @param service the service to remove
-     */
-    public void removeService(Service service) {
-        this.services.remove(service);
-    }
-
-    /**
-     * @return whether logging is used when reporting errors during shutdown
-     */
-    public boolean isUseLoggingForShutdownErrors() {
-        return this.useLoggingForShutdownErrors;
-    }
-
-    /**
-     * Sets whether or not we should use commons-logging when reporting errors
-     * when shutting down the broker
-     *
-     * @param useLoggingForShutdownErrors true to log shutdown errors
-     */
-    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
-        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
-    }
-
-    /**
-     * @return whether a shutdown handler is used to close down the broker
-     */
-    public boolean isUseShutdownHook() {
-        return this.useShutdownHook;
-    }
-
-    /**
-     * Sets whether or not we should use a shutdown handler to close down the
-     * broker cleanly if the JVM is terminated. It is recommended you leave this
-     * enabled.
-     *
-     * @param useShutdownHook true to use a shutdown handler
-     */
-    public void setUseShutdownHook(boolean useShutdownHook) {
-        this.useShutdownHook = useShutdownHook;
-    }
-
-    /**
-     * @return whether advisory messages are supported
-     */
-    public boolean isAdvisorySupport() {
-        return this.advisorySupport;
-    }
-
-    /**
-     * Allows the support of advisory messages to be disabled for performance
-     * reasons.
-     *
-     * @param advisorySupport false to disable advisory messages
-     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
-     */
-    public void setAdvisorySupport(boolean advisorySupport) {
-        this.advisorySupport = advisorySupport;
-    }
-
-    /**
-     * @return a new list containing the current transport connectors;
-     *         changes to it do not affect the broker
-     */
-    public List<TransportConnector> getTransportConnectors() {
-        List<TransportConnector> copy = new ArrayList<TransportConnector>(this.transportConnectors);
-        return copy;
-    }
-
-    /**
-     * Sets the transport connectors which this broker will listen on for new
-     * clients. Each connector is passed to addConnector.
-     *
-     * @param transportConnectors the connectors to add
-     * @throws Exception if adding a connector fails
-     * @org.apache.xbean.Property
-     *                            nestedType="org.apache.activemq.broker.TransportConnector"
-     */
-    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
-        for (TransportConnector toAdd : transportConnectors) {
-            addConnector(toAdd);
-        }
-    }
-
-    /**
-     * Looks up a transport connector by exact name.
-     *
-     * @param name the name to look for; must not be null
-     * @return the first transport connector with that name, or null if none matches
-     */
-    public TransportConnector getTransportConnectorByName(String name){
-        for (TransportConnector candidate : transportConnectors) {
-            final boolean sameName = name.equals(candidate.getName());
-            if (sameName) {
-                return candidate;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Looks up a transport connector by the scheme of its URI.
-     *
-     * @param scheme the URI scheme to look for; must not be null
-     * @return the first transport connector whose URI has that scheme, or
-     *         null if none matches
-     */
-    public TransportConnector getTransportConnectorByScheme(String scheme){
-        for (TransportConnector candidate : transportConnectors) {
-            final String candidateScheme = candidate.getUri().getScheme();
-            if (scheme.equals(candidateScheme)) {
-                return candidate;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * @return a new list containing the current network connectors;
-     *         changes to it do not affect the broker
-     */
-    public List<NetworkConnector> getNetworkConnectors() {
-        List<NetworkConnector> copy = new ArrayList<NetworkConnector>(this.networkConnectors);
-        return copy;
-    }
-
-    /**
-     * @return a new list containing the current proxy connectors;
-     *         changes to it do not affect the broker
-     */
-    public List<ProxyConnector> getProxyConnectors() {
-        List<ProxyConnector> copy = new ArrayList<ProxyConnector>(this.proxyConnectors);
-        return copy;
-    }
-
-    /**
-     * Sets the network connectors which this broker will use to connect to
-     * other brokers in a federated network
-     *
-     * @org.apache.xbean.Property
-     *                            nestedType="org.apache.activemq.network.NetworkConnector"
-     */
-    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
-        for (Object connector : networkConnectors) {
-            addNetworkConnector((NetworkConnector) connector);
-        }
-    }
-
-    /**
-     * Sets the network connectors which this broker will use to connect to
-     * other brokers in a federated network
-     */
-    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
-        for (Object connector : proxyConnectors) {
-            addProxyConnector((ProxyConnector) connector);
-        }
-    }
-
-    /**
-     * @return the destination specific policies, possibly null
-     */
-    public PolicyMap getDestinationPolicy() {
-        return this.destinationPolicy;
-    }
-
-    /**
-     * Sets the destination specific policies available either for exact
-     * destinations or for wildcard areas of destinations.
-     *
-     * @param policyMap the policies to use
-     */
-    public void setDestinationPolicy(PolicyMap policyMap) {
-        this.destinationPolicy = policyMap;
-    }
-
-    /**
-     * @return the configured broker plugins, possibly null
-     */
-    public BrokerPlugin[] getPlugins() {
-        return this.plugins;
-    }
-
-    /**
-     * Sets a number of broker plugins to install such as for security
-     * authentication or authorization
-     *
-     * @param plugins the plugins to install
-     */
-    public void setPlugins(BrokerPlugin[] plugins) {
-        this.plugins = plugins;
-    }
-
-    /**
-     * @return the configured message authorization policy, possibly null
-     */
-    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
-        return this.messageAuthorizationPolicy;
-    }
-
-    /**
-     * Sets the policy used to decide if the current connection is authorized to
-     * consume a given message
-     *
-     * @param messageAuthorizationPolicy the policy to use
-     */
-    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
-        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
-    }
-
-    /**
-     * Delete all messages from the persistent store; delegates to the
-     * persistence adapter, which is created first if necessary.
-     *
-     * @throws IOException
-     */
-    public void deleteAllMessages() throws IOException {
-        final PersistenceAdapter adapter = getPersistenceAdapter();
-        adapter.deleteAllMessages();
-    }
-
-    /**
-     * @return whether all messages are deleted on startup
-     */
-    public boolean isDeleteAllMessagesOnStartup() {
-        return this.deleteAllMessagesOnStartup;
-    }
-
-    /**
-     * Sets whether or not all messages are deleted on startup - mostly only
-     * useful for testing.
-     *
-     * @param deletePersistentMessagesOnStartup true to delete all messages on startup
-     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
-     */
-    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
-        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
-    }
-
-    /**
-     * Returns the VM transport URI of this broker. When none has been set it
-     * is derived as "vm://" plus the broker name, with every character other
-     * than letters, digits, '.', '_' and '-' replaced by '_'.
-     *
-     * @return the VM connector URI, or null if the derived URI is malformed
-     *         (which is logged as an error)
-     */
-    public URI getVmConnectorURI() {
-        if (this.vmConnectorURI != null) {
-            return this.vmConnectorURI;
-        }
-        try {
-            final String safeName = getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
-            this.vmConnectorURI = new URI("vm://" + safeName);
-        } catch (URISyntaxException e) {
-            LOG.error("Badly formed URI from {}", getBrokerName(), e);
-        }
-        return this.vmConnectorURI;
-    }
-
-    /**
-     * @param vmConnectorURI the VM connector URI to use instead of the one
-     *            derived from the broker name
-     */
-    public void setVmConnectorURI(URI vmConnectorURI) {
-        this.vmConnectorURI = vmConnectorURI;
-    }
-
-    /**
-     * Returns the default publishable connect string of this broker, which
-     * is determined once and then cached. The first transport connector that
-     * updates or rebalances cluster clients wins; otherwise the first
-     * connector with a publishable connect string is used.
-     *
-     * @return the connect string; null if the broker is not started or no
-     *         connector provides one
-     */
-    public String getDefaultSocketURIString() {
-        if (!started.get()) {
-            return null;
-        }
-        if (this.defaultSocketURIString == null) {
-            for (TransportConnector connector : this.transportConnectors) {
-                String publishable = null;
-                try {
-                    publishable = connector.getPublishableConnectString();
-                } catch (Exception e) {
-                    LOG.warn("Failed to get the ConnectURI for {}", connector, e);
-                }
-                if (publishable == null) {
-                    continue;
-                }
-                // find first publishable uri
-                if (connector.isUpdateClusterClients() || connector.isRebalanceClusterClients()) {
-                    this.defaultSocketURIString = publishable;
-                    break;
-                }
-                // or use the first defined
-                if (this.defaultSocketURIString == null) {
-                    this.defaultSocketURIString = publishable;
-                }
-            }
-        }
-        return this.defaultSocketURIString;
-    }
-
-    /**
-     * @return the shutdownOnMasterFailure flag
-     */
-    public boolean isShutdownOnMasterFailure() {
-        return this.shutdownOnMasterFailure;
-    }
-
-    /**
-     * @param shutdownOnMasterFailure
-     *            the shutdownOnMasterFailure flag to set
-     */
-    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
-        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
-    }
-
-    /**
-     * @return the keepDurableSubsActive flag
-     */
-    public boolean isKeepDurableSubsActive() {
-        return this.keepDurableSubsActive;
-    }
-
-    /**
-     * @param keepDurableSubsActive the keepDurableSubsActive flag to set
-     */
-    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
-        this.keepDurableSubsActive = keepDurableSubsActive;
-    }
-
-    /**
-     * @return whether Virtual Topics are supported by default
-     */
-    public boolean isUseVirtualTopics() {
-        return this.useVirtualTopics;
-    }
-
-    /**
-     * Sets whether or not <a
-     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
-     * Topics</a> should be supported by default if they have not been
-     * explicitly configured.
-     *
-     * @param useVirtualTopics true to support Virtual Topics by default
-     */
-    public void setUseVirtualTopics(boolean useVirtualTopics) {
-        this.useVirtualTopics = useVirtualTopics;
-    }
-
-    /**
-     * @return the destination interceptors in use, possibly null
-     */
-    public DestinationInterceptor[] getDestinationInterceptors() {
-        return this.destinationInterceptors;
-    }
-
-    /**
-     * @return whether Mirrored Queues are supported by default
-     */
-    public boolean isUseMirroredQueues() {
-        return this.useMirroredQueues;
-    }
-
-    /**
-     * Sets whether or not <a
-     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
-     * Queues</a> should be supported by default if they have not been
-     * explicitly configured.
-     *
-     * @param useMirroredQueues true to support Mirrored Queues by default
-     */
-    public void setUseMirroredQueues(boolean useMirroredQueues) {
-        this.useMirroredQueues = useMirroredQueues;
-    }
-
-    /**
-     * Sets the destination interceptors to use
-     *
-     * @param destinationInterceptors the interceptors to use
-     */
-    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
-        this.destinationInterceptors = destinationInterceptors;
-    }
-
-    /**
-     * @return the destinations which should be loaded/created on startup,
-     *         possibly null
-     */
-    public ActiveMQDestination[] getDestinations() {
-        return this.destinations;
-    }
-
-    /**
-     * Sets the destinations which should be loaded/created on startup
-     *
-     * @param destinations the destinations to create
-     */
-    public void setDestinations(ActiveMQDestination[] destinations) {
-        this.destinations = destinations;
-    }
-
-    /**
-     * Returns the store used for temporary data, creating it on first use.
-     * <ul>
-     * <li>Returns null when persistence is disabled.</li>
-     * <li>Returns the persistence adapter itself when it is a PListStore
-     * (without caching it in the tempDataStore field).</li>
-     * <li>Otherwise deletes the plain files found in the temporary data
-     * directory, then loads, configures and starts a
-     * org.apache.activemq.store.kahadb.plist.PListStoreImpl on that
-     * directory.</li>
-     * </ul>
-     *
-     * @return the tempDataStore, or null if the broker is not persistent
-     * @throws RuntimeException wrapping any failure to obtain the persistence
-     *         adapter or to create and start the store
-     */
-    public synchronized PListStore getTempDataStore() {
-        if (tempDataStore != null) {
-            return tempDataStore;
-        }
-        if (!isPersistent()) {
-            return null;
-        }
-
-        try {
-            PersistenceAdapter adapter = getPersistenceAdapter();
-            // instanceof is already false for null
-            if (adapter instanceof PListStore) {
-                return (PListStore) adapter;
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        try {
-            File directory = getTmpDataDirectory();
-            File[] leftovers = null;
-            if (directory.exists() && directory.isDirectory()) {
-                leftovers = directory.listFiles();
-            }
-            if (leftovers != null && leftovers.length > 0) {
-                boolean allDeleted = true;
-                for (File leftover : leftovers) {
-                    if (!leftover.isDirectory()) {
-                        // non-short-circuit on purpose: every file gets a delete attempt
-                        allDeleted &= leftover.delete();
-                    }
-                }
-                String outcome = allDeleted ? "Successfully deleted" : "Failed to delete";
-                LOG.info("{} temporary storage", outcome);
-            }
-
-            String storeClassName = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
-            this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(storeClassName).newInstance();
-            this.tempDataStore.setDirectory(getTmpDataDirectory());
-            configureService(tempDataStore);
-            this.tempDataStore.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return tempDataStore;
-    }
-
-    /**
-     * @param tempDataStore
-     *            the tempDataStore to set
-     */
-    public void setTempDataStore(PListStore tempDataStore) {
-        this.tempDataStore = tempDataStore;
-        configureService(tempDataStore);
-        try {
-            tempDataStore.start();
-        } catch (Exception e) {
-            RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
-            LOG.error(exception.getLocalizedMessage(), e);
-            throw exception;
-        }
-    }
-
-    public int getPersistenceThreadPriority() {
-        return persistenceThreadPriority;
-    }
-
-    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
-        this.persistenceThreadPriority = persistenceThreadPriority;
-    }
-
-    /**
-     * @return the useLocalHostBrokerName
-     */
-    public boolean isUseLocalHostBrokerName() {
-        return this.useLocalHostBrokerName;
-    }
-
-    /**
-     * @param useLocalHostBrokerName
-     *            the useLocalHostBrokerName to set
-     */
-    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
-        this.useLocalHostBrokerName = useLocalHostBrokerName;
-        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
-            brokerName = LOCAL_HOST_NAME;
-        }
-    }
-
-    /**
-     * Looks up and lazily creates if necessary the destination for the given
-     * JMS name
-     */
-    public Destination getDestination(ActiveMQDestination destination) throws Exception {
-        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
-    }
-
-    public void removeDestination(ActiveMQDestination destination) throws Exception {
-        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
-    }
-
-    public int getProducerSystemUsagePortion() {
-        return producerSystemUsagePortion;
-    }
-
-    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
-        this.producerSystemUsagePortion = producerSystemUsagePortion;
-    }
-
-    public int getConsumerSystemUsagePortion() {
-        return consumerSystemUsagePortion;
-    }
-
-    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
-        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
-    }
-
-    public boolean isSplitSystemUsageForProducersConsumers() {
-        return splitSystemUsageForProducersConsumers;
-    }
-
-    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
-        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
-    }
-
-    public boolean isMonitorConnectionSplits() {
-        return monitorConnectionSplits;
-    }
-
-    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
-        this.monitorConnectionSplits = monitorConnectionSplits;
-    }
-
-    public int getTaskRunnerPriority() {
-        return taskRunnerPriority;
-    }
-
-    public void setTaskRunnerPriority(int taskRunnerPriority) {
-        this.taskRunnerPriority = taskRunnerPriority;
-    }
-
-    public boolean isDedicatedTaskRunner() {
-        return dedicatedTaskRunner;
-    }
-
-    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
-        this.dedicatedTaskRunner = dedicatedTaskRunner;
-    }
-
-    public boolean isCacheTempDestinations() {
-        return cacheTempDestinations;
-    }
-
-    public void setCacheTempDestinations(boolean cacheTempDestinations) {
-        this.cacheTempDestinations = cacheTempDestinations;
-    }
-
-    public int getTimeBeforePurgeTempDestinations() {
-        return timeBeforePurgeTempDestinations;
-    }
-
-    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
-        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
-    }
-
-    public boolean isUseTempMirroredQueues() {
-        return useTempMirroredQueues;
-    }
-
-    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
-        this.useTempMirroredQueues = useTempMirroredQueues;
-    }
-
-    public synchronized JobSchedulerStore getJobSchedulerStore() {
-
-        // If support is off don't allow any scheduler even is user configured their own.
-        if (!isSchedulerSupport()) {
-            return null;
-        }
-
-        // If the user configured their own we use it even if persistence is disabled since
-        // we don't know anything about their implementation.
-        if (jobSchedulerStore == null) {
-
-            if (!isPersistent()) {
-                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
-                configureService(jobSchedulerStore);
-                return this.jobSchedulerStore;
-            }
-
-            try {
-                PersistenceAdapter pa = getPersistenceAdapter();
-                if (pa != null) {
-                    this.jobSchedulerStore = pa.createJobSchedulerStore();
-                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
-                    configureService(jobSchedulerStore);
-                    return this.jobSchedulerStore;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            } catch (UnsupportedOperationException ex) {
-                // It's ok if the store doesn't implement a scheduler.
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-
-            try {
-                PersistenceAdapter pa = getPersistenceAdapter();
-                if (pa != null && pa instanceof JobSchedulerStore) {
-                    this.jobSchedulerStore = (JobSchedulerStore) pa;
-                    configureService(jobSchedulerStore);
-                    return this.jobSchedulerStore;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            // Load the KahaDB store as a last resort, this only works if KahaDB is
-            // included at runtime, otherwise this will fail.  User should disable
-            // scheduler support if this fails.
-            try {
-                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
-                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
-                jobSchedulerStore = adaptor.createJobSchedulerStore();
-                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
-                configureService(jobSchedulerStore);
-                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return jobSchedulerStore;
-    }
-
-    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
-        this.jobSchedulerStore = jobSchedulerStore;
-        configureService(jobSchedulerStore);
-    }
-
-    //
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    /**
-     * Handles any lazy-creation helper properties which are added to make
-     * things easier to configure inside environments such as Spring
-     *
-     * @throws Exception
-     */
-    protected void processHelperProperties() throws Exception {
-        if (transportConnectorURIs != null) {
-            for (int i = 0; i < transportConnectorURIs.length; i++) {
-                String uri = transportConnectorURIs[i];
-                addConnector(uri);
-            }
-        }
-        if (networkConnectorURIs != null) {
-            for (int i = 0; i < networkConnectorURIs.length; i++) {
-                String uri = networkConnectorURIs[i];
-                addNetworkConnector(uri);
-            }
-        }
-        if (jmsBridgeConnectors != null) {
-            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
-                addJmsConnector(jmsBridgeConnectors[i]);
-            }
-        }
-    }
-
-    public static int t = 0;
-    /**
-     * Check that the store usage limit is not greater than max usable
-     * space and adjust if it is
-     */
-    protected void checkStoreUsageLimits() throws IOException {
-        final SystemUsage usage = getSystemUsage();
-
-        if (getPersistenceAdapter() != null) {
-            PersistenceAdapter adapter = getPersistenceAdapter();
-            File dir = adapter.getDirectory();
-
-            if (dir != null) {
-                dir = StoreUtil.findParentDirectory(dir);
-
-                int usagePercent = usage.getStoreUsage().getPercentLimit();
-                long storeLimit = usage.getStoreUsage().getLimit();
-                long storeCurrent = usage.getStoreUsage().getUsage();
-                long dirFreeSpace = dir.getUsableSpace();
-
-                if (diskUsageCheckRegrowPercentChange && usagePercent > 0 &&
-                        storeLimit < (dirFreeSpace + storeCurrent)) {
-                    LOG.info("Usable disk space has been increased, attempting to regrow store limit to " +
-                            + usagePercent + "% of the partition size.");
-
-                    //trigger the recomputation of the absolute value by
-                    //calling setPercentLimit again
-                    usage.getStoreUsage().setPercentLimit(usagePercent);
-                    storeLimit = usage.getStoreUsage().getLimit();
-                    storeCurrent = usage.getStoreUsage().getUsage();
-                }
-
-                if (storeLimit > (dirFreeSpace + storeCurrent)) {
-                    if (usagePercent > 0) {
-                        LOG.warn("Store limit has been set to "
-                                + usagePercent + "% of the partition size but "
-                                + "there is not enough usable space.");
-                    }
-
-                    LOG.warn("Store limit is " +  storeLimit / (1024 * 1024) +
-                             " mb (current store usage is " + storeCurrent / (1024 * 1024) +
-                             " mb). The data directory: " + dir.getAbsolutePath() +
-                             " only has " + dirFreeSpace / (1024 * 1024) +
-                             " mb of usable space - resetting to maximum available disk space: " +
-                            (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
-                    usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent);
-                }
-            }
-
-            long maxJournalFileSize = 0;
-            long storeLimit = usage.getStoreUsage().getLimit();
-
-            if (adapter instanceof JournaledStore) {
-                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
-            }
-
-            if (storeLimit < maxJournalFileSize) {
-                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
-                          " mb, whilst the max journal file size for the store is: " +
-                          maxJournalFileSize / (1024 * 1024) + " mb, " +
-                          "the store will not accept any data when used.");
-
-            }
-        }
-    }
-
-    /**
-     * Check that temporary usage limit is not greater than max usable
-     * space and adjust if it is
-     */
-    protected void checkTmpStoreUsageLimits() throws IOException {
-        final SystemUsage usage = getSystemUsage();
-
-        File tmpDir = getTmpDataDirectory();
-
-        if (tmpDir != null) {
-            tmpDir = StoreUtil.findParentDirectory(tmpDir);
-
-            long storeLimit = usage.getTempUsage().getLimit();
-<<<<<<< 4f8d56aaf60f99abe643e79c6c4940a571289a86
-            long storeCurrent = usage.getTempUsage().getUsage();
-            while (tmpDir != null && !tmpDir.isDirectory()) {
-                tmpDir = tmpDir.getParentFile();
-            }
-            long dirFreeSpace = tmpDir.getUsableSpace();
-            if (storeLimit > (dirFreeSpace + storeCurrent)) {
-                LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
-                        " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) +
-                        " mb). The temporary data directory: " + tmpDir.getAbsolutePath() +
-                        " only has " + dirFreeSpace / (1024 * 1024) +
-                        " mb of usable space - resetting to maximum available disk space: " +
-                       (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
-               usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent);
-=======
-            long dirFreeSpace = tmpDir.getUsableSpace();
-            int usagePercent = usage.getTempUsage().getPercentLimit();
-
-//            if (diskUsageCheckRegrowPercentChange && usagePercent > 0 &&
-//                    storeLimit < (dirFreeSpace + storeCurrent)) {
-//                LOG.info("Usable disk space has been increased, attempting to regrow store limit to " +
-//                        + usagePercent + "% of the partition size.");
-//
-//                //trigger the recomputation of the absolute value by
-//                //calling setPercentLimit again
-//                usage.getStoreUsage().setPercentLimit(usagePercent);
-//                storeLimit = usage.getStoreUsage().getLimit();
-//                storeCurrent = usage.getStoreUsage().getUsage();
-//            }
-
-            if (storeLimit > dirFreeSpace) {
-                if (usagePercent > 0) {
-                    LOG.warn("Temporary Store limit has been set to "
-                            + usagePercent + "% of the partition size but "
-                            + "there is not enough usable space.");
-                }
-
-                LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
-                        " mb, whilst the temporary data directory: " + tmpDir.getAbsolutePath() +
-                        " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " +
-                        dirFreeSpace / (1024 * 1024) + " mb.");
-                usage.getTempUsage().setLimit(dirFreeSpace);
->>>>>>> https://issues.apache.org/jira/browse/AMQ-5965
-            }
-
-            if (isPersistent()) {
-                long maxJournalFileSize;
-
-                PListStore store = usage.getTempUsage().getStore();
-                if (store != null && store instanceof JournaledStore) {
-                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
-                } else {
-                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
-                }
-
-                if (storeLimit < maxJournalFileSize) {
-                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
-                              " mb, whilst the max journal file size for the temporary store is: " +
-                              maxJournalFileSize / (1024 * 1024) + " mb, " +
-                              "the temp store will not accept any data when used.");
-                }
-            }
-        }
-    }
-
-    /**
-     * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
-     * update store and temporary store limits if the amount of available space
-     * plus current store size is less than the existin configured limit
-     */
-    protected void scheduleDiskUsageLimitsCheck() throws IOException {
-        if (schedulePeriodForDiskUsageCheck > 0 &&
-                (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
-            Runnable diskLimitCheckTask = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        checkStoreUsageLimits();
-                    } catch (IOException e) {
-                        LOG.error("Failed to check persistent disk usage limits", e);
-                    }
-
-                    try {
-                        checkTmpStoreUsageLimits();
-                    } catch (IOException e) {
-                        LOG.error("Failed to check temporary store usage limits", e);
-                    }
-                }
-            };
-            scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
-        }
-    }
-
-    protected void checkSystemUsageLimits() throws IOException {
-        final SystemUsage usage = getSystemUsage();
-        long memLimit = usage.getMemoryUsage().getLimit();
-        long jvmLimit = Runtime.getRuntime().maxMemory();
-
-        if (memLimit > jvmLimit) {
-            usage.getMemoryUsage().setPercentOfJvmHeap(70);
-            LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
-                    " mb) is more than the maximum available for the JVM: " +
-                    jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
-        }
-
-        //Check the persistent store and temp store limits if they exist
-        //and schedule a periodic check to update disk limits if
-        //schedulePeriodForDiskLimitCheck is set
-        checkStoreUsageLimits();
-        checkTmpStoreUsageLimits();
-        scheduleDiskUsageLimitsCheck();
-
-        if (getJobSchedulerStore() != null) {
-            JobSchedulerStore scheduler = getJobSchedulerStore();
-            File schedulerDir = scheduler.getDirectory();
-            if (schedulerDir != null) {
-
-                String schedulerDirPath = schedulerDir.getAbsolutePath();
-                if (!schedulerDir.isAbsolute()) {
-                    schedulerDir = new File(schedulerDirPath);
-                }
-
-                while (schedulerDir != null && !schedulerDir.isDirectory()) {
-                    schedulerDir = schedulerDir.getParentFile();
-                }
-                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
-                long dirFreeSpace = schedulerDir.getUsableSpace();
-                if (schedulerLimit > dirFreeSpace) {
-                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
-                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
-                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
-                            dirFreeSpace / (1024 * 1024) + " mb.");
-                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
-                }
-            }
-        }
-    }
-
-    public void stopAllConnectors(ServiceStopper stopper) {
-        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
-            NetworkConnector connector = iter.next();
-            unregisterNetworkConnectorMBean(connector);
-            stopper.stop(connector);
-        }
-        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
-            ProxyConnector connector = iter.next();
-            stopper.stop(connector);
-        }
-        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
-            JmsConnector connector = iter.next();
-            stopper.stop(connector);
-        }
-        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
-            TransportConnector connector = iter.next();
-            try {
-                unregisterConnectorMBean(connector);
-            } catch (IOException e) {
-            }
-            stopper.stop(connector);
-        }
-    }
-
-    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
-        try {
-            ObjectName objectName = createConnectorObjectName(connector);
-            connector = connector.asManagedConnector(getManagementContext(), objectName);
-            ConnectorViewMBean view = new ConnectorView(connector);
-            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-            return connector;
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
-        }
-    }
-
-    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
-        if (isUseJmx()) {
-            try {
-                ObjectName objectName = createConnectorObjectName(connector);
-                getManagementContext().unregisterMBean(objectName);
-            } catch (Throwable e) {
-                throw IOExceptionSupport.create(
-                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
-            }
-        }
-    }
-
-    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
-        return adaptor;
-    }
-
-    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
-        if (isUseJmx()) {}
-    }
-
-    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
-        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
-    }
-
-    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
-        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
-        try {
-            ObjectName objectName = createNetworkConnectorObjectName(connector);
-            connector.setObjectName(objectName);
-            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
-        }
-    }
-
-    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
-        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
-    }
-
-    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
-        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
-    }
-
-    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
-        if (isUseJmx()) {
-            try {
-                ObjectName objectName = createNetworkConnectorObjectName(connector);
-                getManagementContext().unregisterMBean(objectName);
-            } catch (Exception e) {
-                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
-            }
-        }
-    }
-
-    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
-        ProxyConnectorView view = new ProxyConnectorView(connector);
-        try {
-            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
-            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
-        }
-    }
-
-    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
-        JmsConnectorView view = new JmsConnectorView(connector);
-        try {
-            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
-            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Factory method to create a new broker
-     *
-     * @

<TRUNCATED>

Mime
View raw message