activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [34/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:05 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
new file mode 100644
index 0000000..e71cfe6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.class);
+
+    private long restartDelay = 2000;
+
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        // master and slave survive db restart and retain master/slave status
+        LeaseLockerIOExceptionHandler stopConnectors = new LeaseLockerIOExceptionHandler();
+        brokerService.setIoExceptionHandler(stopConnectors);
+    }
+
+    @Override
+    protected void delayTillRestartRequired() {
+        if (restartDelay > 2000) {
+            LOG.info("delay for more than lease quantum. While Db is offline, master should stay alive but could loose lease");
+        } else {
+            LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
+        }
+        try {
+            TimeUnit.MILLISECONDS.sleep(restartDelay);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0  || (inflightMessageCount == failureCount + 10 && restartDelay <= 500)) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        } else {
+            // lease expired while DB was offline, either or master/slave can grab it so assert is not deterministic
+            // but we still need to validate sent == received
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        restartDelay = 2000;
+        super.setUp();
+    }
+
+    public void testSendReceiveWithLeaseExpiry() throws Exception {
+        restartDelay = 10000;
+        testSendReceive();
+    }
+
+    // ignore this test case
+    public void testAdvisory() throws Exception {}
+
+   @Override
+   public void testSendReceive() throws Exception {
+       // Ignore this test for now, see AMQ-4975
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
new file mode 100644
index 0000000..cf4929a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMasterSlaveTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseTest.class);
+
+    @Override
+    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
+        super.configureJdbcPersistenceAdapter(persistenceAdapter);
+        persistenceAdapter.setLocker(new LeaseDatabaseLocker());
+        persistenceAdapter.getLocker().setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+        persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
+    }
+
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        //let the brokers die on exception and master should have lease on restart
+        // which will delay slave start till it expires
+        LeaseLockerIOExceptionHandler ioExceptionHandler = new LeaseLockerIOExceptionHandler();
+        ioExceptionHandler.setIgnoreSQLExceptions(false);
+        ioExceptionHandler.setStopStartConnectors(false);
+        ioExceptionHandler.setResumeCheckSleepPeriod(500l);
+        brokerService.setIoExceptionHandler(ioExceptionHandler);
+    }
+
+    private long getLockKeepAlivePeriod() {
+        return 1000;
+    }
+
+    private long getLockAcquireSleepInterval() {
+        return 8000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
new file mode 100644
index 0000000..fb04803
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
+    
+    protected void messageSent() throws Exception {
+        verifyExpectedBroker(inflightMessageCount);
+        if (++inflightMessageCount == failureCount) {
+            LOG.info("STOPPING DB!@!!!!");
+            final EmbeddedDataSource ds = ((SyncCreateDataSource)getExistingDataSource()).getDelegate();
+            ds.setShutdownDatabase("shutdown");
+            LOG.info("DB STOPPED!@!!!!");
+            
+            Thread dbRestartThread = new Thread("db-re-start-thread") {
+                public void run() {
+                    delayTillRestartRequired();
+                    ds.setShutdownDatabase("false");
+                    LOG.info("DB RESTARTED!@!!!!");
+                }
+            };
+            dbRestartThread.start();
+        }
+        verifyExpectedBroker(inflightMessageCount);
+    }
+
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        } else if (inflightMessageCount == failureCount + 10) {
+            assertEquals("connected to slave, count:" + inflightMessageCount, slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+    }
+
+    protected void delayTillRestartRequired() {
+        LOG.info("Waiting for master broker to Stop");
+        master.waitUntilStopped();
+    }
+
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        producer.send(producerDestination, message);
+    }
+
+    @Override
+    protected Session createReceiveSession(Connection receiveConnection) throws Exception {
+        return receiveConnection.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    @Override
+    protected void consumeMessage(Message message, List<Message> messageList) {
+        try {
+            receiveSession.commit();
+            super.consumeMessage(message, messageList);
+        } catch (JMSException e) {
+            LOG.info("Failed to commit message receipt: " + message, e);
+            try {
+                receiveSession.rollback();
+            } catch (JMSException ignored) {
+            }
+
+            if (e instanceof TransactionRolledBackException) {
+                TransactionRolledBackException transactionRolledBackException = (TransactionRolledBackException) e;
+                if (transactionRolledBackException.getMessage().indexOf("in doubt") != -1) {
+                    // failover chucked bc there is a missing reply to a commit.
+                    // failover is involved b/c the store exception is handled broker side and the client just
+                    // sees a disconnect (socket.close()).
+                    // If the client needs to be aware of the failure then it should not use IOExceptionHandler
+                    // so that the exception will propagate back
+
+                    // for this test case:
+                    // the commit may have got there and the reply is lost "or" the commit may be lost.
+                    // so we may or may not get a resend.
+                    //
+                    // At the application level we need to determine if the message is there or not which is not trivial
+                    // for this test we assert received == sent
+                    // so we need to know whether the message will be replayed.
+                    // we can ask the store b/c we know it is jdbc - guess we could go through a destination
+                    // message store interface also or use jmx
+                    java.sql.Connection dbConnection = null;
+                    try {
+                        ActiveMQMessage mqMessage = (ActiveMQMessage) message;
+                        MessageId id = mqMessage.getMessageId();
+                        dbConnection = sharedDs.getConnection();
+                        PreparedStatement s = dbConnection.prepareStatement(((JDBCPersistenceAdapter) connectedToBroker().getPersistenceAdapter()).getStatements().getFindMessageStatement());
+                        s.setString(1, id.getProducerId().toString());
+                        s.setLong(2, id.getProducerSequenceId());
+                        ResultSet rs = s.executeQuery();
+
+                        if (!rs.next()) {
+                            // message is gone, so lets count it as consumed
+                            LOG.info("On TransactionRolledBackException we know that the ack/commit got there b/c message is gone so we count it: " + mqMessage);
+                            super.consumeMessage(message, messageList);
+                        } else {
+                            LOG.info("On TransactionRolledBackException we know that the ack/commit was lost so we expect a replay of: " + mqMessage);
+                        }
+                    } catch (Exception dbe) {
+                        dbe.printStackTrace();
+                    } finally {
+                        try {
+                            dbConnection.close();
+                        } catch (SQLException e1) {
+                            e1.printStackTrace();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private BrokerService connectedToBroker() {
+        return ((ActiveMQConnection)receiveConnection).getBrokerInfo().getBrokerName().equals("master") ? master : slave.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
new file mode 100644
index 0000000..9b21c44
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueTest extends JmsTopicSendReceiveWithTwoConnectionsTest implements ExceptionListener {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);
+
+    public boolean transactedSends = false;
+    public int failureCount = 25;  // or 20 for even tx batch boundary
+
+    int inflightMessageCount = 0;
+    EmbeddedDataSource sharedDs;
+    BrokerService broker;
+    final CountDownLatch restartDBLatch = new CountDownLatch(1);
+
+    protected void setUp() throws Exception {
+        setAutoFail(true);
+        topic = false;
+        verbose = true;
+        // startup db
+        sharedDs = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+
+        broker = new BrokerService();
+
+        DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
+        handler.setIgnoreSQLExceptions(false);
+        handler.setStopStartConnectors(true);
+        broker.setIoExceptionHandler(handler);
+        broker.addConnector("tcp://localhost:0");
+        broker.setUseJmx(false);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+        persistenceAdapter.setDataSource(sharedDs);
+        persistenceAdapter.setUseLock(false);
+        persistenceAdapter.setLockKeepAlivePeriod(500);
+        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.start();
+        super.setUp();
+    }
+
+    protected void tearDown() throws  Exception {
+       super.tearDown();
+       broker.stop();
+    }
+
+
+    protected Session createSendSession(Connection sendConnection) throws Exception {
+        if (transactedSends) {
+            return sendConnection.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory f =
+                new ActiveMQConnectionFactory("failover://" + broker.getTransportConnectors().get(0).getPublishableConnectString());
+        f.setExceptionListener(this);
+        return f;
+    }
+
+    @Override
+    protected void messageSent() throws Exception {    
+        if (++inflightMessageCount == failureCount) {
+            LOG.info("STOPPING DB!@!!!!");
+            final EmbeddedDataSource ds = sharedDs;
+            ds.setShutdownDatabase("shutdown");
+            try {
+                ds.getConnection();
+            } catch (Exception ignored) {
+            }
+            LOG.info("DB STOPPED!@!!!!");
+            
+            Thread dbRestartThread = new Thread("db-re-start-thread") {
+                public void run() {
+                    LOG.info("Sleeping for 10 seconds before allowing db restart");
+                    try {
+                        restartDBLatch.await(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    ds.setShutdownDatabase("false");
+                    LOG.info("DB RESTARTED!@!!!!");
+                }
+            };
+            dbRestartThread.start();
+        }
+    }
+     
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        {   
+            // do some retries as db failures filter back to the client until broker sees
+            // db lock failure and shuts down
+            boolean sent = false;
+            do {
+                try { 
+                    producer.send(producerDestination, message);
+
+                    if (transactedSends && ((inflightMessageCount+1) %10 == 0 || (inflightMessageCount+1) >= messageCount)) {
+                        LOG.info("committing on send: " + inflightMessageCount + " message: " + message);
+                        session.commit();
+                    }
+
+                    sent = true;
+                } catch (JMSException e) {
+                    LOG.info("Exception on producer send:", e);
+                    try { 
+                        Thread.sleep(2000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+            } while(!sent);
+
+        }
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        LOG.error("exception on connection: ", exception);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
new file mode 100644
index 0000000..c7b0ec6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.logging.Logger;
+
+import javax.sql.DataSource;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
+    protected DataSource sharedDs;
+    protected String MASTER_URL = "tcp://localhost:62001";
+    protected String SLAVE_URL  = "tcp://localhost:62002";
+
+    protected void setUp() throws Exception {
+        // startup db
+        sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
+        super.setUp();
+    }
+
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.setBrokerName("master");
+        master.addConnector(MASTER_URL);
+        master.setUseJmx(false);
+        master.setPersistent(true);
+        master.setDeleteAllMessagesOnStartup(true);
+        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+        persistenceAdapter.setDataSource(getExistingDataSource());
+        configureJdbcPersistenceAdapter(persistenceAdapter);
+        master.setPersistenceAdapter(persistenceAdapter);
+        configureBroker(master);
+        master.start();
+    }
+
+    protected void configureBroker(BrokerService brokerService) {
+        DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler();
+        // we want any store io exception to stop the broker
+        stopBrokerOnStoreException.setIgnoreSQLExceptions(false);
+        brokerService.setIoExceptionHandler(stopBrokerOnStoreException);
+    }
+
+    protected void createSlave() throws Exception {
+        // use a separate thread as the slave will block waiting for
+        // the exclusive db lock
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.setBrokerName("slave");
+                    TransportConnector connector = new TransportConnector();
+                    connector.setUri(new URI(SLAVE_URL));
+                    broker.addConnector(connector);
+                    // no need for broker.setMasterConnectorURI(masterConnectorURI)
+                    // as the db lock provides the slave/master initialisation
+                    broker.setUseJmx(false);
+                    broker.setPersistent(true);
+                    JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+                    persistenceAdapter.setDataSource(getExistingDataSource());
+                    persistenceAdapter.setCreateTablesOnStartup(false);
+                    broker.setPersistenceAdapter(persistenceAdapter);
+                    configureJdbcPersistenceAdapter(persistenceAdapter);
+                    configureBroker(broker);
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (IllegalStateException expectedOnShutdown) {
+                } catch (Exception e) {
+                    fail("failed to start slave broker, reason:" + e);
+                }
+            }
+        };
+        t.start();
+    }
+
+    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
+        persistenceAdapter.setLockKeepAlivePeriod(500);
+        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
+    }
+
+    protected DataSource getExistingDataSource() throws Exception {
+        return sharedDs;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
new file mode 100644
index 0000000..7dc88f7
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.File;
+import java.net.URI;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.junit.Ignore;
+
+
+public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTestSupport {
+    private final String brokerUrl = "tcp://localhost:62001";
+    private final String singleUriString = "failover://(" + brokerUrl +")?randomize=false";
+
+    @Override
+    protected void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(singleUriString);
+    }
+
+    @Override
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.setBrokerName("shared-master");
+        configureSharedPersistenceAdapter(master);
+        master.addConnector(brokerUrl);
+        master.start();
+    }
+
+    private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception {
+       LevelDBStore adapter = new LevelDBStore();
+       adapter.setDirectory(new File("shared"));
+       broker.setPersistenceAdapter(adapter);
+    }
+
+    @Override
+    protected void createSlave() throws Exception {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.setBrokerName("shared-slave");
+                    configureSharedPersistenceAdapter(broker);
+                    // add transport as a service so that it is bound on start, after store started
+                    final TransportConnector tConnector = new TransportConnector();
+                    tConnector.setUri(new URI(brokerUrl));
+                    broker.addConnector(tConnector);
+
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+        }).start();
+    }
+
+
+    // The @Ignore is just here for documentation, since this is a JUnit3 test
+    // I added the sleep because without it the two other test cases fail.  I haven't looked into it, but
+    // my guess whatever setUp does isn't really finished when the teardown runs.
+    @Ignore("See https://issues.apache.org/jira/browse/AMQ-5164")
+    @Override
+    public void testAdvisory() throws Exception {
+        Thread.sleep(5 * 1000);
+        //super.testAdvisory();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
new file mode 100644
index 0000000..2808ebe
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestSupport.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * Test failover for Queues
+ */
+abstract public class QueueMasterSlaveTestSupport extends JmsTopicSendReceiveWithTwoConnectionsTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(QueueMasterSlaveTestSupport.class);
+
+    protected BrokerService master;
+    protected AtomicReference<BrokerService> slave = new AtomicReference<BrokerService>();
+    protected CountDownLatch slaveStarted = new CountDownLatch(1);
+    protected int inflightMessageCount;
+    protected int failureCount = 50;
+    protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&useExponentialBackOff=false";
+
+    @Override
+    protected void setUp() throws Exception {
+        setMaxTestTime(TimeUnit.MINUTES.toMillis(10));
+        setAutoFail(true);
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+        super.messageCount = 500;
+        failureCount = super.messageCount / 2;
+        super.topic = isTopic();
+        createMaster();
+        createSlave();
+        // wait for thing to connect
+        Thread.sleep(1000);
+        super.setUp();
+    }
+
+    protected String getSlaveXml() {
+        return "org/apache/activemq/broker/ft/slave.xml";
+    }
+
+    protected String getMasterXml() {
+        return "org/apache/activemq/broker/ft/master.xml";
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        master.stop();
+        master.waitUntilStopped();
+        slaveStarted.await(60, TimeUnit.SECONDS);
+        BrokerService brokerService = slave.get();
+        if( brokerService!=null ) {
+            brokerService.stop();
+        }
+        master.stop();
+    }
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(uriString);
+    }
+
+    @Override
+    protected void messageSent() throws Exception {
+        if (++inflightMessageCount == failureCount) {
+            Thread.sleep(1000);
+            LOG.error("MASTER STOPPED!@!!!!");
+            master.stop();
+        }
+    }
+
+    protected boolean isTopic() {
+        return false;
+    }
+
+    protected void createMaster() throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
+        brokerFactory.afterPropertiesSet();
+        master = brokerFactory.getBroker();
+        master.start();
+    }
+
+    protected void createSlave() throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
+        brokerFactory.afterPropertiesSet();
+        BrokerService broker = brokerFactory.getBroker();
+        broker.start();
+        slave.set(broker);
+        slaveStarted.countDown();
+    }
+
+    public void testVirtualTopicFailover() throws Exception {
+
+        MessageConsumer qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
+        assertNull("No message there yet", qConsumer.receive(1000));
+        qConsumer.close();
+        assertTrue(!master.isSlave());
+        master.stop();
+        assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
+        assertTrue(!slave.get().isSlave());
+
+        final String text = "ForUWhenSlaveKicksIn";
+        producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
+
+        qConsumer = session.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
+
+        javax.jms.Message message = qConsumer.receive(4000);
+        assertNotNull("Get message after failover", message);
+        assertEquals("correct message", text, ((TextMessage)message).getText());
+    }
+
+    public void testAdvisory() throws Exception {
+        MessageConsumer advConsumer = session.createConsumer(AdvisorySupport.getMasterBrokerAdvisoryTopic());
+
+        master.stop();
+        assertTrue("slave started", slaveStarted.await(60, TimeUnit.SECONDS));
+        LOG.info("slave started");
+        Message advisoryMessage = advConsumer.receive(5000);
+        LOG.info("received " + advisoryMessage);
+        assertNotNull("Didn't received advisory", advisoryMessage);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
new file mode 100644
index 0000000..c9f178d
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+public class QueueMasterSlaveTestUsingSharedFileTest extends
+        QueueMasterSlaveTestSupport {
+    
+    protected String getSlaveXml() {
+        return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
+    }
+    
+    protected String getMasterXml() {
+        return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
+    }
+    
+    protected void createSlave() throws Exception {    	
+    	// Start the Brokers async since starting them up could be a blocking operation..
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    QueueMasterSlaveTestUsingSharedFileTest.super.createSlave();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+        }).start();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
new file mode 100644
index 0000000..5331a22
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/SyncCreateDataSource.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.logging.Logger;
+import javax.sql.DataSource;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+// prevent concurrent calls from attempting to create the db at the same time
+// can result in "already exists in this jvm" errors
+
+public class SyncCreateDataSource implements DataSource {
+    final EmbeddedDataSource delegate;
+
+    SyncCreateDataSource(EmbeddedDataSource dataSource) {
+        this.delegate = dataSource;
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        synchronized (this) {
+            return delegate.getConnection();
+        }
+    }
+
+    @Override
+    public Connection getConnection(String username, String password) throws SQLException {
+        synchronized (this) {
+            return delegate.getConnection();
+        }
+    }
+
+    @Override
+    public PrintWriter getLogWriter() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void setLogWriter(PrintWriter out) throws SQLException {
+    }
+
+    @Override
+    public int getLoginTimeout() throws SQLException {
+        return 0;
+    }
+
+    @Override
+    public void setLoginTimeout(int seconds) throws SQLException {
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return false;
+    }
+
+    EmbeddedDataSource getDelegate() {
+        return delegate;
+    }
+
+    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
new file mode 100644
index 0000000..ee7ca0f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/kahaDbJdbcLeaseQueueMasterSlaveTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.net.URI;
+import javax.sql.DataSource;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.Statements;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class kahaDbJdbcLeaseQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
+    protected DataSource sharedDs;
+    protected String MASTER_URL = "tcp://localhost:62001";
+    protected String SLAVE_URL  = "tcp://localhost:62002";
+
+    protected void setUp() throws Exception {
+        // startup db
+        sharedDs = new SyncCreateDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
+        super.setUp();
+    }
+
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.setBrokerName("master");
+        master.addConnector(MASTER_URL);
+        master.setUseJmx(false);
+        master.setPersistent(true);
+        master.setDeleteAllMessagesOnStartup(true);
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) master.getPersistenceAdapter();
+        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+        leaseDatabaseLocker.setCreateTablesOnStartup(true);
+        leaseDatabaseLocker.setDataSource(getExistingDataSource());
+        leaseDatabaseLocker.setStatements(new Statements());
+        kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
+        configureLocker(kahaDBPersistenceAdapter);
+        configureBroker(master);
+        master.start();
+    }
+
+    protected void configureBroker(BrokerService brokerService) {
+        DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler();
+        // we want any store io exception to stop the broker
+        stopBrokerOnStoreException.setIgnoreSQLExceptions(false);
+        brokerService.setIoExceptionHandler(stopBrokerOnStoreException);
+    }
+
+    protected void createSlave() throws Exception {
+        // use a separate thread as the slave will block waiting for
+        // the exclusive db lock
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.setBrokerName("slave");
+                    TransportConnector connector = new TransportConnector();
+                    connector.setUri(new URI(SLAVE_URL));
+                    broker.addConnector(connector);
+                    broker.setUseJmx(false);
+                    broker.setPersistent(true);
+                    KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+                    LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+                    leaseDatabaseLocker.setDataSource(getExistingDataSource());
+                    leaseDatabaseLocker.setStatements(new Statements());
+                    kahaDBPersistenceAdapter.setLocker(leaseDatabaseLocker);
+                    configureLocker(kahaDBPersistenceAdapter);
+                    configureBroker(broker);
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (IllegalStateException expectedOnShutdown) {
+                } catch (Exception e) {
+                    fail("failed to start slave broker, reason:" + e);
+                }
+            }
+        };
+        t.start();
+    }
+
+    protected void configureLocker(KahaDBPersistenceAdapter kahaDBPersistenceAdapter) throws IOException {
+        kahaDBPersistenceAdapter.setLockKeepAlivePeriod(500);
+        kahaDBPersistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
+    }
+
+    @Override
+    public void testVirtualTopicFailover() throws Exception {
+        // Ignoring for now, see AMQ-4842
+    }
+
+    protected DataSource getExistingDataSource() throws Exception {
+        return sharedDs;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java
new file mode 100644
index 0000000..ad9cca1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+public class mKahaDbQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
+    protected String MASTER_URL = "tcp://localhost:62001";
+    protected String SLAVE_URL  = "tcp://localhost:62002";
+
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.setBrokerName("master");
+        master.addConnector(MASTER_URL);
+        master.setUseJmx(false);
+        master.setPersistent(true);
+        master.setDeleteAllMessagesOnStartup(true);
+
+        MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+        List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+        FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+        defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+        defaultEntry.setPerDestination(true);
+        adapters.add(defaultEntry);
+
+        mKahaDB.setFilteredPersistenceAdapters(adapters);
+        master.setPersistenceAdapter(mKahaDB);
+
+        master.start();
+    }
+
+    protected void createSlave() throws Exception {
+        // use a separate thread as the slave will block waiting for
+        // the exclusive db lock
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.setBrokerName("slave");
+                    TransportConnector connector = new TransportConnector();
+                    connector.setUri(new URI(SLAVE_URL));
+                    broker.addConnector(connector);
+                    // no need for broker.setMasterConnectorURI(masterConnectorURI)
+                    // as the db lock provides the slave/master initialisation
+                    broker.setUseJmx(false);
+                    broker.setPersistent(true);
+
+                    MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+                    List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+                    FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+                    defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+                    defaultEntry.setPerDestination(true);
+                    adapters.add(defaultEntry);
+
+                    mKahaDB.setFilteredPersistenceAdapters(adapters);
+                    broker.setPersistenceAdapter(mKahaDB);
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (IllegalStateException expectedOnShutdown) {
+                } catch (Exception e) {
+                    fail("failed to start slave broker, reason:" + e);
+                }
+            }
+        };
+        t.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml
new file mode 100644
index 0000000..f5eef67
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileMaster.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="shared"  useJmx="false" deleteAllMessagesOnStartup="true"  xmlns="http://activemq.apache.org/schema/core">
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:62001"/>
+    </transportConnectors>
+       
+  </broker>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml
new file mode 100644
index 0000000..7c88a8c
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ft/sharedFileSlave.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="shared" useJmx="false"  deleteAllMessagesOnStartup="false"  xmlns="http://activemq.apache.org/schema/core">
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:62002"/>
+    </transportConnectors>
+    
+       
+  </broker>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java
new file mode 100644
index 0000000..6910bed
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/BrokerViewSlowStoreStartupTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to verify that the BrokerView accessed while the BrokerSerivce is waiting
+ * for a Slow Store startup to complete doesn't throw unexpected NullPointerExceptions.
+ */
+public class BrokerViewSlowStoreStartupTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerViewSlowStoreStartupTest.class);
+
+    private final CountDownLatch holdStoreStart = new CountDownLatch(1);
+    private final String brokerName = "brokerViewTest";
+
+    private BrokerService broker;
+    private Thread startThread;
+
+    private BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+
+        KahaDBStore kaha = new KahaDBStore() {
+
+            @Override
+            public void start() throws Exception {
+                LOG.info("Test KahaDB class is waiting for signal to complete its start()");
+                holdStoreStart.await();
+                super.start();
+                LOG.info("Test KahaDB class is completed its start()");
+            }
+        };
+
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+
+        broker.setPersistenceAdapter(kaha);
+        broker.setUseJmx(true);
+
+        return broker;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+
+        startThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    broker.start();
+                } catch(Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        startThread.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+
+        // ensure we don't keep the broker held if an exception occurs somewhere.
+        holdStoreStart.countDown();
+
+        startThread.join();
+
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout=120000)
+    public void testBrokerViewOnSlowStoreStart() throws Exception {
+
+        // Ensure we have an Admin View.
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (broker.getAdminView()) != null;
+            }
+        }));
+
+        final BrokerView view = broker.getAdminView();
+
+        try {
+            view.getBrokerName();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getBrokerId();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTotalEnqueueCount();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTotalDequeueCount();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTotalConsumerCount();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTotalProducerCount();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTotalMessageCount();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTotalMessagesCached();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.resetStatistics();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.enableStatistics();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.disableStatistics();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.isStatisticsEnabled();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTopics();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getQueues();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTemporaryTopics();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTemporaryQueues();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTopicSubscribers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getDurableTopicSubscribers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getQueueSubscribers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTemporaryTopicSubscribers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTemporaryQueueSubscribers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getInactiveDurableTopicSubscribers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTopicProducers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getQueueProducers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTemporaryTopicProducers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getTemporaryQueueProducers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.getDynamicDestinationProducers();
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.removeConnector("tcp");
+            fail("Should have thrown an NoSuchElementException");
+        } catch(NoSuchElementException e) {
+        }
+
+        try {
+            view.removeNetworkConnector("tcp");
+            fail("Should have thrown an NoSuchElementException");
+        } catch(NoSuchElementException e) {
+        }
+
+        try {
+            view.addTopic("TEST");
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.addQueue("TEST");
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.removeTopic("TEST");
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.removeQueue("TEST");
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.createDurableSubscriber("1", "2", "3","4");
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        try {
+            view.destroyDurableSubscriber("1", "2");
+            fail("Should have thrown an IllegalStateException");
+        } catch(IllegalStateException e) {
+        }
+
+        holdStoreStart.countDown();
+        startThread.join();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getBroker() != null;
+            }
+        });
+        assertNotNull(view.getBroker());
+
+        try {
+            view.getBrokerName();
+        } catch(Exception e) {
+            fail("caught an exception getting the Broker property: " + e.getClass().getName());
+        }
+
+        try {
+            view.getBrokerId();
+        } catch(IllegalStateException e) {
+            fail("caught an exception getting the Broker property: " + e.getClass().getName());
+        }
+
+        try {
+            view.getTotalEnqueueCount();
+        } catch(IllegalStateException e) {
+            fail("caught an exception getting the Broker property: " + e.getClass().getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
new file mode 100644
index 0000000..9998be9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import java.util.List;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HealthViewMBeanTest extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class);
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        useTopic = false;
+        super.setUp();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64);
+        answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64);
+        answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64);
+        answer.setUseJmx(true);
+        answer.setSchedulerSupport(true);
+
+        // allow options to be visible via jmx
+
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+    public void testHealthView() throws Exception{
+        Connection connection = connectionFactory.createConnection();
+
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination();
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        for (int i = 0; i < 60; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[1024 *1024]);
+            producer.send(message);
+        }
+
+        Thread.sleep(1000);
+
+        String objectNameStr = broker.getBrokerObjectName().toString();
+        objectNameStr += ",service=Health";
+        ObjectName brokerName = assertRegisteredObjectName(objectNameStr);
+        HealthViewMBean health =  MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, HealthViewMBean.class, true);
+        List<HealthStatus> list = health.healthList();
+
+        for (HealthStatus status : list) {
+            LOG.info("Health status: {}", status);
+        }
+
+        assertEquals(2, list.size());
+    }
+
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+        if (mbeanServer.isRegistered(objectName)) {
+            LOG.info("Bean Registered: " + objectName);
+        } else {
+            fail("Could not find MBean!: " + objectName);
+        }
+        return objectName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java
new file mode 100644
index 0000000..2c2b373
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/jmx/Log4JConfigTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+public class Log4JConfigTest extends EmbeddedBrokerTestSupport {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Log4JConfigTest.class);
+
+    private static final String BROKER_LOGGER = "org.apache.activemq.broker.BrokerService";
+
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        useTopic = false;
+        super.setUp();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setUseJmx(true);
+        answer.setSchedulerSupport(true);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+    @Test
+    public void testLog4JConfigViewExists() throws Exception {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        String log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName).toString();
+        assertRegisteredObjectName(log4jConfigViewName);
+    }
+
+    @Test
+    public void testLog4JConfigViewGetLoggers() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        List<String> loggers = log4jConfigView.getLoggers();
+        assertNotNull(loggers);
+        assertFalse(loggers.isEmpty());
+    }
+
+    @Test
+    public void testLog4JConfigViewGetLevel() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        String level = log4jConfigView.getLogLevel(BROKER_LOGGER);
+        assertNotNull(level);
+        assertFalse(level.isEmpty());
+    }
+
+    @Test
+    public void testLog4JConfigViewGetLevelUnknownLoggerName() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        // Non-existent loggers will return a name equal to the root level.
+        String level = log4jConfigView.getLogLevel("not.a.logger");
+        assertNotNull(level);
+        assertFalse(level.isEmpty());
+        assertEquals(Logger.getRootLogger().getLevel().toString(), level);
+    }
+
+    @Test
+    public void testLog4JConfigViewSetLevel() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        String level = log4jConfigView.getLogLevel(BROKER_LOGGER);
+        assertNotNull(level);
+        assertFalse(level.isEmpty());
+
+        log4jConfigView.setLogLevel(BROKER_LOGGER, "WARN");
+        level = log4jConfigView.getLogLevel(BROKER_LOGGER);
+        assertNotNull(level);
+        assertEquals("WARN", level);
+
+        log4jConfigView.setLogLevel(BROKER_LOGGER, "INFO");
+        level = log4jConfigView.getLogLevel(BROKER_LOGGER);
+        assertNotNull(level);
+        assertEquals("INFO", level);
+    }
+
+    @Test
+    public void testLog4JConfigViewSetLevelNoChangeIfLevelIsBad() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        log4jConfigView.setLogLevel(BROKER_LOGGER, "INFO");
+        String level = log4jConfigView.getLogLevel(BROKER_LOGGER);
+        assertNotNull(level);
+        assertEquals("INFO", level);
+
+        log4jConfigView.setLogLevel(BROKER_LOGGER, "BAD");
+        level = log4jConfigView.getLogLevel(BROKER_LOGGER);
+        assertNotNull(level);
+        assertEquals("INFO", level);
+    }
+
+    @Test
+    public void testLog4JConfigViewGetRootLogLevel() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        String level = log4jConfigView.getRootLogLevel();
+        assertNotNull(level);
+        assertFalse(level.isEmpty());
+
+        String currentRootLevel = Logger.getRootLogger().getLevel().toString();
+        assertEquals(currentRootLevel, level);
+    }
+
+    @Test
+    public void testLog4JConfigViewSetRootLevel() throws Throwable {
+        String brokerObjectName = broker.getBrokerObjectName().toString();
+        ObjectName log4jConfigViewName = BrokerMBeanSupport.createLog4JConfigViewName(brokerObjectName);
+        Log4JConfigViewMBean log4jConfigView =  MBeanServerInvocationHandler.newProxyInstance(
+            mbeanServer, log4jConfigViewName, Log4JConfigViewMBean.class, true);
+
+        String currentRootLevel = Logger.getRootLogger().getLevel().toString();
+        log4jConfigView.setRootLogLevel("WARN");
+        currentRootLevel = Logger.getRootLogger().getLevel().toString();
+        assertEquals("WARN", currentRootLevel);
+        log4jConfigView.setRootLogLevel("INFO");
+        currentRootLevel = Logger.getRootLogger().getLevel().toString();
+        assertEquals("INFO", currentRootLevel);
+
+        Level level;
+    }
+
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+        if (mbeanServer.isRegistered(objectName)) {
+            LOG.info("Bean Registered: " + objectName);
+        } else {
+            fail("Could not find MBean!: " + objectName);
+        }
+        return objectName;
+    }
+}


Mime
View raw message