qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject [2/5] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP 0-x client specific test failover to client suite
Date Sun, 21 Jan 2018 10:11:13 GMT
QPID-6933: [System Tests] Move AMQP 0-x client specific test failover to client suite


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/588c65f7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/588c65f7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/588c65f7

Branch: refs/heads/master
Commit: 588c65f77406318c1884cf0aed37bf74f1f495ae
Parents: 25f11ba
Author: Keith Wall <kwall@apache.org>
Authored: Sun Jan 21 08:48:15 2018 +0000
Committer: Keith Wall <kwall@apache.org>
Committed: Sun Jan 21 08:48:15 2018 +0000

----------------------------------------------------------------------
 .../AddressBasedFailoverBehaviourTest.java      |   34 -
 .../client/failover/FailoverBehaviourTest.java  | 1630 ------------------
 .../failover/MultipleBrokersFailoverTest.java   |  272 ---
 .../server/failover/FailoverMethodTest.java     |  271 ---
 .../qpid/test/client/failover/FailoverTest.java |  325 ----
 .../qpid/test/utils/FailoverBaseCase.java       |  163 --
 test-profiles/CPPExcludes                       |   10 -
 test-profiles/ExternalBrokerTests               |    9 -
 test-profiles/Java010Excludes                   |   14 -
 test-profiles/Java10Excludes                    |   13 -
 test-profiles/JavaPre010Excludes                |    3 -
 test-profiles/JavaTransientExcludes             |    4 -
 12 files changed, 2748 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
deleted file mode 100644
index 99fcbc5..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest
-{
-    @Override
-    protected Destination createDestination(Session session) throws JMSException
-    {
-        return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}");
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
deleted file mode 100644
index dd04d6d..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ /dev/null
@@ -1,1630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.client.failover;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TransactionRolledBackException;
-import javax.naming.NamingException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BrokerDetails;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.qpid.url.URLSyntaxException;
-
-/**
- * Test suite to test all possible failover corner cases
- */
-public class FailoverBehaviourTest extends FailoverBaseCase implements ExceptionListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class);
-
-    private static final String TEST_MESSAGE_FORMAT = "test message {0}";
-
-    /** Indicates whether tests are run against clustered broker */
-    private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
-
-    /** Default number of messages to send before failover */
-    private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
-
-    /** Actual number of messages to send before failover */
-    protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
-
-    /** Test connection */
-    protected Connection _connection;
-
-    /**
-     * Consumer session
-     */
-    private Session _consumerSession;
-
-    /**
-     * Test destination
-     */
-    private Destination _destination;
-
-    /**
-     * Consumer
-     */
-    private MessageConsumer _consumer;
-
-    /**
-     * Producer session
-     */
-    private Session _producerSession;
-
-    /**
-     * Producer
-     */
-    private MessageProducer _producer;
-
-    /**
-     * Holds exception sent into {@link ExceptionListener} on failover
-     */
-    private JMSException _exceptionListenerException;
-
-    @Override
-    public void setUp() throws Exception
-    {
-        super.setUp();
-
-        _connection = getConnection();
-        _connection.setExceptionListener(this);
-        ((AMQConnection) _connection).setConnectionListener(this);
-    }
-
-    /**
-     * Test whether MessageProducer can successfully publish messages after
-     * failover and rollback transaction
-     */
-    public void testMessageProducingAndRollbackAfterFailover() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-        produceMessages();
-        causeFailure();
-
-        assertFailoverException();
-        // producer should be able to send messages after failover
-        _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
-
-        // rollback after failover
-        _producerSession.rollback();
-
-        // tests whether sending and committing is working after failover
-        produceMessages();
-        _producerSession.commit();
-
-        // tests whether receiving and committing is working after failover
-        consumeMessages();
-        _consumerSession.commit();
-    }
-
-    /**
-     * Test whether {@link TransactionRolledBackException} is thrown on commit
-     * of dirty transacted session after failover.
-     * <p>
-     * Verifies whether second after failover commit is successful.
-     */
-    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-        produceMessages();
-        causeFailure();
-
-        assertFailoverException();
-
-        // producer should be able to send messages after failover
-        _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
-
-        try
-        {
-            _producerSession.commit();
-            fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
-        }
-        catch (JMSException t)
-        {
-            assertTrue("Expected TransactionRolledBackException but thrown " + t,
-                    t instanceof TransactionRolledBackException);
-        }
-
-        // simulate process of user replaying the transaction
-        produceMessages("replayed test message {0}", _messageNumber, false);
-
-        // no exception should be thrown
-        _producerSession.commit();
-
-        // only messages sent after rollback should be received
-        consumeMessages("replayed test message {0}", _messageNumber);
-
-        // no exception should be thrown
-        _consumerSession.commit();
-    }
-
-    /**
-     * Tests JMSException is not thrown on commit with a clean session after
-     * failover
-     */
-    public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-
-        causeFailure();
-
-        assertFailoverException();
-
-        // should not throw an exception for a clean session
-        _producerSession.commit();
-
-        // tests whether sending and committing is working after failover
-        produceMessages();
-        _producerSession.commit();
-
-        // tests whether receiving and committing is working after failover
-        consumeMessages();
-        _consumerSession.commit();
-    }
-
-    /**
-     * Tests {@link TransactionRolledBackException} is thrown on commit of dirty
-     * transacted session after failover.
-     * <p>
-     * Verifies whether second after failover commit is successful.
-     */
-    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-        produceMessages();
-        _producerSession.commit();
-
-        // receive messages but do not commit
-        consumeMessages();
-
-        causeFailure();
-
-        assertFailoverException();
-
-        try
-        {
-            // should throw TransactionRolledBackException
-            _consumerSession.commit();
-            fail("TransactionRolledBackException is expected on commit after failover");
-        }
-        catch (Exception t)
-        {
-            assertTrue("Expected TransactionRolledBackException but thrown " + t,
-                    t instanceof TransactionRolledBackException);
-        }
-
-        resendMessagesIfNecessary();
-
-        // consume messages successfully
-        consumeMessages();
-        _consumerSession.commit();
-    }
-
-    /**
-     * Tests JMSException is not thrown on commit with a clean session after failover
-     */
-    public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-        produceMessages();
-        _producerSession.commit();
-
-        consumeMessages();
-        _consumerSession.commit();
-
-        causeFailure();
-
-        assertFailoverException();
-
-        // should not throw an exception with a clean consumer session
-        _consumerSession.commit();
-    }
-
-    /**
-     * Test that TransactionRolledBackException is thrown on commit of
-     * dirty session in asynchronous consumer after failover.
-     */
-    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously()
-    throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, false);
-        FailoverTestMessageListener ml = new FailoverTestMessageListener();
-        _consumer.setMessageListener(ml);
-
-        _connection.start();
-
-        produceMessages();
-        _producerSession.commit();
-
-        // wait for message receiving
-        ml.awaitForEnd();
-
-        assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
-        // assert messages
-        int counter = 0;
-        for (Message message : ml.getReceivedMessages())
-        {
-            assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
-        }
-        ml.reset();
-
-        causeFailure();
-        assertFailoverException();
-
-
-        try
-        {
-            _consumerSession.commit();
-            fail("TransactionRolledBackException should be thrown!");
-        }
-        catch (TransactionRolledBackException e)
-        {
-            // that is what is expected
-        }
-
-        resendMessagesIfNecessary();
-
-        // wait for message receiving
-        ml.awaitForEnd();
-
-        assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
-        // assert messages
-        counter = 0;
-        for (Message message : ml.getReceivedMessages())
-        {
-            assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
-        }
-
-        // commit again. It should be successful
-        _consumerSession.commit();
-    }
-
-    /**
-     * Test that {@link Session#rollback()} does not throw exception after failover
-     * and that we are able to consume messages.
-     */
-    public void testRollbackAfterFailover() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-
-        produceMessages();
-        _producerSession.commit();
-
-        consumeMessages();
-
-        causeFailure();
-
-        assertFailoverException();
-
-        _consumerSession.rollback();
-
-        resendMessagesIfNecessary();
-
-        // tests whether receiving and committing is working after failover
-        consumeMessages();
-        _consumerSession.commit();
-    }
-
-    /**
-     * Test that {@link Session#rollback()} does not throw exception after receiving further messages
-     * after failover, and we can receive published messages after rollback.
-     */
-    public void testRollbackAfterReceivingAfterFailover() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-
-        produceMessages();
-        _producerSession.commit();
-
-        consumeMessages();
-        causeFailure();
-
-        assertFailoverException();
-
-        resendMessagesIfNecessary();
-
-        consumeMessages();
-
-        _consumerSession.rollback();
-
-        // tests whether receiving and committing is working after failover
-        consumeMessages();
-        _consumerSession.commit();
-    }
-
-    /**
-     * Test that {@link Session#recover()} does not throw an exception after failover
-     * and that we can consume messages after recover.
-     */
-    public void testRecoverAfterFailover() throws Exception
-    {
-        init(Session.CLIENT_ACKNOWLEDGE, true);
-
-        produceMessages();
-
-        // consume messages but do not acknowledge them
-        consumeMessages();
-
-        causeFailure();
-
-        assertFailoverException();
-
-        _consumerSession.recover();
-
-        resendMessagesIfNecessary();
-
-        // tests whether receiving and acknowledgment is working after recover
-        Message lastMessage = consumeMessages();
-        lastMessage.acknowledge();
-    }
-
-    /**
-     * Test that receiving more messages after failover and then calling
-     * {@link Session#recover()} does not throw an exception
-     * and that we can consume messages after recover.
-     */
-    public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
-    {
-        init(Session.CLIENT_ACKNOWLEDGE, true);
-
-        produceMessages();
-
-        // consume messages but do not acknowledge them
-        consumeMessages();
-
-        causeFailure();
-
-        assertFailoverException();
-
-        // publishing should work after failover
-        resendMessagesIfNecessary();
-
-        // consume messages again on a dirty session
-        consumeMessages();
-
-        // recover should successfully restore session
-        _consumerSession.recover();
-
-        // tests whether receiving and acknowledgment is working after recover
-        Message lastMessage = consumeMessages();
-        lastMessage.acknowledge();
-    }
-
-    /**
-     * Test that first call to {@link Message#acknowledge()} after failover
-     * throws a JMSEXception if session is dirty.
-     */
-    public void testAcknowledgeAfterFailover() throws Exception
-    {
-        init(Session.CLIENT_ACKNOWLEDGE, true);
-
-        produceMessages();
-
-        // consume messages but do not acknowledge them
-        Message lastMessage = consumeMessages();
-        causeFailure();
-
-        assertFailoverException();
-
-        try
-        {
-            // an implicit recover performed when acknowledge throws an exception due to failover
-            lastMessage.acknowledge();
-            fail("JMSException should be thrown");
-        }
-        catch (JMSException t)
-        {
-            // TODO: assert error code and/or expected exception type
-        }
-
-        resendMessagesIfNecessary();
-
-        // tests whether receiving and acknowledgment is working after recover
-        lastMessage = consumeMessages();
-        lastMessage.acknowledge();
-    }
-
-    /**
-     * Test that calling acknowledge before failover leaves the session
-     * clean for use after failover.
-     */
-    public void testAcknowledgeBeforeFailover() throws Exception
-    {
-        init(Session.CLIENT_ACKNOWLEDGE, true);
-
-        produceMessages();
-
-        // consume messages and acknowledge them
-        Message lastMessage = consumeMessages();
-        lastMessage.acknowledge();
-
-        causeFailure();
-
-        assertFailoverException();
-
-        produceMessages();
-
-        // tests whether receiving and acknowledgment is working after recover
-        lastMessage = consumeMessages();
-        lastMessage.acknowledge();
-    }
-
-    /**
-     * Test that receiving of messages after failover prior to calling
-     * {@link Message#acknowledge()} still results in acknowledge throwing an exception.
-     */
-    public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
-    {
-        init(Session.CLIENT_ACKNOWLEDGE, true);
-
-        produceMessages();
-
-        // consume messages but do not acknowledge them
-        consumeMessages();
-        causeFailure();
-
-        assertFailoverException();
-
-        resendMessagesIfNecessary();
-
-        // consume again on dirty session
-        Message lastMessage = consumeMessages();
-        try
-        {
-            // an implicit recover performed when acknowledge throws an exception due to failover
-            lastMessage.acknowledge();
-            fail("JMSException should be thrown");
-        }
-        catch (JMSException t)
-        {
-            // TODO: assert error code and/or expected exception type
-        }
-
-        // tests whether receiving and acknowledgment is working on a clean session
-        lastMessage = consumeMessages();
-        lastMessage.acknowledge();
-    }
-
-    /**
-     * Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer
-     * and we can consume messages after acknowledge.
-     */
-    public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception
-    {
-        init(Session.CLIENT_ACKNOWLEDGE, false);
-        FailoverTestMessageListener ml = new FailoverTestMessageListener();
-        _consumer.setMessageListener(ml);
-        _connection.start();
-
-        produceMessages();
-
-        // wait for message receiving
-        ml.awaitForEnd();
-
-        assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
-        // assert messages
-        int counter = 0;
-        Message currentMessage = null;
-        for (Message message : ml.getReceivedMessages())
-        {
-            assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
-            currentMessage = message;
-        }
-        ml.reset();
-
-        causeFailure();
-        assertFailoverException();
-
-
-        try
-        {
-            currentMessage.acknowledge();
-            fail("JMSException should be thrown!");
-        }
-        catch (JMSException e)
-        {
-            // TODO: assert error code and/or expected exception type
-        }
-
-        resendMessagesIfNecessary();
-
-        // wait for message receiving
-        ml.awaitForEnd();
-
-        assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
-        // assert messages
-        counter = 0;
-        for (Message message : ml.getReceivedMessages())
-        {
-            assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
-            currentMessage = message;
-        }
-
-        // acknowledge again. It should be successful
-        currentMessage.acknowledge();
-    }
-
-    /**
-     * Test whether {@link Session#recover()} works as expected after failover
-     * in AA mode.
-     */
-    public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception
-    {
-        init(Session.AUTO_ACKNOWLEDGE, true);
-
-        produceMessages();
-
-        // receive first message in order to start a dispatcher thread
-        Message receivedMessage = _consumer.receive(1000l);
-        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
-        causeFailure();
-
-        assertFailoverException();
-
-        _consumerSession.recover();
-
-        resendMessagesIfNecessary();
-
-        // tests whether receiving is working after recover
-        consumeMessages();
-    }
-
-    public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
-    {
-        sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
-    }
-
-    public void testTransactedSessionCloseAfterFailover() throws Exception
-    {
-        sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED);
-    }
-
-    public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
-    {
-        sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public void testPublishAutoAcknowledgedWhileFailover() throws Exception
-    {
-        publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public void testPublishClientAcknowledgedWhileFailover() throws Exception
-    {
-        Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
-        receivedMessage.acknowledge();
-    }
-
-    public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
-    {
-        publishWhileFailingOver(Session.SESSION_TRANSACTED);
-        _consumerSession.commit();
-    }
-
-    public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
-    {
-        publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
-    {
-        publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
-
-    }
-
-    public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
-    {
-        publishWithFailoverMutex(Session.SESSION_TRANSACTED);
-    }
-
-    public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
-    {
-        sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
-    }
-
-    public void testTransactedSessionCloseWhileFailover() throws Exception
-    {
-        sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
-    }
-
-    public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
-    {
-        sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
-    {
-        browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
-    }
-
-    public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
-    {
-        browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
-    }
-
-    public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
-    {
-        browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
-    {
-        doFailoverWhilstPublishingInFlight(true);
-    }
-
-    public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
-    {
-        doFailoverWhilstPublishingInFlight(false);
-    }
-
-    private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, false);
-
-        final int numberOfMessages = 200;
-
-        final CountDownLatch halfWay = new CountDownLatch(1);
-        final CountDownLatch allDone = new CountDownLatch(1);
-        final AtomicReference<Exception> exception = new AtomicReference<>();
-
-        Runnable producerRunnable = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                Thread.currentThread().setName("ProducingThread");
-
-                try
-                {
-                    for(int i=0; i< numberOfMessages; i++)
-                    {
-                        boolean success = false;
-                        while(!success)
-                        {
-                            try
-                            {
-                                Message message = _producerSession.createMessage();
-                                message.setIntProperty("msgNum", i);
-                                _producer.send(message);
-                                _producerSession.commit();
-                                success = true;
-                            }
-                            catch (javax.jms.IllegalStateException e)
-                            {
-                                // fail - failover should not leave a JMS object in an illegal state
-                                throw e;
-                            }
-                            catch (JMSException e)
-                            {
-                                // OK we will be failing over
-                                LOGGER.debug("Got JMS exception, probably just failing over", e);
-                            }
-                        }
-
-                        if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
-                        {
-                            halfWay.countDown();
-                        }
-                    }
-
-                    allDone.countDown();
-                }
-                catch (Exception e)
-                {
-                    exception.set(e);
-                }
-            }
-        };
-
-        Thread producerThread = new Thread(producerRunnable);
-        producerThread.start();
-
-        assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
-
-        if (hardKill)
-        {
-            LOGGER.debug("Killing the Broker");
-            killDefaultBroker();
-        }
-        else
-        {
-            LOGGER.debug("Stopping the Broker");
-            stopDefaultBroker();
-        }
-
-        if (exception.get() != null)
-        {
-            LOGGER.error("Unexpected exception from producer thread", exception.get());
-        }
-        assertNull("Producer thread should not have got an exception", exception.get());
-
-        assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS));
-
-        producerThread.join(30000);
-
-        // Extra work to prove the session still okay
-        assertNotNull(_producerSession.createTemporaryQueue());
-    }
-
-
-    private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
-    {
-        setDelayedFailoverPolicy(5);
-        init(autoAcknowledge, true);
-
-        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
-        Message message = _producerSession.createTextMessage(text);
-
-        failDefaultBroker();
-
-        if(!_failoverStarted.await(5, TimeUnit.SECONDS))
-        {
-            fail("Did not receieve notification failover had started");
-        }
-
-        _producer.send(message);
-
-        if (_producerSession.getTransacted())
-        {
-            _producerSession.commit();
-        }
-
-        Message receivedMessage = _consumer.receive(1000l);
-        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-        return receivedMessage;
-    }
-
-    private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
-    {
-        setDelayedFailoverPolicy(5);
-        init(autoAcknowledge, true);
-
-        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
-        Message message = _producerSession.createTextMessage(text);
-
-        AMQConnection connection = (AMQConnection)_connection;
-
-        // holding failover mutex should prevent the failover from
-        // proceeding before we try to send the message
-        synchronized(connection.getFailoverMutex())
-        {
-            failDefaultBroker();
-
-            // wait to make sure that connection is lost
-            while(!connection.isFailingOver())
-            {
-                Thread.sleep(25l);
-            }
-
-            try
-            {
-                _producer.send(message);
-                fail("Sending should fail because connection was lost and failover has not yet completed");
-            }
-            catch(JMSException e)
-            {
-                // JMSException is expected
-            }
-        }
-        // wait for failover completion, thus ensuring it actually
-        //got started, before allowing the test to tear down
-        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
-     }
-
-    /**
-     * This test only tests 0-8/0-9/0-9-1 failover timeout
-     */
-    public void testFailoverHandlerTimeoutExpires() throws Exception
-    {
-        _connection.close();
-        setTestSystemProperty("qpid.failover_method_timeout", "10000");
-        AMQConnection connection = null;
-        try
-        {
-            connection = createConnectionWithFailover();
-
-            // holding failover mutex should prevent the failover from proceeding
-            synchronized(connection.getFailoverMutex())
-            {
-                killDefaultBroker();
-                startDefaultBroker();
-
-                // sleep interval exceeds failover timeout interval
-                Thread.sleep(11000l);
-            }
-
-            // allows the failover thread to proceed
-            Thread.yield();
-            assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
-            assertTrue("Failover should not succeed due to timeout", connection.isClosed());
-        }
-        finally
-        {
-            if (connection != null)
-            {
-                connection.close();
-            }
-        }
-    }
-
-    public void testFailoverHandlerTimeoutReconnected() throws Exception
-    {
-        _connection.close();
-        setTestSystemProperty("qpid.failover_method_timeout", "10000");
-        AMQConnection connection = null;
-        try
-        {
-            connection = createConnectionWithFailover();
-
-            // holding failover mutex should prevent the failover from proceeding
-            synchronized(connection.getFailoverMutex())
-            {
-                killDefaultBroker();
-                startDefaultBroker();
-            }
-
-            // allows the failover thread to proceed
-            Thread.yield();
-            awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
-            assertFalse("Failover should restore connectivity", connection.isClosed());
-        }
-        finally
-        {
-            if (connection != null)
-            {
-                connection.close();
-            }
-        }
-    }
-
-    /**
-     * Tests that the producer flow control flag is reset when failover occurs while
-     * the producers are being blocked by the broker.
-     *
-     * Uses Apache Qpid Broker-J specific queue configuration to enabled PSFC.
-     */
-    public void testFlowControlFlagResetOnFailover() throws Exception
-    {
-        // we do not need the connection failing to second broker
-        _connection.close();
-
-        // make sure that failover timeout is bigger than flow control timeout
-        setTestSystemProperty("qpid.failover_method_timeout", "60000");
-        setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
-
-        AMQConnection connection = null;
-        try
-        {
-            connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"));
-
-            final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-            final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
-            final AtomicInteger counter = new AtomicInteger();
-            // try to send 5 messages (should block after 4)
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        MessageProducer producer = producerSession.createProducer(queue);
-                        for (int i=0; i < 5; i++)
-                        {
-                            Message next = createNextMessage(producerSession, i);
-                            producer.send(next);
-                            producerSession.commit();
-                            counter.incrementAndGet();
-                        }
-                    }
-                    catch(Exception e)
-                    {
-                        // ignore
-                    }
-                }
-            }).start();
-
-            long limit= 30000l;
-            long start = System.currentTimeMillis();
-
-            // wait  until session is blocked
-            while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
-            {
-                Thread.sleep(100l);
-            }
-
-            assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
-
-            final int currentCounter = counter.get();
-            assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >=3);
-
-            killDefaultBroker();
-            startDefaultBroker();
-
-            // allows the failover thread to proceed
-            Thread.yield();
-            awaitForFailoverCompletion(60000l);
-
-            assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
-        }
-        finally
-        {
-            if (connection != null)
-            {
-                connection.close();
-            }
-        }
-    }
-
-    public void testFailoverWhenConnectionStopped() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-
-        produceMessages();
-        _producerSession.commit();
-
-        final CountDownLatch stopFlag = new CountDownLatch(1);
-        final AtomicReference<Exception> exception = new AtomicReference<>();
-        final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
-        final AtomicInteger counter = new AtomicInteger();
-
-        _consumer.setMessageListener(new MessageListener()
-        {
-            @Override
-            public void onMessage(Message message)
-            {
-                if (stopFlag.getCount() == 1)
-                {
-                    try
-                    {
-                        LOGGER.debug("Stopping connection from dispatcher thread");
-                        _connection.stop();
-                        LOGGER.debug("Connection stopped from dispatcher thread");
-
-                    }
-                    catch (Exception e)
-                    {
-                        exception.set(e);
-                    }
-                    finally
-                    {
-                        stopFlag.countDown();
-
-                        failDefaultBroker();
-                    }
-
-                }
-                else
-                {
-                    try
-                    {
-                        _consumerSession.commit();
-                        counter.incrementAndGet();
-                        expectedMessageLatch.countDown();
-                    }
-                    catch (Exception e)
-                    {
-                        exception.set(e);
-                    }
-                }
-            }
-        });
-
-
-        boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
-        assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
-                stopResult);
-        assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
-
-        // wait for failover to complete
-        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
-        assertFailoverException();
-
-        resendMessagesIfNecessary();
-        _producerSession.commit();
-
-        _connection.start();
-
-        assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
-
-        Thread.sleep(500l);
-        assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());
-
-        _connection.close();
-    }
-
-    public void testConnectionCloseInterruptsFailover() throws Exception
-    {
-        _connection.close();
-
-        final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
-        final CountDownLatch failoverBegun = new CountDownLatch(1);
-
-        AMQConnection connection = createConnectionWithFailover();
-        connection.setConnectionListener(new ConnectionListener()
-        {
-            @Override
-            public void bytesSent(final long count)
-            {
-            }
-
-            @Override
-            public void bytesReceived(final long count)
-            {
-            }
-
-            @Override
-            public boolean preFailover(final boolean redirect)
-            {
-                failoverBegun.countDown();
-                LOGGER.info("Failover started");
-                return true;
-            }
-
-            @Override
-            public boolean preResubscribe()
-            {
-                return true;
-            }
-
-            @Override
-            public void failoverComplete()
-            {
-                failoverCompleted.set(true);
-            }
-        });
-
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        assertNotNull("Session should be created", session);
-        killDefaultBroker();
-
-        boolean failingOver = failoverBegun.await(5000, TimeUnit.MILLISECONDS);
-        assertTrue("Failover did not begin with a reasonable time", failingOver);
-
-        // Failover will now be in flight
-        connection.close();
-        assertTrue("Failover policy is unexpectedly exhausted", connection.getFailoverPolicy().failoverAllowed());
-    }
-
-    private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
-    {
-        final Map<String, Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity", capacity);
-        arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
-        ((AMQSession<?, ?>) session).createQueue(queueName, false, true, false, arguments);
-        Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
-                + "'&autodelete='" + false + "'");
-        ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
-        return queue;
-    }
-
-    private AMQConnection createConnectionWithFailover() throws NamingException, JMSException, URLSyntaxException
-    {
-        return createConnectionWithFailover(null);
-    }
-
-    private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws NamingException, JMSException, URLSyntaxException
-    {
-        String retries = "200";
-        String connectdelay = "1000";
-        String cycleCount = "2";
-
-        String newUrlFormat="amqp://username:password@clientid/test?brokerlist=" +
-                            "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''";
-
-        String newUrl = String.format(newUrlFormat, "localhost", getDefaultAmqpPort(),
-                                                    retries, connectdelay, cycleCount);
-
-        if (connectionOptions != null)
-        {
-            for (Map.Entry<String,String> option: connectionOptions.entrySet())
-            {
-                newUrl+= "&" + option.getKey() + "='" + option.getValue() + "'";
-            }
-        }
-        ConnectionFactory connectionFactory = new AMQConnectionFactory(newUrl);
-        AMQConnection connection = (AMQConnection) connectionFactory.createConnection("admin", "admin");
-        connection.setConnectionListener(this);
-        return connection;
-    }
-
-    /**
-     * Tests {@link Session#close()} for session with given acknowledge mode
-     * to ensure that close works after failover.
-     *
-     * @param acknowledgeMode session acknowledge mode
-     * @throws JMSException
-     */
-    private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
-    {
-        init(acknowledgeMode, true);
-        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
-        if (acknowledgeMode == Session.SESSION_TRANSACTED)
-        {
-            _producerSession.commit();
-        }
-
-        // intentionally receive message but do not commit or acknowledge it in
-        // case of transacted or CLIENT_ACK session
-        Message receivedMessage = _consumer.receive(1000l);
-        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
-        causeFailure();
-
-        assertFailoverException();
-
-        // for transacted/client_ack session
-        // no exception should be thrown but transaction should be automatically
-        // rolled back
-        _consumerSession.close();
-    }
-
-    /**
-     * A helper method to instantiate produce and consumer sessions, producer
-     * and consumer.
-     *
-     * @param acknowledgeMode
-     *            acknowledge mode
-     * @param startConnection
-     *            indicates whether connection should be started
-     * @throws JMSException
-     */
-    private void init(int acknowledgeMode, boolean startConnection) throws JMSException
-    {
-        boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
-
-        _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
-        _destination = createDestination(_consumerSession);
-        _consumer = _consumerSession.createConsumer(_destination);
-
-        if (startConnection)
-        {
-            _connection.start();
-        }
-
-        _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
-        _producer = _producerSession.createProducer(_destination);
-
-    }
-
-    protected Destination createDestination(Session session) throws JMSException
-    {
-        return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
-    }
-
-    /**
-     * Resends messages if reconnected to a non-clustered broker
-     *
-     * @throws JMSException
-     */
-    private void resendMessagesIfNecessary() throws JMSException
-    {
-        if (!CLUSTERED)
-        {
-            // assert that a new broker does not have messages on a queue
-            if (_consumer.getMessageListener() == null)
-            {
-                Message message = _consumer.receive(100l);
-                assertNull("Received a message after failover with non-clustered broker!", message);
-            }
-            // re-sending messages if reconnected to a non-clustered broker
-            produceMessages(true);
-        }
-    }
-
-    /**
-     * Produces a default number of messages with default text content into test
-     * queue
-     *
-     * @throws JMSException
-     */
-    private void produceMessages() throws JMSException
-    {
-        produceMessages(false);
-    }
-
-    private void produceMessages(boolean seperateProducer) throws JMSException
-    {
-        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, seperateProducer);
-    }
-
-    /**
-     * Consumes a default number of messages and asserts their content.
-     *
-     * @return last consumed message
-     * @throws JMSException
-     */
-    private Message consumeMessages() throws JMSException
-    {
-        return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber);
-    }
-
-    /**
-     * Produces given number of text messages with content matching given
-     * content pattern
-     *
-     * @param messagePattern message content pattern
-     * @param messageNumber  number of messages to send
-     * @param standaloneProducer whether to use the existing producer or a new one.
-     * @throws JMSException
-     */
-    private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
-    {
-        Session producerSession;
-        MessageProducer producer;
-
-        if(standaloneProducer)
-        {
-            producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
-            producer = producerSession.createProducer(_destination);
-        }
-        else
-        {
-            producerSession = _producerSession;
-            producer = _producer;
-        }
-
-        for (int i = 0; i < messageNumber; i++)
-        {
-            String text = MessageFormat.format(messagePattern, i);
-            Message message = producerSession.createTextMessage(text);
-            producer.send(message);
-            LOGGER.debug("Test message number " + i + " produced with text = " + text + ", and JMSMessageID = " + message.getJMSMessageID());
-        }
-
-        if(standaloneProducer)
-        {
-            producerSession.commit();
-        }
-    }
-
-    /**
-     * Consumes given number of text messages and asserts that their content
-     * matches given pattern
-     *
-     * @param messagePattern
-     *            messages content pattern
-     * @param messageNumber
-     *            message number to received
-     * @return last consumed message
-     * @throws JMSException
-     */
-    private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
-    {
-        Message receivedMesssage = null;
-        for (int i = 0; i < messageNumber; i++)
-        {
-            receivedMesssage = _consumer.receive(1000l);
-            assertReceivedMessage(receivedMesssage, messagePattern, i);
-        }
-        return receivedMesssage;
-    }
-
-    /**
-     * Asserts received message
-     *
-     * @param receivedMessage
-     *            received message
-     * @param messagePattern
-     *            messages content pattern
-     * @param messageIndex
-     *            message index
-     */
-    private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException
-    {
-        assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
-        assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
-                + receivedMessage, receivedMessage instanceof TextMessage);
-        String expectedText = MessageFormat.format(messagePattern, messageIndex);
-        String receivedText = null;
-        try
-        {
-            receivedText = ((TextMessage) receivedMessage).getText();
-        }
-        catch (JMSException e)
-        {
-            fail("JMSException occured while getting message text:" + e.getMessage());
-        }
-        LOGGER.debug("Test message number " + messageIndex + " consumed with text = " + receivedText + ", and JMSMessageID = " + receivedMessage.getJMSMessageID());
-        assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]",
-                expectedText, receivedText);
-    }
-
-    /**
-     * Causes failover and waits till connection is re-established.
-     */
-    private void causeFailure()
-    {
-        causeFailure(DEFAULT_FAILOVER_TIME * 2);
-    }
-
-    /**
-     * Causes failover by stopping broker and waits till
-     * connection is re-established during given time interval.
-     *
-     * @param delay
-     *            time interval to wait for connection re-establishement
-     */
-    private void causeFailure(long delay)
-    {
-        failDefaultBroker();
-
-        awaitForFailoverCompletion(delay);
-    }
-
-    private void awaitForFailoverCompletion(long delay)
-    {
-        LOGGER.info("Awaiting {} ms for failover completion..", delay);
-        try
-        {
-            if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
-            {
-                fail("Failover did not complete");
-            }
-        }
-        catch (InterruptedException e)
-        {
-            fail("Test was interrupted:" + e.getMessage());
-        }
-    }
-
-    private void assertFailoverException()
-    {
-        // TODO: assert exception is received (once implemented)
-        // along with error code and/or expected exception type
-    }
-
-
-    @Override
-    public void onException(JMSException e)
-    {
-        _exceptionListenerException = e;
-    }
-
-    /**
-     * Causes 1 second delay before reconnect in order to test whether JMS
-     * methods block while failover is in progress
-     */
-    private static class DelayingFailoverPolicy extends FailoverPolicy
-    {
-
-        private CountDownLatch _suspendLatch;
-        private long _delay;
-
-        public DelayingFailoverPolicy(AMQConnection connection, long delay)
-        {
-            super(connection.getConnectionURL(), connection);
-            _suspendLatch = new CountDownLatch(1);
-            _delay = delay;
-        }
-
-        @Override
-        public void attainedConnection()
-        {
-            try
-            {
-                _suspendLatch.await(_delay, TimeUnit.SECONDS);
-            }
-            catch (InterruptedException e)
-            {
-                // continue
-            }
-            super.attainedConnection();
-        }
-
-    }
-
-
-    private class FailoverTestMessageListener implements MessageListener
-    {
-        // message counter
-        private AtomicInteger _counter = new AtomicInteger();
-
-        private List<Message> _receivedMessage = new ArrayList<Message>();
-
-        private volatile CountDownLatch _endLatch;
-
-        public FailoverTestMessageListener() throws JMSException
-        {
-            _endLatch = new CountDownLatch(1);
-        }
-
-        @Override
-        public void onMessage(Message message)
-        {
-            _receivedMessage.add(message);
-            if (_counter.incrementAndGet() % _messageNumber == 0)
-            {
-                _endLatch.countDown();
-            }
-        }
-
-        public void reset()
-        {
-            _receivedMessage.clear();
-            _endLatch = new CountDownLatch(1);
-            _counter.set(0);
-        }
-
-        public List<Message> getReceivedMessages()
-        {
-            return _receivedMessage;
-        }
-
-        public Object awaitForEnd() throws InterruptedException
-        {
-            return _endLatch.await((long) _messageNumber, TimeUnit.SECONDS);
-        }
-
-        public int getMessageCounter()
-        {
-            return _counter.get();
-        }
-    }
-
-    /**
-     * Tests {@link Session#close()} for session with given acknowledge mode
-     * to ensure that it blocks until failover implementation restores connection.
-     *
-     * @param acknowledgeMode session acknowledge mode
-     * @throws JMSException
-     */
-    private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
-    {
-        initDelayedFailover(acknowledgeMode);
-
-        // intentionally receive message but not commit or acknowledge it in
-        // case of transacted or CLIENT_ACK session
-        Message receivedMessage = _consumer.receive(1000l);
-        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
-        failDefaultBroker();
-
-        // wait until failover is started
-        _failoverStarted.await(5, TimeUnit.SECONDS);
-
-        // test whether session#close blocks while failover is in progress
-        _consumerSession.close();
-
-        assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
-
-        assertFailoverException();
-    }
-
-    /**
-     * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
-     *
-     * @param acknowledgeMode session acknowledge mode
-     * @return queue browser
-     * @throws JMSException
-     */
-    private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException
-    {
-        init(acknowledgeMode, false);
-        _consumer.close();
-        _connection.start();
-
-        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
-        if (acknowledgeMode == Session.SESSION_TRANSACTED)
-        {
-            _producerSession.commit();
-        }
-        else
-        {
-            ((AMQSession)_producerSession).sync();
-        }
-
-        QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
-        return browser;
-    }
-
-    /**
-     * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
-     * to ensure that it blocks until failover implementation restores connection.
-     *
-     * @param acknowledgeMode session acknowledge mode
-     * @throws JMSException
-     */
-    private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
-    {
-        QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
-
-        @SuppressWarnings("unchecked")
-        Enumeration<Message> messages = browser.getEnumeration();
-        Message receivedMessage = (Message) messages.nextElement();
-        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
-        failDefaultBroker();
-
-        // wait until failover is started
-        _failoverStarted.await(5, TimeUnit.SECONDS);
-
-        browser.close();
-
-        assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
-
-        assertFailoverException();
-    }
-
-    private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
-    {
-        DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
-        init(acknowledgeMode, true);
-        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
-        if (acknowledgeMode == Session.SESSION_TRANSACTED)
-        {
-            _producerSession.commit();
-        }
-        return failoverPolicy;
-    }
-
-    private DelayingFailoverPolicy setDelayedFailoverPolicy()
-    {
-        return setDelayedFailoverPolicy(2);
-    }
-
-    private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
-    {
-        AMQConnection amqConnection = (AMQConnection) _connection;
-        DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
-        ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
-        return failoverPolicy;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
deleted file mode 100644
index f78b9d2..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.test.utils.BrokerHolder;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/*
- * we need to create 4 brokers:
- * 1st broker will be running in test JVM and will not have failover host (only tcp connection will be established, amqp connection will be closed)
- * 2nd broker will be spawned in separate JVM and should have a failover host (amqp connection should be established)
- * 3rd broker will be spawned in separate JVM and should not have a failover host (only tcp connection will be established, amqp connection will be closed)
- * 4th broker will be spawned in separate JVM and should have a failover host (amqp connection should be established)
- */
-public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(MultipleBrokersFailoverTest.class);
-
-    private static final String FAILOVER_VIRTUAL_HOST = "failover";
-    private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover";
-    private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
-    private static final int FAILOVER_RETRIES = 0;
-    private static final int FAILOVER_CONNECTDELAY = 0;
-    private static final long FAILOVER_AWAIT_TIME = FailoverBaseCase.DEFAULT_FAILOVER_TIME;
-    private static final int NUMBER_OF_BROKERS = 4;
-
-    private BrokerHolder[] _brokerHolders;
-    private String _connectionURL;
-    private Connection _connection;
-    private CountDownLatch _failoverComplete;
-    private CountDownLatch _failoverStarted;
-    private Session _consumerSession;
-    private Destination _destination;
-    private MessageConsumer _consumer;
-    private Session _producerSession;
-    private MessageProducer _producer;
-
-    @Override
-    public void startDefaultBroker()
-    {
-        // do not start the default broker for this test
-    }
-
-    @Override
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-
-        _brokerHolders = new BrokerHolder[NUMBER_OF_BROKERS];
-
-        // the test should connect to the second broker first and fail over to the fourth broker
-        // after an unsuccessful attempt to establish the connection to the 3rd broker
-        for (int i = 0; i < NUMBER_OF_BROKERS; i++)
-        {
-            String host = null;
-            if (i == 1 || i == NUMBER_OF_BROKERS - 1)
-            {
-                host = FAILOVER_VIRTUAL_HOST;
-            }
-            else
-            {
-                host = NON_FAILOVER_VIRTUAL_HOST;
-            }
-
-            BrokerHolder brokerHolder = createSpawnedBroker();
-            createTestVirtualHostNode(brokerHolder, host, true);
-            brokerHolder.start();
-
-            _brokerHolders[i] = brokerHolder;
-        }
-
-        _connectionURL = generateUrlString(NUMBER_OF_BROKERS);
-
-        _connection = getConnection(_connectionURL);
-        ((AMQConnection) _connection).setConnectionListener(this);
-        _failoverComplete = new CountDownLatch(1);
-        _failoverStarted = new CountDownLatch(1);
-    }
-
-    private String generateUrlString(int numBrokers)
-    {
-        String baseString = "amqp://guest:guest@test/" + FAILOVER_VIRTUAL_HOST
-                            + "?&failover='roundrobin?cyclecount='1''&brokerlist='";
-        StringBuffer buffer = new StringBuffer(baseString);
-
-        for(int i = 0; i< numBrokers ; i++)
-        {
-            if(i != 0)
-            {
-                buffer.append(";");
-            }
-
-            String broker = String.format(BROKER_PORTION_FORMAT, _brokerHolders[i].getAmqpPort(),
-                                          FAILOVER_CONNECTDELAY, FAILOVER_RETRIES);
-            buffer.append(broker);
-        }
-        buffer.append("'");
-
-        return buffer.toString();
-    }
-
-    @Override
-    public void tearDown() throws Exception
-    {
-        try
-        {
-            super.tearDown();
-        }
-        finally
-        {
-            for (BrokerHolder broker : _brokerHolders)
-            {
-                stopBrokerSafely(broker);
-            }
-        }
-    }
-
-
-    public void testFailoverOnBrokerKill() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-        assertConnectionPort(_brokerHolders[1].getAmqpPort());
-
-        assertSendReceive(0);
-
-        _brokerHolders[1].kill();
-
-        awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
-        assertEquals("Failover did not start within " + FAILOVER_AWAIT_TIME + "ms.", 0, _failoverStarted.getCount());
-
-        assertSendReceive(2);
-        assertConnectionPort(_brokerHolders[NUMBER_OF_BROKERS - 1].getAmqpPort());
-    }
-
-    public void testFailoverOnBrokerStop() throws Exception
-    {
-        init(Session.SESSION_TRANSACTED, true);
-        assertConnectionPort(_brokerHolders[1].getAmqpPort());
-
-        assertSendReceive(0);
-
-        _brokerHolders[1].shutdown();
-
-        awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
-        assertEquals("Failover did not start within " + FAILOVER_AWAIT_TIME + "ms.", 0, _failoverStarted.getCount());
-
-        assertSendReceive(1);
-        assertConnectionPort(_brokerHolders[NUMBER_OF_BROKERS - 1].getAmqpPort());
-    }
-
-    private void assertConnectionPort(int brokerPort)
-    {
-        int connectionPort = ((AMQConnection)_connection).getActiveBrokerDetails().getPort();
-        assertEquals("Unexpected broker port", brokerPort, connectionPort);
-    }
-
-    private void assertSendReceive(int index) throws JMSException
-    {
-        Message message = createNextMessage(_producerSession, index);
-        _producer.send(message);
-        if (_producerSession.getTransacted())
-        {
-            _producerSession.commit();
-        }
-        Message receivedMessage = _consumer.receive(1000l);
-        assertReceivedMessage(receivedMessage, index);
-        if (_consumerSession.getTransacted())
-        {
-            _consumerSession.commit();
-        }
-    }
-
-    private void awaitForFailoverCompletion(long delay) throws Exception
-    {
-        LOGGER.info("Awaiting Failover completion..");
-        if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
-        {
-            fail("Failover did not complete within " + delay + "ms.");
-        }
-    }
-
-    private void assertReceivedMessage(Message receivedMessage, int messageIndex)
-    {
-        assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
-        assertTrue(
-                "Failure to receive message [" + messageIndex + "], expected TextMessage but received " + receivedMessage,
-                receivedMessage instanceof TextMessage);
-    }
-
-    private void init(int acknowledgeMode, boolean startConnection) throws Exception
-    {
-        boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
-
-        _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
-        _destination = _consumerSession.createQueue(getTestQueueName());
-        _consumer = _consumerSession.createConsumer(_destination);
-
-        if (startConnection)
-        {
-            _connection.start();
-        }
-
-        _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
-        _producer = _producerSession.createProducer(_destination);
-
-    }
-
-    @Override
-    public void bytesSent(long count)
-    {
-    }
-
-    @Override
-    public void bytesReceived(long count)
-    {
-    }
-
-    @Override
-    public boolean preFailover(boolean redirect)
-    {
-        _failoverStarted.countDown();
-        return true;
-    }
-
-    @Override
-    public boolean preResubscribe()
-    {
-        return true;
-    }
-
-    @Override
-    public void failoverComplete()
-    {
-        _failoverComplete.countDown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
deleted file mode 100644
index 1916664..0000000
--- a/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.SystemUtils;
-
-public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverMethodTest.class);
-    private CountDownLatch _failoverComplete = new CountDownLatch(1);
-    private final int _freePortWithNoBroker = findFreePort();
-
-    /**
-     * Test that the round robin method has the correct delays.
-     * The first connection will work, but the connection to the port with no broker should fail;
-     * the time it takes to report that failure is what is being tested.
-     *
-     */
-    public void testFailoverRoundRobinDelay() throws Exception
-    {
-        if (SystemUtils.isWindows())
-        {
-            //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
-            return;
-        }
-
-        //note: The first broker has no connect delay and the default 1 retry
-        //        while the tcp:localhost broker has 3 retries with a 2s connect delay
-        String connectionString = "amqp://guest:guest@/test?brokerlist=" +
-                                  "'tcp://localhost:" + getDefaultAmqpPort() +
-                                  ";tcp://localhost:" + _freePortWithNoBroker + "?connectdelay='2000',retries='3''";
-
-        AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
-        try
-        {
-            long start = System.currentTimeMillis();
-            AMQConnection connection = new AMQConnection(url);
-
-            connection.setExceptionListener(this);
-
-            LOGGER.debug("Stopping broker");
-            stopDefaultBroker();
-            LOGGER.debug("Stopped broker");
-
-            _failoverComplete.await(30, TimeUnit.SECONDS);
-            assertEquals("failoverLatch was not decremented in given timeframe",
-                    0, _failoverComplete.getCount());
-
-            long end = System.currentTimeMillis();
-
-            long duration = (end - start);
-
-            //Failover should take more than 6 seconds.
-            // 3 Retries
-            // so VM Broker NoDelay 0 (Connect) NoDelay 0
-            // then TCP NoDelay 0 Delay 1 Delay 2 Delay  3
-            // so 3 delays of 2s in total for connection
-            // as this is a tcp connection it will take 1second per connection to fail
-            // so max time is 6seconds of delay plus 4 seconds of TCP Delay + 1 second of runtime. == 11 seconds
-
-            // Ensure we actually had the delay
-            assertTrue("Failover took less than 6 seconds", duration > 6000);
-
-            // Ensure we don't have delays before initial connection and reconnection.
-            // We allow 1 second for initial connection and failover logic on top of 6s of sleep.
-            assertTrue("Failover took more than 11 seconds:(" + duration + ")", duration < 11000);
-        }
-        catch (QpidException e)
-        {
-            fail(e.getMessage());
-        }
-    }
-
-    public void testFailoverSingleDelay() throws Exception
-    {
-        if (SystemUtils.isWindows())
-        {
-            //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
-            return;
-        }
-
-        String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getDefaultAmqpPort() + "?connectdelay='2000',retries='3''";
-
-        AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
-        try
-        {
-            long start = System.currentTimeMillis();
-            AMQConnection connection = new AMQConnection(url);
-
-            connection.setExceptionListener(this);
-
-            LOGGER.debug("Stopping broker");
-            stopDefaultBroker();
-            LOGGER.debug("Stopped broker");
-
-            _failoverComplete.await(30, TimeUnit.SECONDS);
-            assertEquals("failoverLatch was not decremented in given timeframe",
-                    0, _failoverComplete.getCount());
-
-            long end = System.currentTimeMillis();
-
-            long duration = (end - start);
-
-            //Failover should take more than 6 seconds.
-            // 3 Retries
-            // so NoDelay 0 (Connect) NoDelay 0 Delay 1 Delay 2 Delay  3
-            // so 3 delays of 2s in total for connection
-            // so max time is 6 seconds of delay + 1 second of runtime. == 7 seconds
-
-            // Ensure we actually had the delay
-            assertTrue("Failover took less than 6 seconds", duration > 6000);
-
-            // Ensure we don't have delays before initial connection and reconnection.
-            // We allow 3 seconds for initial connection and failover logic on top of 6s of sleep.
-            assertTrue("Failover took more than 9 seconds:(" + duration + ")", duration < 9000);
-        }
-        catch (QpidException e)
-        {
-            fail(e.getMessage());
-        }
-    }
-
-
-    /**
-     * Test that setting 'nofailover' as the failover policy does not result in
-     * delays or connection attempts when the initial connection is lost.
-     *
-     * Test validates that there is a connection delay as required on initial
-     * connection.
-     */
-    public void testNoFailover() throws Exception
-    {
-        if (SystemUtils.isWindows())
-        {
-            //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
-            return;
-        }
-
-        int CONNECT_DELAY = 2000;
-        String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getDefaultAmqpPort() + "?connectdelay='" + CONNECT_DELAY + "'," +
-                                  "retries='3'',failover='nofailover'";
-
-        
-        AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
-        Thread brokerStart = null;
-        try
-        {
-            //Kill initial broker
-            stopDefaultBroker();
-
-            //Create a thread to start the broker asynchronously
-            brokerStart = new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        //Wait before starting broker
-                        // The wait should allow at least 1 retry to fail before broker is ready
-                        Thread.sleep(750);
-                        startDefaultBroker();
-                    }
-                    catch (Exception e)
-                    {
-                        LOGGER.error("Exception whilst starting broker", e);
-                    }
-                }
-            });
-
-            brokerStart.start();
-            long start = System.currentTimeMillis();
-            //Start the connection so it will use the retries
-            AMQConnection connection = new AMQConnection(url);
-
-            long end = System.currentTimeMillis();
-            long duration = (end - start);
-
-            // Check that we actually had a delay in connection
-            assertTrue("Initial connection should be longer than 1 delay : " + CONNECT_DELAY + " <:(" + duration + ")", duration > CONNECT_DELAY);
-
-
-            connection.setExceptionListener(this);
-
-            //Ensure we collect the brokerStart thread
-            brokerStart.join();
-            brokerStart = null;
-
-            start = System.currentTimeMillis();
-
-            //Kill connection
-            stopDefaultBroker();
-
-            _failoverComplete.await(30, TimeUnit.SECONDS);
-            assertEquals("failoverLatch was not decremented in given timeframe", 0, _failoverComplete.getCount());
-
-            end = System.currentTimeMillis();
-
-            duration = (end - start);
-
-            // Notification of the connection failure should be very quick as we are denying the ability to failover.
-            // It may not be as quick for Java profile tests so let's just make sure it is less than the connect delay
-            // Occasionally it takes 1s so we have to set CONNECT_DELAY to be higher to take that into account.
-            assertTrue("Notification of the connection failure took was : " + CONNECT_DELAY + " >:(" + duration + ")", duration < CONNECT_DELAY);
-        }
-        catch (QpidException e)
-        {
-            fail(e.getMessage());
-        }
-        finally
-        {
-            // Guard against the case where the broker took too long to start
-            // and the initial connection failed to be formed.
-            if (brokerStart != null)
-            {
-                brokerStart.join();
-            }
-        }
-    }
-
-    @Override
-    public void onException(JMSException e)
-    {
-        if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
-        {
-            LOGGER.debug("Received AMQDisconnectedException");
-            _failoverComplete.countDown();
-        }
-        else
-        {
-            LOGGER.error("Unexpected underlying exception", e.getLinkedException());
-        }
-    }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message