activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-3166 - implement rollbackOnlyOnAsyncException such that async exceptions on transactional sends or acks result in the transaction being marked rollback only and commit failing with an exception. Test that shows current state of play using alwaysSyncSend or AsyncCallback.
Date Wed, 25 May 2016 10:30:09 GMT
Repository: activemq
Updated Branches:
  refs/heads/master f46b2927a -> fe9d99e7a


AMQ-3166 - implement rollbackOnlyOnAsyncException such that async exceptions on transactional
sends or acks result in the transaction being marked rollback only and commit failing with
an exception. Test that shows current state of play using alwaysSyncSend or AsyncCallback.
rollbackOnlyOnAsyncException enabled by default.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fe9d99e7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fe9d99e7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fe9d99e7

Branch: refs/heads/master
Commit: fe9d99e7a071c4c09c78dfd95630036b86a8a05b
Parents: f46b292
Author: gtully <gary.tully@gmail.com>
Authored: Wed May 25 11:24:43 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed May 25 11:24:43 2016 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |   9 +
 .../activemq/broker/TransportConnection.java    |  61 ++--
 .../activemq/transaction/Transaction.java       |  42 ++-
 .../activemq/transaction/XATransaction.java     |   6 -
 .../org/apache/activemq/bugs/AMQ3166Test.java   | 294 +++++++++++++++++++
 .../activemq/network/SimpleNetworkTest.java     |   2 +-
 .../failover/FailoverTransactionTest.java       |  21 +-
 7 files changed, 383 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 77f0993..19910a5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -264,6 +264,7 @@ public class BrokerService implements Service {
     private boolean restartAllowed = true;
     private boolean restartRequested = false;
     private boolean rejectDurableConsumers = false;
+    private boolean rollbackOnlyOnAsyncException = true;
 
     private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
 
@@ -3205,4 +3206,12 @@ public class BrokerService implements Service {
     public void setAdjustUsageLimits(boolean adjustUsageLimits) {
         this.adjustUsageLimits = adjustUsageLimits;
     }
+
+    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
+        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
+    }
+
+    public boolean isRollbackOnlyOnAsyncException() {
+        return rollbackOnlyOnAsyncException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 2727503..40b7d26 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -108,6 +108,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
     private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName()
+ ".Service");
     // Keeps track of the broker and connector that created this connection.
     protected final Broker broker;
+    protected final BrokerService brokerService;
     protected final TransportConnector connector;
     // Keeps track of the state of the connections.
     // protected final ConcurrentHashMap localConnectionStates=new
@@ -162,6 +163,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
                                TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory)
{
         this.connector = connector;
         this.broker = broker;
+        this.brokerService = broker.getBrokerService();
+
         RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
         brokerConnectionStates = rb.getConnectionStates();
         if (connector != null) {
@@ -171,7 +174,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
         this.taskRunnerFactory = taskRunnerFactory;
         this.stopTaskRunnerFactory = stopTaskRunnerFactory;
         this.transport = transport;
-        final BrokerService brokerService = this.broker.getBrokerService();
         if( this.transport instanceof BrokerServiceAware ) {
             ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
         }
@@ -223,20 +225,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
     }
 
     public void serviceTransportException(IOException e) {
-        BrokerService bService = connector.getBrokerService();
-        if (bService.isShutdownOnSlaveFailure()) {
-            if (brokerInfo != null) {
-                if (brokerInfo.isSlaveBroker()) {
-                    LOG.error("Slave has exception: {} shutting down master now.", e.getMessage(),
e);
-                    try {
-                        doStop();
-                        bService.stop();
-                    } catch (Exception ex) {
-                        LOG.warn("Failed to stop the master", ex);
-                    }
-                }
-            }
-        }
         if (!stopping.get() && !pendingStop) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
@@ -357,6 +345,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
                 }
                 response = new ExceptionResponse(e);
             } else {
+                forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
                 serviceException(e);
             }
         }
@@ -379,6 +368,42 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
         return response;
     }
 
+    private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command)
{
+        if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException)
&& isInTransaction(command)) {
+            Transaction transaction = getActiveTransaction(command);
+            if (transaction != null && !transaction.isRollbackOnly()) {
+                LOG.debug("on async exception, force rollback of transaction for: " + command,
e);
+                transaction.setRollbackOnly(e);
+            }
+        }
+    }
+
+    private Transaction getActiveTransaction(Command command) {
+        Transaction transaction = null;
+        try {
+            if (command instanceof Message) {
+                Message messageSend = (Message) command;
+                ProducerId producerId = messageSend.getProducerId();
+                ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+                transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
+            } else if (command instanceof  MessageAck) {
+                MessageAck messageAck = (MessageAck) command;
+                ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
+                if (consumerExchange != null) {
+                    transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
+                }
+            }
+        } catch(Exception ignored){
+            LOG.trace("failed to find active transaction for command: " + command, ignored);
+        }
+        return transaction;
+    }
+
+    private boolean isInTransaction(Command command) {
+        return command instanceof Message && ((Message)command).isInTransaction()
+                || command instanceof MessageAck && ((MessageAck)command).isInTransaction();
+    }
+
     @Override
     public Response processKeepAlive(KeepAliveInfo info) throws Exception {
         return null;
@@ -1390,10 +1415,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
                 if (duplexName.contains("#")) {
                     duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
                 }
-                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(),
config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
+                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config,
brokerService.createDuplexNetworkConnectorObjectName(duplexName));
                 listener.setCreatedByDuplex(true);
                 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport,
remoteBridgeTransport, listener);
-                duplexBridge.setBrokerService(broker.getBrokerService());
+                duplexBridge.setBrokerService(brokerService);
                 // now turn duplex off this side
                 info.setDuplexConnection(false);
                 duplexBridge.setCreatedByDuplex(true);
@@ -1483,7 +1508,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
                 context = state.getContext();
                 result.setConnectionContext(context);
                 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers()))
{
-                    result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
+                    result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
                 }
                 SessionState ss = state.getSessionState(id.getParentId());
                 if (ss != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 6843871..77710f3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -24,7 +24,10 @@ import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
+import javax.jms.TransactionRolledBackException;
 import javax.transaction.xa.XAException;
+
+import org.apache.activemq.TransactionContext;
 import org.apache.activemq.command.TransactionId;
 import org.slf4j.Logger;
 
@@ -41,6 +44,7 @@ public abstract class Transaction {
     public static final byte PREPARED_STATE = 2; // can go to: 3
     public static final byte FINISHED_STATE = 3;
     boolean committed = false;
+    boolean rollbackOnly = false;
 
     private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
     private byte state = START_STATE;
@@ -103,16 +107,16 @@ public abstract class Transaction {
         case IN_USE_STATE:
             break;
         default:
-            XAException xae = new XAException("Prepare cannot be called now.");
-            xae.errorCode = XAException.XAER_PROTO;
+            XAException xae = newXAException("Prepare cannot be called now", XAException.XAER_PROTO);
             throw xae;
         }
 
-        // // Run the prePrepareTasks
-        // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
-        // Callback r = (Callback) iter.next();
-        // r.execute();
-        // }
+        if (rollbackOnly) {
+            XAException xae = newXAException("COMMIT FAILED: Transaction marked rollback
only", XAException.XA_RBROLLBACK);
+            TransactionRolledBackException transactionRolledBackException = new TransactionRolledBackException(xae.getLocalizedMessage());
+            xae.initCause(transactionRolledBackException);
+            throw xae;
+        }
     }
     
     protected void fireBeforeCommit() throws Exception {
@@ -184,8 +188,7 @@ public abstract class Transaction {
             // I guess this could happen. Post commit task failed
             // to execute properly.
             getLog().warn("PRE COMMIT FAILED: ", e);
-            XAException xae = new XAException("PRE COMMIT FAILED");
-            xae.errorCode = XAException.XAER_RMERR;
+            XAException xae = newXAException("PRE COMMIT FAILED", XAException.XAER_RMERR);
             xae.initCause(e);
             throw xae;
         }
@@ -199,10 +202,27 @@ public abstract class Transaction {
             // I guess this could happen. Post commit task failed
             // to execute properly.
             getLog().warn("POST COMMIT FAILED: ", e);
-            XAException xae = new XAException("POST COMMIT FAILED");
-            xae.errorCode = XAException.XAER_RMERR;
+            XAException xae = newXAException("POST COMMIT FAILED",  XAException.XAER_RMERR);
             xae.initCause(e);
             throw xae;
         }
     }
+
+    public static XAException newXAException(String s, int errorCode) {
+        XAException xaException = new XAException(s + " " + TransactionContext.xaErrorCodeMarker
+ errorCode);
+        xaException.errorCode = errorCode;
+        return xaException;
+    }
+
+    public void setRollbackOnly(Throwable cause) {
+        if (!rollbackOnly) {
+            getLog().trace("setting rollback only, cause:", cause);
+            rollbackOnly = true;
+        }
+    }
+
+    public boolean isRollbackOnly() {
+        return rollbackOnly;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
index 8e31ec7..9e456f9 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
@@ -170,12 +170,6 @@ public class XATransaction extends Transaction {
         }
     }
 
-    public static XAException newXAException(String s, int errorCode) {
-        XAException xaException = new XAException(s + " " + TransactionContext.xaErrorCodeMarker
+ errorCode);
-        xaException.errorCode = errorCode;
-        return xaException;
-    }
-
     @Override
     public int prepare() throws XAException, IOException {
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
new file mode 100644
index 0000000..74e2d48
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3166Test.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.AsyncCallback;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import javax.transaction.xa.XAException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AMQ3166Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3166Test.class);
+
+    private BrokerService brokerService;
+    private AtomicInteger sendAttempts = new AtomicInteger(0);
+
+
+    @Test
+    public void testCommitThroughAsyncErrorNoForceRollback() throws Exception {
+        startBroker(false);
+        Connection connection = createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(session.createQueue("QAT"));
+
+        for (int i=0; i<10; i++) {
+            producer.send(session.createTextMessage("Hello A"));
+        }
+
+        session.commit();
+
+        assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getTotalEnqueueCount() == 1;
+            }
+        }));
+
+        connection.close();
+    }
+
+    @Test
+    public void testCommitThroughAsyncErrorForceRollback() throws Exception {
+        startBroker(true);
+        Connection connection = createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(session.createQueue("QAT"));
+
+        try {
+            for (int i = 0; i < 10; i++) {
+                producer.send(session.createTextMessage("Hello A"));
+            }
+            session.commit();
+            fail("Expect TransactionRolledBackException");
+        } catch (JMSException expected) {
+            assertTrue(expected.getCause() instanceof XAException);
+        }
+
+        assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getTotalEnqueueCount() == 0;
+            }
+        }));
+
+        connection.close();
+    }
+
+    @Test
+    public void testAckCommitThroughAsyncErrorForceRollback() throws Exception {
+        startBroker(true);
+        Connection connection = createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue destination = session.createQueue("QAT");
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(session.createTextMessage("Hello A"));
+        producer.close();
+        session.commit();
+
+        MessageConsumer messageConsumer = session.createConsumer(destination);
+        assertNotNull("got message", messageConsumer.receive(4000));
+
+        try {
+            session.commit();
+            fail("Expect TransactionRolledBackException");
+        } catch (JMSException expected) {
+            assertTrue(expected.getCause() instanceof XAException);
+        }
+
+        assertTrue("one message still there!", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getTotalMessageCount() == 1;
+            }
+        }));
+
+        connection.close();
+    }
+
+
+    @Test
+    public void testErrorOnSyncSend() throws Exception {
+        startBroker(false);
+        ActiveMQConnection connection = (ActiveMQConnection) createConnection();
+        connection.setAlwaysSyncSend(true);
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(session.createQueue("QAT"));
+
+        try {
+            for (int i = 0; i < 10; i++) {
+                producer.send(session.createTextMessage("Hello A"));
+            }
+            session.commit();
+        } catch (JMSException expectedSendFail) {
+            LOG.info("Got expected: " + expectedSendFail);
+            session.rollback();
+        }
+
+        assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getTotalEnqueueCount() == 0;
+            }
+        }));
+
+        connection.close();
+    }
+
+
+    @Test
+    public void testRollbackOnAsyncErrorAmqApi() throws Exception {
+        startBroker(false);
+        ActiveMQConnection connection = (ActiveMQConnection) createConnection();
+        connection.start();
+        final ActiveMQSession session = (ActiveMQSession) connection.createSession(true,
Session.SESSION_TRANSACTED);
+        int batchSize = 10;
+        final CountDownLatch batchSent = new CountDownLatch(batchSize);
+        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(session.createQueue("QAT"));
+
+        for (int i=0; i<batchSize; i++) {
+            producer.send(session.createTextMessage("Hello A"), new AsyncCallback() {
+                @Override
+                public void onSuccess() {
+                    batchSent.countDown();
+                }
+
+                @Override
+                public void onException(JMSException e) {
+                    session.getTransactionContext().setRollbackOnly(true);
+                    batchSent.countDown();
+                }
+            });
+
+            if (i==0) {
+                // transaction context begun on first send
+                session.getTransactionContext().addSynchronization(new Synchronization()
{
+                    @Override
+                    public void beforeEnd() throws Exception {
+                        // await response to all sends in the batch
+                        if (!batchSent.await(10, TimeUnit.SECONDS)) {
+                            LOG.error("TimedOut waiting for aync send requests!");
+                            session.getTransactionContext().setRollbackOnly(true);
+                        };
+                        super.beforeEnd();
+                    }
+                });
+            }
+        }
+
+        try {
+            session.commit();
+            fail("expect rollback on async error");
+        } catch (TransactionRolledBackException expected) {
+        }
+
+        assertTrue("only one message made it through", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getTotalEnqueueCount() == 0;
+            }
+        }));
+
+        connection.close();
+    }
+
+
+    private Connection createConnection() throws Exception {
+        String connectionURI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
+        cf.setWatchTopicAdvisories(false);
+        return cf.createConnection();
+    }
+
+    public void startBroker(boolean forceRollbackOnAsyncSendException) throws Exception {
+        brokerService = createBroker(forceRollbackOnAsyncSendException);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+
+    protected BrokerService createBroker(boolean forceRollbackOnAsyncSendException) throws
Exception {
+        BrokerService answer = new BrokerService();
+
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.setAdvisorySupport(false);
+        answer.setRollbackOnlyOnAsyncException(forceRollbackOnAsyncSendException);
+
+        answer.addConnector("tcp://0.0.0.0:0");
+
+        answer.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck
ack) throws Exception {
+                        if (ack.isStandardAck()) {
+                            throw new RuntimeException("no way, won't allow any standard
ack");
+                        }
+                        super.acknowledge(consumerExchange, ack);
+                    }
+
+                    @Override
+                    public void send(ProducerBrokerExchange producerExchange, Message messageSend)
throws Exception {
+                        if (sendAttempts.incrementAndGet() > 1) {
+                            throw new RuntimeException("no way, won't accept any messages");
+                        }
+                        super.send(producerExchange, messageSend);
+                    }
+                }
+        });
+
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index a49301f..8a58eed 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -90,7 +90,7 @@ public class SimpleNetworkTest {
             Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
             Message msg = consumer1.receive(3000);
-            assertNotNull(msg);
+            assertNotNull("not null? message: " + i, msg);
             ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
             assertTrue(amqMessage.isCompressed());
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fe9d99e7/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index bb02ffe..717425f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -263,9 +263,6 @@ public class FailoverTransactionTest extends TestSupport {
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer = session2.createConsumer(destination);
         msg = consumer.receive(1000);
-        if (msg == null) {
-            msg = consumer.receive(5000);
-        }
         LOG.info("Received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
@@ -363,9 +360,6 @@ public class FailoverTransactionTest extends TestSupport {
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer = session2.createConsumer(destination);
         msg = consumer.receive(1000);
-        if (msg == null) {
-            msg = consumer.receive(5000);
-        }
         LOG.info("Received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
@@ -478,9 +472,6 @@ public class FailoverTransactionTest extends TestSupport {
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer = session2.createConsumer(destination);
         msg = consumer.receive(1000);
-        if (msg == null) {
-            msg = consumer.receive(5000);
-        }
         LOG.info("Received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
@@ -602,9 +593,6 @@ public class FailoverTransactionTest extends TestSupport {
         Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         consumer = session2.createConsumer(destination);
         msg = consumer.receive(1000);
-        if (msg == null) {
-            msg = consumer.receive(5000);
-        }
         LOG.info("Received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();
@@ -626,7 +614,11 @@ public class FailoverTransactionTest extends TestSupport {
         broker.stop();
         startBroker(false, url);
 
-        session.commit();
+        try {
+            session.commit();
+            fail("expect ex for rollback only on async exc");
+        } catch (JMSException expected) {
+        }
 
         // without tracking producers, message will not be replayed on recovery
         assertNull("we got the message", consumer.receive(5000));
@@ -886,9 +878,6 @@ public class FailoverTransactionTest extends TestSupport {
         Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer sweeper = sweeperSession.createConsumer(destination);
         msg = sweeper.receive(1000);
-        if (msg == null) {
-            msg = sweeper.receive(5000);
-        }
         LOG.info("Sweep received: " + msg);
         assertNull("no messges left dangling but got: " + msg, msg);
         connection.close();


Mime
View raw message