activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-302 more changes around XA reliability (resilience on failures)
Date Wed, 16 Dec 2015 15:23:29 GMT
ARTEMIS-302 more changes around XA reliability (resilience on failures)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/af1f79bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/af1f79bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/af1f79bf

Branch: refs/heads/master
Commit: af1f79bff503ee02ac119efceac65928f671fd1e
Parents: a2c8e6b
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Dec 14 20:11:53 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Dec 16 10:19:35 2015 -0500

----------------------------------------------------------------------
 .../core/client/impl/ClientSessionImpl.java     |  14 +-
 .../activemq/artemis/ra/ActiveMQRALogger.java   |  10 +-
 .../artemis/ra/ActiveMQRAManagedConnection.java |   2 +-
 .../artemis/ra/ActiveMQRAXAResource.java        |  21 +-
 .../artemis/ra/ActiveMQResourceAdapter.java     | 188 +++---
 .../artemis/ra/inflow/ActiveMQActivation.java   |  76 +--
 .../ra/inflow/ActiveMQMessageHandler.java       |   3 +-
 .../impl/journal/JournalStorageManager.java     |   3 +-
 .../core/server/cluster/impl/BridgeImpl.java    |  23 +
 .../core/server/impl/ServerConsumerImpl.java    |   3 +-
 .../core/server/impl/ServerSessionImpl.java     |   9 +-
 .../core/transaction/impl/TransactionImpl.java  | 148 ++--
 .../transaction/impl/TransactionImplTest.java   | 673 +++++++++++++++++++
 .../byteman/ClusteredBridgeReconnectTest.java   | 228 +++++++
 .../byteman/ConcurrentDeliveryCancelTest.java   |  92 +--
 .../tests/extras/byteman/TimeoutXATest.java     |  12 +-
 ...MDBMultipleHandlersServerDisconnectTest.java | 116 +++-
 .../cluster/bridge/BridgeReconnectTest.java     |   4 +
 .../failover/AsynchronousFailoverTest.java      |   8 +-
 .../integration/ra/ResourceAdapterTest.java     |  48 +-
 .../integration/xa/BasicXaRecoveryTest.java     |   3 +
 .../tests/unit/ra/ResourceAdapterTest.java      |  24 +-
 22 files changed, 1371 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index dc89680..221413c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -539,8 +539,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       rollback(false);
    }
 
-   @Override
-   public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException {
+   public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException
+   {
+      rollback(isLastMessageAsDelivered, true);
+   }
+
+   public void rollback(final boolean isLastMessageAsDelivered, final boolean waitConsumers) throws ActiveMQException
+   {
       if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
          ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")");
       }
@@ -559,7 +564,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
       // We need to make sure we don't get any inflight messages
       for (ClientConsumerInternal consumer : cloneConsumers()) {
-         consumer.clear(true);
+         consumer.clear(waitConsumers);
       }
 
       // Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -1173,7 +1178,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       try {
          if (rollbackOnly) {
             try {
-               rollback();
+               rollback(false, false);
             }
             catch (Throwable ignored) {
                ActiveMQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored);
@@ -1252,6 +1257,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
          return sessionContext.configureTransactionTimeout(seconds);
       }
       catch (Throwable t) {
+         markRollbackOnly(); // The TM will ignore any errors from here, if things are this screwed up we mark rollbackonly
          // This could occur if the TM interrupts the thread
          XAException xaException = new XAException(XAException.XAER_RMFAIL);
          xaException.initCause(t);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
index 6ec3a5e..7853f09 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRALogger.java
@@ -74,20 +74,20 @@ public interface ActiveMQRALogger extends BasicLogger {
    void awaitingJMSServerCreation();
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections.", format = Message.Format.MESSAGE_FORMAT)
-   void rebalancingConnections();
+   @Message(id = 151006, value = "Cluster topology change detected. Re-balancing connections on even {0}.", format = Message.Format.MESSAGE_FORMAT)
+   void rebalancingConnections(String event);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 152001, value = "problem resetting xa session after failure", format = Message.Format.MESSAGE_FORMAT)
-   void problemResettingXASession();
+   void problemResettingXASession(@Cause Throwable t);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 152002, value = "Unable to roll local transaction back", format = Message.Format.MESSAGE_FORMAT)
    void unableToRollbackTX();
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 152003, value = "unable to reset session after failure", format = Message.Format.MESSAGE_FORMAT)
-   void unableToResetSession();
+   @Message(id = 152003, value = "unable to reset session after failure, we will place the MDB Inflow now in setup mode for activation={0}" , format = Message.Format.MESSAGE_FORMAT)
+   void unableToResetSession(String spec, @Cause Exception e);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 152004, value = "Handling JMS exception failure", format = Message.Format.MESSAGE_FORMAT)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
index 3cd1515..97c6032 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnection.java
@@ -811,7 +811,7 @@ public final class ActiveMQRAManagedConnection implements ManagedConnection, Exc
 
    private void createCF() {
       if (connectionFactory == null) {
-         connectionFactory = ra.createActiveMQConnectionFactory(mcf.getProperties());
+         connectionFactory = ra.getConnectionFactory(mcf.getProperties());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java
index 093ee0f..9d485f3 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAXAResource.java
@@ -21,8 +21,8 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.client.impl.ActiveMQXAResource;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 
 /**
  * ActiveMQXAResource.
@@ -76,13 +76,18 @@ public class ActiveMQRAXAResource implements ActiveMQXAResource {
 
       ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource;
       try {
-         //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this
-         sessionInternal.resetIfNeeded();
-      }
-      catch (ActiveMQException e) {
-         ActiveMQRALogger.LOGGER.problemResettingXASession();
-      }
-      try {
+         try {
+            //this resets any tx stuff, we assume here that the tm and jca layer are well behaved when it comes to this
+            sessionInternal.resetIfNeeded();
+         }
+         catch (ActiveMQException e) {
+            ActiveMQRALogger.LOGGER.problemResettingXASession(e);
+
+            XAException xaException = new XAException(XAException.XAER_RMFAIL);
+            xaException.initCause(e);
+            throw xaException;
+         }
+
          xaResource.start(xid, flags);
       }
       finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
index f9781c1..8c1a9b4 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQResourceAdapter.java
@@ -29,11 +29,12 @@ import javax.transaction.xa.XAResource;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Hashtable;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
@@ -141,7 +142,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
 
       raProperties = new ActiveMQRAProperties();
       configured = new AtomicBoolean(false);
-      activations = new ConcurrentHashMap<>();
+      activations = Collections.synchronizedMap(new IdentityHashMap<ActivationSpec, ActiveMQActivation>());
       recoveryManager = new RecoveryManager();
    }
 
@@ -1570,7 +1571,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
     */
    protected void setup() throws ActiveMQException {
       raProperties.init();
-      defaultActiveMQConnectionFactory = createActiveMQConnectionFactory(raProperties);
+      defaultActiveMQConnectionFactory = newConnectionFactory(raProperties);
       recoveryActiveMQConnectionFactory = createRecoveryActiveMQConnectionFactory(raProperties);
 
       Map<String, String> recoveryConfProps = new HashMap<>();
@@ -1623,126 +1624,133 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable {
       raProperties.setJgroupsChannelRefName(jgroupsChannelRefName);
    }
 
-   public synchronized ActiveMQConnectionFactory createActiveMQConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
+   public synchronized ActiveMQConnectionFactory getConnectionFactory(final ConnectionFactoryProperties overrideProperties) {
       ActiveMQConnectionFactory cf;
       boolean known = false;
 
       if (!knownConnectionFactories.keySet().contains(overrideProperties)) {
-         List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
+         cf = newConnectionFactory(overrideProperties);
+         knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
+      }
+      else {
+         Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties);
+         cf = pair.getA();
+         pair.getB().incrementAndGet();
+         known = true;
+      }
 
-         String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
+      if (known && cf.getServerLocator().isClosed()) {
+         knownConnectionFactories.remove(overrideProperties);
+         cf = newConnectionFactory(overrideProperties);
+         knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
+      }
 
-         Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
+      return cf;
+   }
 
-         String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
+   public ActiveMQConnectionFactory newConnectionFactory(ConnectionFactoryProperties overrideProperties) {
+      ActiveMQConnectionFactory cf;
+      List<String> connectorClassName = overrideProperties.getParsedConnectorClassNames() != null ? overrideProperties.getParsedConnectorClassNames() : raProperties.getParsedConnectorClassNames();
 
-         String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
+      String discoveryAddress = overrideProperties.getDiscoveryAddress() != null ? overrideProperties.getDiscoveryAddress() : getDiscoveryAddress();
 
-         String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
+      Boolean ha = overrideProperties.isHA() != null ? overrideProperties.isHA() : getHA();
 
-         if (ha == null) {
-            ha = ActiveMQClient.DEFAULT_IS_HA;
-         }
+      String jgroupsFileName = overrideProperties.getJgroupsFile() != null ? overrideProperties.getJgroupsFile() : getJgroupsFile();
 
-         if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
-            BroadcastEndpointFactory endpointFactory = null;
+      String jgroupsChannel = overrideProperties.getJgroupsChannelName() != null ? overrideProperties.getJgroupsChannelName() : getJgroupsChannelName();
 
-            if (jgroupsLocatorClassName != null) {
-               String jchannelRefName = raProperties.getJgroupsChannelRefName();
-               JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
-               endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
-            }
-            else if (discoveryAddress != null) {
-               Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
-               if (discoveryPort == null) {
-                  discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
-               }
-
-               String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
-               endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
-            }
-            else if (jgroupsFileName != null) {
-               endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
-            }
-            Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
-            if (refreshTimeout == null) {
-               refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
-            }
+      String jgroupsLocatorClassName = raProperties.getJgroupsChannelLocatorClass();
+
+      if (ha == null) {
+         ha = ActiveMQClient.DEFAULT_IS_HA;
+      }
 
-            Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
+      if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null) {
+         BroadcastEndpointFactory endpointFactory = null;
 
-            if (initialTimeout == null) {
-               initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+         if (jgroupsLocatorClassName != null) {
+            String jchannelRefName = raProperties.getJgroupsChannelRefName();
+            JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
+            endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
+         }
+         else if (discoveryAddress != null) {
+            Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort() : getDiscoveryPort();
+            if (discoveryPort == null) {
+               discoveryPort = ActiveMQClient.DEFAULT_DISCOVERY_PORT;
             }
 
-            DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
+            String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress();
+            endpointFactory = new UDPBroadcastEndpointFactory().setGroupAddress(discoveryAddress).setGroupPort(discoveryPort).setLocalBindAddress(localBindAddress).setLocalBindPort(-1);
+         }
+         else if (jgroupsFileName != null) {
+            endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(jgroupsChannel).setFile(jgroupsFileName);
+         }
+         Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout() : raProperties.getDiscoveryRefreshTimeout();
+         if (refreshTimeout == null) {
+            refreshTimeout = ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
+         }
 
-            if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
-               ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
-            }
+         Long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout() : raProperties.getDiscoveryInitialWaitTimeout();
 
-            if (ha) {
-               cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
-            }
-            else {
-               cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
-            }
+         if (initialTimeout == null) {
+            initialTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
          }
-         else if (connectorClassName != null) {
-            TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
 
-            List<Map<String, Object>> connectionParams;
-            if (overrideProperties.getParsedConnectorClassNames() != null) {
-               connectionParams = overrideProperties.getParsedConnectionParameters();
-            }
-            else {
-               connectionParams = raProperties.getParsedConnectionParameters();
-            }
+         DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration().setRefreshTimeout(refreshTimeout).setDiscoveryInitialWaitTimeout(initialTimeout).setBroadcastEndpointFactory(endpointFactory);
 
-            for (int i = 0; i < connectorClassName.size(); i++) {
-               TransportConfiguration tc;
-               if (connectionParams == null || i >= connectionParams.size()) {
-                  tc = new TransportConfiguration(connectorClassName.get(i));
-                  ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
-               }
-               else {
-                  tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
-               }
-
-               transportConfigurations[i] = tc;
-            }
+         if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
+            ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for discovery=" + groupConfiguration + " with ha=" + ha);
+         }
 
-            if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
-               ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
-                                                Arrays.toString(transportConfigurations) + " with ha=" + ha);
-            }
+         if (ha) {
+            cf = ActiveMQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
+         }
+         else {
+            cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+         }
+      }
+      else if (connectorClassName != null) {
+         TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectorClassName.size()];
+
+         List<Map<String, Object>> connectionParams;
+         if (overrideProperties.getParsedConnectorClassNames() != null) {
+            connectionParams = overrideProperties.getParsedConnectionParameters();
+         }
+         else {
+            connectionParams = raProperties.getParsedConnectionParameters();
+         }
 
-            if (ha) {
-               cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
+         for (int i = 0; i < connectorClassName.size(); i++) {
+            TransportConfiguration tc;
+            if (connectionParams == null || i >= connectionParams.size()) {
+               tc = new TransportConfiguration(connectorClassName.get(i));
+               ActiveMQRALogger.LOGGER.debug("No connector params provided using default");
             }
             else {
-               cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+               tc = new TransportConfiguration(connectorClassName.get(i), connectionParams.get(i));
             }
+
+            transportConfigurations[i] = tc;
          }
-         else {
-            throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
+
+         if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
+            ActiveMQRALogger.LOGGER.debug("Creating Connection Factory on the resource adapter for transport=" +
+                                             Arrays.toString(transportConfigurations) + " with ha=" + ha);
          }
 
-         setParams(cf, overrideProperties);
-         knownConnectionFactories.put(overrideProperties, new Pair(cf, new AtomicInteger(1)));
+         if (ha) {
+            cf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.XA_CF, transportConfigurations);
+         }
+         else {
+            cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, transportConfigurations);
+         }
       }
       else {
-         Pair<ActiveMQConnectionFactory, AtomicInteger> pair = knownConnectionFactories.get(overrideProperties);
-         cf = pair.getA();
-         pair.getB().incrementAndGet();
-         known = true;
-      }
-
-      if (known && cf.getServerLocator().isClosed()) {
-         knownConnectionFactories.remove(overrideProperties);
-         cf = createActiveMQConnectionFactory(overrideProperties);
+         throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ResourceAdapter Connection Factory");
       }
 
+      setParams(cf, overrideProperties);
       return cf;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 79b9cb9..d7a242d 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -48,7 +48,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.ra.ActiveMQRABundle;
@@ -420,12 +419,19 @@ public class ActiveMQActivation {
          // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
       }
 
-      if (threadTearDown.isAlive()) {
-         if (factory != null) {
-            // This will interrupt any threads waiting on reconnect
+      if (factory != null) {
+         try {
+            // closing the factory will help making sure pending threads are closed
             factory.close();
-            factory = null;
          }
+         catch (Throwable e) {
+            ActiveMQRALogger.LOGGER.warn(e);
+         }
+
+         factory = null;
+      }
+
+      if (threadTearDown.isAlive()) {
          threadTearDown.interrupt();
 
          try {
@@ -440,11 +446,6 @@ public class ActiveMQActivation {
          }
       }
 
-      if (spec.isHasBeenUpdated() && factory != null) {
-         ra.closeConnectionFactory(spec);
-         factory = null;
-      }
-
       nodes.clear();
       lastReceived = false;
 
@@ -465,23 +466,11 @@ public class ActiveMQActivation {
             factory = (ActiveMQConnectionFactory) fac;
          }
          else {
-            ActiveMQRAConnectionFactory raFact = (ActiveMQRAConnectionFactory) fac;
-            if (spec.isHasBeenUpdated()) {
-               factory = raFact.getResourceAdapter().createActiveMQConnectionFactory(spec);
-            }
-            else {
-               factory = raFact.getDefaultFactory();
-               if (factory != ra.getDefaultActiveMQConnectionFactory()) {
-                  ActiveMQRALogger.LOGGER.warnDifferentConnectionfactory();
-               }
-            }
+            factory = ra.newConnectionFactory(spec);
          }
       }
-      else if (spec.isHasBeenUpdated()) {
-         factory = ra.createActiveMQConnectionFactory(spec);
-      }
       else {
-         factory = ra.getDefaultActiveMQConnectionFactory();
+         factory = ra.newConnectionFactory(spec);
       }
    }
 
@@ -627,9 +616,18 @@ public class ActiveMQActivation {
       return buffer.toString();
    }
 
-   public void rebalance() {
-      ActiveMQRALogger.LOGGER.rebalancingConnections();
-      reconnect(null);
+   public void startReconnectThread(final String threadName) {
+      if (trace) {
+         ActiveMQRALogger.LOGGER.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this);
+      }
+      Runnable runnable = new Runnable() {
+         @Override
+         public void run() {
+            reconnect(null);
+         }
+      };
+      Thread t = new Thread(runnable, threadName);
+      t.start();
    }
 
    /**
@@ -638,6 +636,9 @@ public class ActiveMQActivation {
     * @param failure if reconnecting in the event of a failure
     */
    public void reconnect(Throwable failure) {
+      if (trace) {
+         ActiveMQRALogger.LOGGER.trace("reconnecting activation " + this);
+      }
       if (failure != null) {
          if (failure instanceof ActiveMQException && ((ActiveMQException) failure).getType() == ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST) {
             ActiveMQRALogger.LOGGER.awaitingTopicQueueCreation(getActivationSpec().getDestination());
@@ -728,6 +729,7 @@ public class ActiveMQActivation {
    }
 
    private class RebalancingListener implements ClusterTopologyListener {
+
       @Override
       public void nodeUP(TopologyMember member, boolean last) {
          boolean newNode = false;
@@ -741,14 +743,8 @@ public class ActiveMQActivation {
          }
 
          if (lastReceived && newNode) {
-            Runnable runnable = new Runnable() {
-               @Override
-               public void run() {
-                  rebalance();
-               }
-            };
-            Thread t = new Thread(runnable, "NodeUP Connection Rebalancer");
-            t.start();
+            ActiveMQRALogger.LOGGER.rebalancingConnections("nodeUp " + member.toString());
+            startReconnectThread("NodeUP Connection Rebalancer");
          }
          else if (last) {
             lastReceived = true;
@@ -759,14 +755,8 @@ public class ActiveMQActivation {
       public void nodeDown(long eventUID, String nodeID) {
          if (nodes.remove(nodeID)) {
             removedNodes.put(nodeID, eventUID);
-            Runnable runnable = new Runnable() {
-               @Override
-               public void run() {
-                  rebalance();
-               }
-            };
-            Thread t = new Thread(runnable, "NodeDOWN Connection Rebalancer");
-            t.start();
+            ActiveMQRALogger.LOGGER.rebalancingConnections("nodeDown " + nodeID);
+            startReconnectThread("NodeDOWN Connection Rebalancer");
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index a180fc7..b0d64cc 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -386,7 +386,8 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
             session.resetIfNeeded();
          }
          catch (ActiveMQException e) {
-            ActiveMQRALogger.LOGGER.unableToResetSession();
+            ActiveMQRALogger.LOGGER.unableToResetSession(activation.toString(), e);
+            activation.startReconnectThread("Reset MessageHandler after Failure Thread");
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index fbf9655..77cfd0d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -27,6 +27,7 @@ import java.security.MessageDigest;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -2924,7 +2925,7 @@ public class JournalStorageManager implements StorageManager {
 
       @Override
       public String toString() {
-         return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
+         return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "(" + new Date(scheduledDeliveryTime) + ")]";
       }
 
       private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index b8b30bc..f638fbc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -28,11 +28,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
@@ -218,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       return bytes;
    }
 
+   // for tests
+   public ClientSessionFactory getSessionFactory() {
+      return csf;
+   }
+
    /* (non-Javadoc)
     * @see org.apache.activemq.artemis.core.server.Consumer#getDeliveringMessages()
     */
@@ -905,8 +912,24 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                scheduleRetryConnect();
             }
          }
+         catch (ActiveMQInterruptedException e) {
+            ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this);
+         }
+         catch (InterruptedException e) {
+            ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this);
+         }
          catch (Exception e) {
             ActiveMQServerLogger.LOGGER.errorConnectingBridge(e, this);
+            if (csf != null) {
+               try {
+                  csf.close();
+                  csf = null;
+               }
+               catch (Throwable ignored) {
+               }
+            }
+            fail(false);
+            scheduleRetryConnect();
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e04c35c..4a8b16a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -285,7 +285,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the
          // queue for delivery later.
-         if (!started || transferring || !callback.isWritable(this)) {
+         // TCP-flow control has to be done first than everything else otherwise we may lose notifications
+         if (!callback.isWritable(this) || !started || transferring ) {
             return HandleStatus.BUSY;
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d036fdf..e21102c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1047,7 +1047,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString());
 
          try {
-            if (!tx.isEffective()) {
+            if (tx.getState() != Transaction.State.PREPARED) {
                // we don't want to rollback anything prepared here
                if (tx.getXid() != null) {
                   resourceManager.removeTransaction(tx.getXid());
@@ -1085,7 +1085,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       }
 
       if (theTX.isEffective()) {
-         ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared");
+         ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it " + theTX.getState());
          tx = null;
       }
       else {
@@ -1568,9 +1568,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       if (theTx.getState() == State.ROLLEDBACK) {
          Transaction newTX = newTransaction();
          cancelAndRollback(clientFailed, newTX, wasStarted, toCancel);
-         throw new IllegalStateException("Transaction has already been rolled back");
       }
-      cancelAndRollback(clientFailed, theTx, wasStarted, toCancel);
+      else {
+         cancelAndRollback(clientFailed, theTx, wasStarted, toCancel);
+      }
    }
 
    private void cancelAndRollback(boolean clientFailed,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 3490fee..ee90c4a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -32,6 +33,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 
 public class TransactionImpl implements Transaction {
 
+   private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
+
    private List<TransactionOperation> operations;
 
    private static final int INITIAL_NUM_PROPERTIES = 10;
@@ -105,8 +108,7 @@ public class TransactionImpl implements Transaction {
 
    @Override
    public boolean isEffective() {
-      return state == State.PREPARED || state == State.COMMITTED;
-
+      return state == State.PREPARED || state == State.COMMITTED || state == State.ROLLEDBACK;
    }
 
    @Override
@@ -141,32 +143,43 @@ public class TransactionImpl implements Transaction {
 
    @Override
    public boolean hasTimedOut(final long currentTime, final int defaultTimeout) {
-      if (timeoutSeconds == -1) {
-         return getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
-      }
-      else {
-         return getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
+      synchronized (timeoutLock) {
+         boolean timedout;
+         if (timeoutSeconds == -1) {
+            timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
+         }
+         else {
+            timedout = getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
+         }
+
+         if (timedout) {
+            markAsRollbackOnly(new ActiveMQException("TX Timeout"));
+         }
+
+         return timedout;
       }
    }
 
    @Override
    public void prepare() throws Exception {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::" + this);
+      }
       storageManager.readLock();
       try {
          synchronized (timeoutLock) {
             if (isEffective()) {
-               ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call");
+               ActiveMQServerLogger.LOGGER.debug("TransactionImpl::prepare::" + this + " is being ignored");
                return;
             }
             if (state == State.ROLLBACK_ONLY) {
+               if (isTrace) {
+                  ActiveMQServerLogger.LOGGER.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
+               }
+
+               internalRollback();
+
                if (exception != null) {
-                  // this TX will never be rolled back,
-                  // so we reset it now
-                  beforeRollback();
-                  afterRollback();
-                  if (operations != null) {
-                     operations.clear();
-                  }
                   throw exception;
                }
                else {
@@ -216,14 +229,17 @@ public class TransactionImpl implements Transaction {
 
    @Override
    public void commit(final boolean onePhase) throws Exception {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("TransactionImpl::commit::" + this);
+      }
       synchronized (timeoutLock) {
          if (state == State.COMMITTED) {
             // I don't think this could happen, but just in case
-            ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call");
+            ActiveMQServerLogger.LOGGER.debug("TransactionImpl::commit::" + this + " is being ignored");
             return;
          }
          if (state == State.ROLLBACK_ONLY) {
-            rollback();
+            internalRollback();
 
             if (exception != null) {
                throw exception;
@@ -236,12 +252,12 @@ public class TransactionImpl implements Transaction {
 
          if (xid != null) {
             if (onePhase && state != State.ACTIVE || !onePhase && state != State.PREPARED) {
-               throw new IllegalStateException("Transaction is in invalid state " + state);
+               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
             }
          }
          else {
             if (state != State.ACTIVE) {
-               throw new IllegalStateException("Transaction is in invalid state " + state);
+               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
             }
          }
 
@@ -249,6 +265,11 @@ public class TransactionImpl implements Transaction {
 
          doCommit();
 
+         // We want to make sure that nothing else gets done after the commit is issued
+         // this will eliminate any possibility or races
+         final List<TransactionOperation> operationsToComplete = this.operations;
+         this.operations = null;
+
          // We use the Callback even for non persistence
          // If we are using non-persistence with replication, the replication manager will have
          // to execute this runnable in the correct order
@@ -263,7 +284,7 @@ public class TransactionImpl implements Transaction {
 
             @Override
             public void done() {
-               afterCommit();
+               afterCommit(operationsToComplete);
             }
          });
 
@@ -285,44 +306,65 @@ public class TransactionImpl implements Transaction {
 
    @Override
    public void rollback() throws Exception {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("TransactionImpl::rollback::" + this);
+      }
+
       synchronized (timeoutLock) {
          if (state == State.ROLLEDBACK) {
             // I don't think this could happen, but just in case
-            ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace"));
+            ActiveMQServerLogger.LOGGER.debug("TransactionImpl::rollback::" + this + " is being ignored");
             return;
          }
          if (xid != null) {
             if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
-               throw new IllegalStateException("Transaction is in invalid state " + state);
+               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
             }
          }
          else {
             if (state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
-               throw new IllegalStateException("Transaction is in invalid state " + state);
+               throw new ActiveMQIllegalStateException("Transaction is in invalid state " + state);
             }
          }
 
-         beforeRollback();
-
-         doRollback();
-         state = State.ROLLEDBACK;
+         internalRollback();
+      }
+   }
 
-         // We use the Callback even for non persistence
-         // If we are using non-persistence with replication, the replication manager will have
-         // to execute this runnable in the correct order
-         storageManager.afterCompleteOperations(new IOCallback() {
+   private void internalRollback() throws Exception {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("TransactionImpl::internalRollback " + this);
+      }
 
-            @Override
-            public void onError(final int errorCode, final String errorMessage) {
-               ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
-            }
+      beforeRollback();
 
-            @Override
-            public void done() {
-               afterRollback();
-            }
-         });
+      try {
+         doRollback();
+         state = State.ROLLEDBACK;
+      }
+      catch (IllegalStateException e) {
+         // Something happened before and the TX didn't make to the Journal / Storage
+         // We will like to execute afterRollback and clear anything pending
+         ActiveMQServerLogger.LOGGER.warn(e);
       }
+      // We want to make sure that nothing else gets done after the commit is issued
+      // this will eliminate any possibility or races
+      final List<TransactionOperation> operationsToComplete = this.operations;
+      this.operations = null;
+
+      // We use the Callback even for non persistence
+      // If we are using non-persistence with replication, the replication manager will have
+      // to execute this runnable in the correct order
+      storageManager.afterCompleteOperations(new IOCallback() {
+
+         public void onError(final int errorCode, final String errorMessage) {
+            ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
+         }
+
+         public void done() {
+            afterRollback(operationsToComplete);
+         }
+      });
    }
 
    @Override
@@ -361,10 +403,14 @@ public class TransactionImpl implements Transaction {
    }
 
    @Override
-   public void markAsRollbackOnly(final ActiveMQException exception1) {
+   public void markAsRollbackOnly(final ActiveMQException exception) {
       synchronized (timeoutLock) {
+         if (isTrace) {
+            ActiveMQServerLogger.LOGGER.trace("TransactionImpl::" + this + " marking rollbackOnly for " + exception.toString() + ", msg=" + exception.getMessage());
+         }
+
          if (isEffective()) {
-            ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)");
+            ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared, committed or rolledback!)");
             return;
          }
 
@@ -373,7 +419,7 @@ public class TransactionImpl implements Transaction {
          }
          state = State.ROLLBACK_ONLY;
 
-         this.exception = exception1;
+         this.exception = exception;
       }
    }
 
@@ -434,19 +480,23 @@ public class TransactionImpl implements Transaction {
       }
    }
 
-   private synchronized void afterCommit() {
-      if (operations != null) {
-         for (TransactionOperation operation : operations) {
+   private synchronized void afterCommit(List<TransactionOperation> oeprationsToComplete) {
+      if (oeprationsToComplete != null) {
+         for (TransactionOperation operation : oeprationsToComplete) {
             operation.afterCommit(this);
          }
+         // Help out GC here
+         oeprationsToComplete.clear();
       }
    }
 
-   private synchronized void afterRollback() {
-      if (operations != null) {
-         for (TransactionOperation operation : operations) {
+   private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete) {
+      if (oeprationsToComplete != null) {
+         for (TransactionOperation operation : oeprationsToComplete) {
             operation.afterRollback(this);
          }
+         // Help out GC here
+         oeprationsToComplete.clear();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
new file mode 100644
index 0000000..3909c3c
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.transaction.impl;
+
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.persistence.GroupingInfo;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
+import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.RouteContextList;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.JournalLoader;
+import org.apache.activemq.artemis.core.transaction.ResourceManager;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TransactionImplTest extends ActiveMQTestBase {
+
+   @Test
+   public void testTimeoutAndThenCommitWithARollback() throws Exception {
+      TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10);
+      Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10));
+
+      final AtomicInteger commit = new AtomicInteger(0);
+      final AtomicInteger rollback = new AtomicInteger(0);
+
+      tx.addOperation(new TransactionOperation() {
+         @Override
+         public void beforePrepare(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterPrepare(Transaction tx) {
+
+         }
+
+         @Override
+         public void beforeCommit(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterCommit(Transaction tx) {
+            System.out.println("commit...");
+            commit.incrementAndGet();
+         }
+
+         @Override
+         public void beforeRollback(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterRollback(Transaction tx) {
+            System.out.println("rollback...");
+            rollback.incrementAndGet();
+         }
+
+         @Override
+         public List<MessageReference> getRelatedMessageReferences() {
+            return null;
+         }
+
+         @Override
+         public List<MessageReference> getListOnConsumer(long consumerID) {
+            return null;
+         }
+      });
+
+      for (int i = 0; i < 2; i++) {
+         try {
+            tx.commit();
+            Assert.fail("Exception expected!");
+         }
+         catch (ActiveMQException expected) {
+         }
+      }
+
+      // it should just be ignored!
+      tx.rollback();
+
+      Assert.assertEquals(0, commit.get());
+      Assert.assertEquals(1, rollback.get());
+
+   }
+
+   @Test
+   public void testTimeoutThenRollbackWithRollback() throws Exception {
+      TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10);
+      Assert.assertTrue(tx.hasTimedOut(System.currentTimeMillis() + 60000, 10));
+
+      final AtomicInteger commit = new AtomicInteger(0);
+      final AtomicInteger rollback = new AtomicInteger(0);
+
+      tx.addOperation(new TransactionOperation() {
+         @Override
+         public void beforePrepare(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterPrepare(Transaction tx) {
+
+         }
+
+         @Override
+         public void beforeCommit(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterCommit(Transaction tx) {
+            System.out.println("commit...");
+            commit.incrementAndGet();
+         }
+
+         @Override
+         public void beforeRollback(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterRollback(Transaction tx) {
+            System.out.println("rollback...");
+            rollback.incrementAndGet();
+         }
+
+         @Override
+         public List<MessageReference> getRelatedMessageReferences() {
+            return null;
+         }
+
+         @Override
+         public List<MessageReference> getListOnConsumer(long consumerID) {
+            return null;
+         }
+      });
+
+      tx.rollback();
+
+      // This is a case where another failure was detected (In parallel with the TX timeout for instance)
+      tx.markAsRollbackOnly(new ActiveMQException("rollback only again"));
+      tx.rollback();
+
+      Assert.assertEquals(0, commit.get());
+      Assert.assertEquals(1, rollback.get());
+
+   }
+
+   class FakeSM implements StorageManager {
+
+      @Override
+      public OperationContext getContext() {
+         return null;
+      }
+
+      @Override
+      public void lineUpContext() {
+
+      }
+
+      @Override
+      public OperationContext newContext(Executor executor) {
+         return null;
+      }
+
+      @Override
+      public OperationContext newSingleThreadContext() {
+         return null;
+      }
+
+      @Override
+      public void setContext(OperationContext context) {
+
+      }
+
+      @Override
+      public void stop(boolean ioCriticalError) throws Exception {
+
+      }
+
+      @Override
+      public void pageClosed(SimpleString storeName, int pageNumber) {
+
+      }
+
+      @Override
+      public void pageDeleted(SimpleString storeName, int pageNumber) {
+
+      }
+
+      @Override
+      public void pageWrite(PagedMessage message, int pageNumber) {
+
+      }
+
+      @Override
+      public void afterCompleteOperations(IOCallback run) {
+         run.done();
+      }
+
+      @Override
+      public boolean waitOnOperations(long timeout) throws Exception {
+         return false;
+      }
+
+      @Override
+      public void waitOnOperations() throws Exception {
+
+      }
+
+      @Override
+      public void beforePageRead() throws Exception {
+
+      }
+
+      @Override
+      public void afterPageRead() throws Exception {
+
+      }
+
+      @Override
+      public ByteBuffer allocateDirectBuffer(int size) {
+         return null;
+      }
+
+      @Override
+      public void freeDirectBuffer(ByteBuffer buffer) {
+
+      }
+
+      @Override
+      public void clearContext() {
+
+      }
+
+      @Override
+      public void confirmPendingLargeMessageTX(Transaction transaction,
+                                               long messageID,
+                                               long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void confirmPendingLargeMessage(long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void storeMessage(ServerMessage message) throws Exception {
+
+      }
+
+      @Override
+      public void storeReference(long queueID, long messageID, boolean last) throws Exception {
+
+      }
+
+      @Override
+      public void deleteMessage(long messageID) throws Exception {
+
+      }
+
+      @Override
+      public void storeAcknowledge(long queueID, long messageID) throws Exception {
+
+      }
+
+      @Override
+      public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception {
+
+      }
+
+      @Override
+      public void updateDeliveryCount(MessageReference ref) throws Exception {
+
+      }
+
+      @Override
+      public void updateScheduledDeliveryTime(MessageReference ref) throws Exception {
+
+      }
+
+      @Override
+      public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void deleteDuplicateID(long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void storeMessageTransactional(long txID, ServerMessage message) throws Exception {
+
+      }
+
+      @Override
+      public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception {
+
+      }
+
+      @Override
+      public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception {
+
+      }
+
+      @Override
+      public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception {
+
+      }
+
+      @Override
+      public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception {
+
+      }
+
+      @Override
+      public void deleteCursorAcknowledge(long ackID) throws Exception {
+
+      }
+
+      @Override
+      public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception {
+
+      }
+
+      @Override
+      public void deletePageComplete(long ackID) throws Exception {
+
+      }
+
+      @Override
+      public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception {
+
+      }
+
+      @Override
+      public void storeDuplicateIDTransactional(long txID,
+                                                SimpleString address,
+                                                byte[] duplID,
+                                                long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void updateDuplicateIDTransactional(long txID,
+                                                 SimpleString address,
+                                                 byte[] duplID,
+                                                 long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception {
+
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage() {
+         return null;
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception {
+         return null;
+      }
+
+      @Override
+      public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
+         return null;
+      }
+
+      @Override
+      public void prepare(long txID, Xid xid) throws Exception {
+
+      }
+
+      @Override
+      public void commit(long txID) throws Exception {
+
+      }
+
+      @Override
+      public void commit(long txID, boolean lineUpContext) throws Exception {
+
+      }
+
+      @Override
+      public void rollback(long txID) throws Exception {
+
+      }
+
+      @Override
+      public void rollbackBindings(long txID) throws Exception {
+
+      }
+
+      @Override
+      public void commitBindings(long txID) throws Exception {
+
+      }
+
+      @Override
+      public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception {
+
+      }
+
+      @Override
+      public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception {
+
+      }
+
+      @Override
+      public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception {
+
+      }
+
+      @Override
+      public void deletePageTransactional(long recordID) throws Exception {
+
+      }
+
+      @Override
+      public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
+                                                       PagingManager pagingManager,
+                                                       ResourceManager resourceManager,
+                                                       Map<Long, QueueBindingInfo> queueInfos,
+                                                       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+                                                       Set<Pair<Long, Long>> pendingLargeMessages,
+                                                       List<PageCountPending> pendingNonTXPageCounter,
+                                                       JournalLoader journalLoader) throws Exception {
+         return null;
+      }
+
+      @Override
+      public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public void deleteHeuristicCompletion(long id) throws Exception {
+
+      }
+
+      @Override
+      public void addQueueBinding(long tx, Binding binding) throws Exception {
+
+      }
+
+      @Override
+      public void deleteQueueBinding(long tx, long queueBindingID) throws Exception {
+
+      }
+
+      @Override
+      public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
+                                                       List<GroupingInfo> groupingInfos) throws Exception {
+         return null;
+      }
+
+      @Override
+      public void addGrouping(GroupBinding groupBinding) throws Exception {
+
+      }
+
+      @Override
+      public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception {
+
+      }
+
+      @Override
+      public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception {
+
+      }
+
+      @Override
+      public void deleteAddressSetting(SimpleString addressMatch) throws Exception {
+
+      }
+
+      @Override
+      public List<PersistedAddressSetting> recoverAddressSettings() throws Exception {
+         return null;
+      }
+
+      @Override
+      public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception {
+
+      }
+
+      @Override
+      public void deleteSecurityRoles(SimpleString addressMatch) throws Exception {
+
+      }
+
+      @Override
+      public List<PersistedRoles> recoverPersistedRoles() throws Exception {
+         return null;
+      }
+
+      @Override
+      public long storePageCounter(long txID, long queueID, long value) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public void deleteIncrementRecord(long txID, long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void deletePageCounter(long txID, long recordID) throws Exception {
+
+      }
+
+      @Override
+      public void deletePendingPageCounter(long txID, long recordID) throws Exception {
+
+      }
+
+      @Override
+      public long storePageCounterInc(long txID, long queueID, int add) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public long storePageCounterInc(long queueID, int add) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public Journal getBindingsJournal() {
+         return null;
+      }
+
+      @Override
+      public Journal getMessageJournal() {
+         return null;
+      }
+
+      @Override
+      public void startReplication(ReplicationManager replicationManager,
+                                   PagingManager pagingManager,
+                                   String nodeID,
+                                   boolean autoFailBack,
+                                   long initialReplicationSyncTimeout) throws Exception {
+
+      }
+
+      @Override
+      public boolean addToPage(PagingStore store,
+                               ServerMessage msg,
+                               Transaction tx,
+                               RouteContextList listCtx) throws Exception {
+         return false;
+      }
+
+      @Override
+      public void stopReplication() {
+
+      }
+
+      @Override
+      public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception {
+
+      }
+
+      @Override
+      public void storeID(long journalID, long id) throws Exception {
+
+      }
+
+      @Override
+      public void deleteID(long journalD) throws Exception {
+
+      }
+
+      @Override
+      public void readLock() {
+
+      }
+
+      @Override
+      public void readUnLock() {
+
+      }
+
+      @Override
+      public void persistIdGenerator() {
+
+      }
+
+      @Override
+      public void start() throws Exception {
+
+      }
+
+      @Override
+      public void stop() throws Exception {
+
+      }
+
+      @Override
+      public boolean isStarted() {
+         return false;
+      }
+
+      @Override
+      public long generateID() {
+         return 0;
+      }
+
+      @Override
+      public long getCurrentID() {
+         return 0;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java
new file mode 100644
index 0000000..3874937
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredBridgeReconnectTest.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * This will simulate a failure of a failure.
+ * The bridge could eventually during a race or multiple failures not be able to reconnect because it failed again.
+ * this should make the bridge to always reconnect itself.
+ */
+@RunWith(BMUnitRunner.class)
+public class ClusteredBridgeReconnectTest extends ClusterTestBase {
+
+   static ThreadLocal<Boolean> inConnect = new ThreadLocal<Boolean>();
+
+   public static void enterConnect() {
+      inConnect.set(Boolean.TRUE);
+   }
+
+   public static void exitConnect() {
+      inConnect.set(null);
+   }
+
+   public static volatile boolean shouldFail = false;
+
+   public static void send() {
+      if (inConnect.get() != null) {
+         if (shouldFail) {
+            shouldFail = false;
+            throw new NullPointerException("just because it's a test...");
+         }
+      }
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "enter",
+         targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl",
+         targetMethod = "connect",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.enterConnect();"), @BMRule(
+         name = "exit",
+         targetClass = "org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl",
+         targetMethod = "connect",
+         targetLocation = "EXIT",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.exitConnect();"), @BMRule(
+         name = "send",
+         targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl",
+         targetMethod = "send(org.apache.activemq.artemis.core.protocol.core.Packet)",
+         targetLocation = "EXIT",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.ClusteredBridgeReconnectTest.send();")
+
+      })
+   public void testReconnectBridge() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 0, "queue0", null);
+      addConsumer(1, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+      ClientSession session0 = sfs[0].createSession();
+      ClientSession session1 = sfs[0].createSession();
+
+      session0.start();
+      session1.start();
+
+      ClientProducer producer = session0.createProducer("queues.testaddress");
+
+      int NUMBER_OF_MESSAGES = 100;
+
+      Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size());
+
+      ClusterConnectionImpl connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
+      Assert.assertEquals(1, connection.getRecords().size());
+
+      MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
+      ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         ClientMessage msg = session0.createMessage(true);
+         producer.send(msg);
+         session0.commit();
+
+         if (i == 17) {
+            shouldFail = true;
+            bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!"));
+         }
+      }
+
+      int cons0Count = 0, cons1Count = 0;
+
+      while (true) {
+         ClientMessage msg = consumers[0].getConsumer().receive(1000);
+         if (msg == null) {
+            break;
+         }
+         cons0Count++;
+         msg.acknowledge();
+         session0.commit();
+      }
+
+      while (true) {
+         ClientMessage msg = consumers[1].getConsumer().receive(1000);
+         if (msg == null) {
+            break;
+         }
+         cons1Count++;
+         msg.acknowledge();
+         session1.commit();
+      }
+
+      Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count);
+
+      session0.commit();
+      session1.commit();
+
+      stopServers(0, 1);
+
+   }
+
+   static CountDownLatch latch;
+   static CountDownLatch latch2;
+   static Thread main;
+
+   public static void pause(SimpleString clusterName) {
+      if (clusterName.toString().startsWith("queue0")) {
+         try {
+            latch2.countDown();
+            latch.await();
+         }
+         catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      }
+   }
+
+   public static void pause2(Notification notification) {
+      if (notification.getType() == CoreNotificationType.BINDING_REMOVED) {
+         SimpleString clusterName = notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
+         boolean inMain = main == Thread.currentThread();
+         if (clusterName.toString().startsWith("queue0") && !inMain) {
+            try {
+               latch2.countDown();
+               latch.await();
+            }
+            catch (InterruptedException e) {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   public static void restart2() {
+      latch.countDown();
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      shouldFail = false;
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      closeAllConsumers();
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
+      super.tearDown();
+   }
+
+   public boolean isNetty() {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
index 5b2dd0d..52cc9d8 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
@@ -74,6 +74,11 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
       latchFlag.setCount(1);
    }
 
+   @Override
+   protected boolean usePersistence() {
+      return true;
+   }
+
    @Test
    @BMRules(
       rules = {@BMRule(
@@ -84,6 +89,7 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
          action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")})
    public void testConcurrentCancels() throws Exception {
 
+      System.out.println(server.getConfiguration().getJournalLocation().toString());
       server.getAddressSettingsRepository().clear();
       AddressSettings settings = new AddressSettings();
       settings.setMaxDeliveryAttempts(-1);
@@ -184,18 +190,6 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
                }
             }
          });
-         //
-         //         consumer.close();
-         //
-         //         threads.add(new Thread("ClientFailing")
-         //         {
-         //            public void run()
-         //            {
-         //               ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession();
-         //               impl.getConnection().fail(new HornetQException("failure"));
-         //            }
-         //         });
-         //
 
          for (Thread t : threads) {
             t.start();
@@ -213,47 +207,55 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase {
       }
 
       Connection connection = cf.createConnection();
-      connection.setClientID("myID");
-
-      Thread.sleep(2000); // I am too lazy to call end on all the transactions
-      connection.start();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageConsumer consumer = session.createConsumer(queue);
-      HashMap<Integer, AtomicInteger> mapCount = new HashMap<>();
-
-      while (true) {
-         TextMessage message = (TextMessage) consumer.receiveNoWait();
-         if (message == null) {
-            break;
-         }
+      try {
+         connection.setClientID("myID");
 
-         Integer value = message.getIntProperty("i");
+         Thread.sleep(5000); // I am too lazy to call end on all the transactions
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createConsumer(queue);
+         HashMap<Integer, AtomicInteger> mapCount = new HashMap<>();
+
+         while (true) {
+            TextMessage message = (TextMessage) consumer.receiveNoWait();
+            if (message == null) {
+               break;
+            }
 
-         AtomicInteger count = mapCount.get(value);
-         if (count == null) {
-            count = new AtomicInteger(0);
-            mapCount.put(message.getIntProperty("i"), count);
-         }
+            Integer value = message.getIntProperty("i");
 
-         count.incrementAndGet();
-      }
+            AtomicInteger count = mapCount.get(value);
+            if (count == null) {
+               count = new AtomicInteger(0);
+               mapCount.put(message.getIntProperty("i"), count);
+            }
 
-      boolean failed = false;
-      for (int i = 0; i < numberOfMessages; i++) {
-         AtomicInteger count = mapCount.get(i);
-         if (count == null) {
-            System.out.println("Message " + i + " not received");
-            failed = true;
+            count.incrementAndGet();
          }
-         else if (count.get() > 1) {
-            System.out.println("Message " + i + " received " + count.get() + " times");
-            failed = true;
+
+         boolean failed = false;
+         for (int i = 0; i < numberOfMessages; i++) {
+            AtomicInteger count = mapCount.get(i);
+            if (count == null) {
+               System.out.println("Message " + i + " not received");
+               failed = true;
+            }
+            else if (count.get() > 1) {
+               System.out.println("Message " + i + " received " + count.get() + " times");
+               failed = true;
+            }
          }
-      }
 
-      Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed);
+         if (failed) {
+            System.err.println("Failed");
+            System.exit(-1);
+         }
 
-      connection.close();
+         Assert.assertFalse("test failed, look at the system.out of the test for more infomration", failed);
+      }
+      finally {
+         connection.close();
+      }
 
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/af1f79bf/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java
index 22611bc..76679ef 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/TimeoutXATest.java
@@ -27,6 +27,7 @@ import javax.jms.XASession;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -91,7 +92,7 @@ public class TimeoutXATest extends ActiveMQTestBase {
          @BMRule(
             name = "afterRollback TX",
             targetClass = "org.apache.activemq.artemis.core.transaction.impl.TransactionImpl",
-            targetMethod = "afterRollback()",
+            targetMethod = "afterRollback",
             targetLocation = "ENTRY",
             helper = "org.apache.activemq.artemis.tests.extras.byteman.TimeoutXATest",
             action = "afterRollback()")})
@@ -166,23 +167,20 @@ public class TimeoutXATest extends ActiveMQTestBase {
       Thread.sleep(1000);
       removingTXAwait0.countDown();
 
-      enteredRollbackLatch.await();
+      Assert.assertTrue(enteredRollbackLatch.await(10, TimeUnit.SECONDS));
 
       waitingRollbackLatch.countDown();
 
       t.join();
 
       consumer.close();
-//
-//      connction2.start();
-//
+
       consumer = session.createConsumer(queue);
       for (int i = 0; i < 10; i++) {
          Assert.assertNotNull(consumer.receive(5000));
       }
       Assert.assertNull(consumer.receiveNoWait());
-//      session.commit();
-//      session.close();
+
       connection.close();
       connction2.close();
 


Mime
View raw message