activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-302 - Improving XA Resilience
Date Wed, 11 Nov 2015 16:21:37 GMT
ARTEMIS-302 - Improving XA Resilience


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7bbd17cd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7bbd17cd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7bbd17cd

Branch: refs/heads/master
Commit: 7bbd17cd3749e1e7a7c153eccfc20a8d5ddf9cf0
Parents: a21a447
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Nov 10 15:26:44 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Nov 11 09:50:58 2015 -0500

----------------------------------------------------------------------
 .../core/client/impl/ClientSessionImpl.java     |  12 +-
 .../core/impl/ActiveMQSessionContext.java       |   7 +-
 .../core/impl/wireformat/MessagePacket.java     |   6 +
 .../spi/core/remoting/SessionContext.java       |   2 +
 .../core/ServerSessionPacketHandler.java        |  15 +
 .../core/server/impl/ServerConsumerImpl.java    |   6 +-
 .../core/server/impl/ServerSessionImpl.java     |  40 +-
 .../artemis/core/transaction/Transaction.java   |   2 +
 .../core/transaction/impl/TransactionImpl.java  |  61 ++-
 .../byteman/ConcurrentDeliveryCancelTest.java   | 148 +++----
 .../tests/extras/jms/bridge/JMSBridgeTest.java  |   2 +-
 .../bridge/TransactionManagerLocatorImpl.java   |   5 +-
 ...MDBMultipleHandlersServerDisconnectTest.java | 412 +++++++++++++++++++
 .../integration/ra/ActiveMQRATestBase.java      |   1 -
 .../integration/remoting/ReconnectTest.java     |  96 +++++
 .../artemis/tests/util/JMSTestBase.java         |   4 +-
 .../artemis/jms/tests/AcknowledgementTest.java  |   3 +-
 .../core/postoffice/impl/BindingsImplTest.java  |   5 +
 18 files changed, 684 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 93618af..6134784 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.XidCodecSupport;
 
 public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
@@ -57,7 +58,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    private final ClientSessionFactoryInternal sessionFactory;
 
-   private final String name;
+   private String name;
 
    private final String username;
 
@@ -857,6 +858,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
             boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);
 
             if (!reattached) {
+
+               // We change the name of the session; otherwise, in certain failure scenarios, the server
+               // could close it while we are still sending the recreate.
+               // For instance, not changing the session name after failover or reconnect was what
+               // allowed multiple sessions to be closed simultaneously, breaking concurrency.
+               this.name = UUIDGenerator.getInstance().generateStringUUID();
+
+               sessionContext.resetName(name);
+
                for (ClientConsumerInternal consumer : cloneConsumers()) {
                   consumer.clearAtFailover();
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index d8dc125..1bac653 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -112,7 +112,7 @@ public class ActiveMQSessionContext extends SessionContext {
    private final Channel sessionChannel;
    private final int serverVersion;
    private int confirmationWindow;
-   private final String name;
+   private String name;
 
    protected Channel getSessionChannel() {
       return sessionChannel;
@@ -122,6 +122,11 @@ public class ActiveMQSessionContext extends SessionContext {
       return name;
    }
 
+   public void resetName(String name) {
+      this.name = name;
+   }
+
+
    protected int getConfirmationWindow() {
       return confirmationWindow;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index 4ed86ba..160e9bc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -34,4 +34,10 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
       return message;
    }
 
+   public String toString() {
+      return this.getParentString() + ",message=" + message + "]";
+
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 3f1cc14..1bdaffc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -59,6 +59,8 @@ public abstract class SessionContext {
       this.session = session;
    }
 
+   public abstract void resetName(String name);
+
    /**
     * it will eather reattach or reconnect, preferably reattaching it.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 0cd4472..904a8bc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -115,6 +115,8 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
 public class ServerSessionPacketHandler implements ChannelHandler {
 
+   private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
+
    private final ServerSession session;
 
    private final StorageManager storageManager;
@@ -193,6 +195,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       boolean closeChannel = false;
       boolean requiresResponse = false;
 
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::handlePacket," + packet);
+      }
+
       try {
          try {
             switch (type) {
@@ -522,6 +528,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                              final Packet response,
                              final boolean flush,
                              final boolean closeChannel) {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::scheduling response::" + response);
+      }
+
       storageManager.afterCompleteOperations(new IOCallback() {
          public void onError(final int errorCode, final String errorMessage) {
             ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);
@@ -529,6 +539,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage));
 
             doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::response sent::" + response);
+            }
+
          }
 
          public void done() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 2bafa1f..8cc9912 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -379,8 +379,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    @Override
    public void close(final boolean failed) throws Exception {
-      if (isTrace)
-      {
+      if (isTrace) {
          ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" +  this + " being closed with failed=" + failed, new Exception("trace"));
       }
 
@@ -405,8 +404,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       while (iter.hasNext()) {
          MessageReference ref = iter.next();
 
-         if (isTrace)
-         {
+         if (isTrace) {
             ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" +  this + " cancelling reference " + ref);
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 7c09a4c..3cd1d7b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.impl;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,11 +38,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -57,10 +56,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityStore;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -996,7 +995,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString());
 
          try {
-            if (tx.getState() != Transaction.State.PREPARED) {
+            if (!tx.isEffective()) {
                // we don't want to rollback anything prepared here
                if (tx.getXid() != null) {
                   resourceManager.removeTransaction(tx.getXid());
@@ -1025,27 +1024,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    public synchronized void xaFailed(final Xid xid) throws Exception {
-      if (tx != null) {
-         final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
+      Transaction theTX = resourceManager.getTransaction(xid);
 
-         throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
+      if (theTX == null) {
+         theTX = newTransaction(xid);
+         resourceManager.putTransaction(xid, theTX);
       }
-      else {
-
-         tx = newTransaction(xid);
-         tx.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation"));
 
-         if (isTrace) {
-            ActiveMQServerLogger.LOGGER.trace("xastart into tx= " + tx);
-         }
-
-         boolean added = resourceManager.putTransaction(xid, tx);
-
-         if (!added) {
-            final String msg = "Cannot start, there is already a xid " + tx.getXid();
+      if (theTX.isEffective()) {
+         ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared");
+         tx = null;
+      }
+      else {
+         theTX.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation"));
+         tx = theTX;
+      }
 
-            throw new ActiveMQXAException(XAException.XAER_DUPID, msg);
-         }
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("xastart into tx= " + tx);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index 7d6151a..eb1ab3c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -33,6 +33,8 @@ public interface Transaction {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
    }
 
+   boolean isEffective();
+
    void prepare() throws Exception;
 
    void commit() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index e991454..5b7d255 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.transaction.impl;
 
 import javax.transaction.xa.Xid;
-
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -104,6 +103,11 @@ public class TransactionImpl implements Transaction {
    // Transaction implementation
    // -----------------------------------------------------------
 
+   public boolean isEffective() {
+      return state == State.PREPARED || state == State.COMMITTED;
+
+   }
+
    public void setContainsPersistent() {
       containsPersistent = true;
    }
@@ -142,6 +146,10 @@ public class TransactionImpl implements Transaction {
       storageManager.readLock();
       try {
          synchronized (timeoutLock) {
+            if (isEffective()) {
+               ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call");
+               return;
+            }
             if (state == State.ROLLBACK_ONLY) {
                if (exception != null) {
                   // this TX will never be rolled back,
@@ -197,6 +205,11 @@ public class TransactionImpl implements Transaction {
 
    public void commit(final boolean onePhase) throws Exception {
       synchronized (timeoutLock) {
+         if (state == State.COMMITTED) {
+            // I don't think this could happen, but just in case
+            ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call");
+            return;
+         }
          if (state == State.ROLLBACK_ONLY) {
             rollback();
 
@@ -248,15 +261,21 @@ public class TransactionImpl implements Transaction {
     */
    protected void doCommit() throws Exception {
       if (containsPersistent || xid != null && state == State.PREPARED) {
-
+         // ^^ These are the scenarios where we require a storage.commit;
+         // for anything else we won't use the journal.
          storageManager.commit(id);
-
-         state = State.COMMITTED;
       }
+
+      state = State.COMMITTED;
    }
 
    public void rollback() throws Exception {
       synchronized (timeoutLock) {
+         if (state == State.ROLLEDBACK) {
+            // I don't think this could happen, but just in case
+            ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace"));
+            return;
+         }
          if (xid != null) {
             if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) {
                throw new IllegalStateException("Transaction is in invalid state " + state);
@@ -290,17 +309,21 @@ public class TransactionImpl implements Transaction {
    }
 
    public void suspend() {
-      if (state != State.ACTIVE) {
-         throw new IllegalStateException("Can only suspend active transaction");
+      synchronized (timeoutLock) {
+         if (state != State.ACTIVE) {
+            throw new IllegalStateException("Can only suspend active transaction");
+         }
+         state = State.SUSPENDED;
       }
-      state = State.SUSPENDED;
    }
 
    public void resume() {
-      if (state != State.SUSPENDED) {
-         throw new IllegalStateException("Can only resume a suspended transaction");
+      synchronized (timeoutLock) {
+         if (state != State.SUSPENDED) {
+            throw new IllegalStateException("Can only resume a suspended transaction");
+         }
+         state = State.ACTIVE;
       }
-      state = State.ACTIVE;
    }
 
    public Transaction.State getState() {
@@ -316,12 +339,19 @@ public class TransactionImpl implements Transaction {
    }
 
    public void markAsRollbackOnly(final ActiveMQException exception1) {
-      if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQServerLogger.LOGGER.debug("Marking Transaction " + this.id + " as rollback only");
-      }
-      state = State.ROLLBACK_ONLY;
+      synchronized (timeoutLock) {
+         if (isEffective()) {
+            ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)");
+            return;
+         }
 
-      this.exception = exception1;
+         if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQServerLogger.LOGGER.debug("Marking Transaction " + this.id + " as rollback only");
+         }
+         state = State.ROLLBACK_ONLY;
+
+         this.exception = exception1;
+      }
    }
 
    public synchronized void addOperation(final TransactionOperation operation) {
@@ -425,6 +455,7 @@ public class TransactionImpl implements Transaction {
       return "TransactionImpl [xid=" + xid +
          ", id=" +
          id +
+         ", xid=" + xid +
          ", state=" +
          state +
          ", createTime=" +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
index c00985f..4629be3 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java
@@ -49,51 +49,40 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-/** This test will force two consumers to be waiting on close and introduce a race I saw in a production system */
+/**
+ * This test will force two consumers to be waiting on close and introduce a race I saw in a production system
+ */
 @RunWith(BMUnitRunner.class)
-public class ConcurrentDeliveryCancelTest extends JMSTestBase
-{
+public class ConcurrentDeliveryCancelTest extends JMSTestBase {
 
    // used to wait the thread to align at the same place and create the race
    private static final ReusableLatch latchEnter = new ReusableLatch(2);
    // used to start
    private static final ReusableLatch latchFlag = new ReusableLatch(1);
 
-   public static void enterCancel()
-   {
+   public static void enterCancel() {
       latchEnter.countDown();
-      try
-      {
+      try {
          latchFlag.await();
       }
-      catch (Exception ignored)
-      {
+      catch (Exception ignored) {
       }
    }
 
-   public static void resetLatches(int numberOfThreads)
-   {
+   public static void resetLatches(int numberOfThreads) {
       latchEnter.setCount(numberOfThreads);
       latchFlag.setCount(1);
    }
 
    @Test
-   @BMRules
-      (
-         rules =
-            {
-               @BMRule
-                  (
-                     name = "enterCancel-holdThere",
-                     targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl",
-                     targetMethod = "close",
-                     targetLocation = "ENTRY",
-                     action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();"
-                  )
-            }
-      )
-   public void testConcurrentCancels() throws Exception
-   {
+   @BMRules(
+      rules = {@BMRule(
+         name = "enterCancel-holdThere",
+         targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl",
+         targetMethod = "close",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")})
+   public void testConcurrentCancels() throws Exception {
 
       server.getAddressSettingsRepository().clear();
       AddressSettings settings = new AddressSettings();
@@ -103,10 +92,8 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
       cf.setReconnectAttempts(0);
       cf.setRetryInterval(10);
 
-
       System.out.println(".....");
-      for (ServerSession srvSess : server.getSessions())
-      {
+      for (ServerSession srvSess : server.getSessions()) {
          System.out.println(srvSess);
       }
 
@@ -120,8 +107,7 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
          Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
          MessageProducer producer = session.createProducer(queue);
 
-         for (int i = 0; i < numberOfMessages; i++)
-         {
+         for (int i = 0; i < numberOfMessages; i++) {
             TextMessage msg = session.createTextMessage("message " + i);
             msg.setIntProperty("i", i);
             producer.send(msg);
@@ -131,24 +117,22 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
          connection.close();
       }
 
-      for (int i = 0; i < 100; i++)
-      {
+      for (int i = 0; i < 100; i++) {
          XAConnectionFactory xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616", "test");
 
          final XAConnection connection = xacf.createXAConnection();
          final XASession theSession = connection.createXASession();
-         ((ActiveMQSession)theSession).getCoreSession().addMetaData("theSession", "true");
+         ((ActiveMQSession) theSession).getCoreSession().addMetaData("theSession", "true");
 
          connection.start();
 
          final MessageConsumer consumer = theSession.createConsumer(queue);
 
-         XidImpl xid =  newXID();
+         XidImpl xid = newXID();
          theSession.getXAResource().start(xid, XAResource.TMNOFLAGS);
          theSession.getXAResource().setTransactionTimeout(1); // I'm setting a small timeout just because I'm lazy to call end myself
 
-         for (int msg = 0; msg < 11; msg++)
-         {
+         for (int msg = 0; msg < 11; msg++) {
             Assert.assertNotNull(consumer.receiveNoWait());
          }
 
@@ -157,83 +141,68 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
          final List<ServerSession> serverSessions = new LinkedList<ServerSession>();
 
          // We will force now the failure simultaneously from several places
-         for (ServerSession srvSess : server.getSessions())
-         {
-            if (srvSess.getMetaData("theSession") != null)
-            {
+         for (ServerSession srvSess : server.getSessions()) {
+            if (srvSess.getMetaData("theSession") != null) {
                System.out.println(srvSess);
                serverSessions.add(srvSess);
             }
          }
 
-
          resetLatches(2); // from Transactional reaper
 
          List<Thread> threads = new LinkedList<Thread>();
 
-         threads.add(new Thread("ConsumerCloser")
-         {
-            public void run()
-            {
-               try
-               {
+         threads.add(new Thread("ConsumerCloser") {
+            public void run() {
+               try {
                   System.out.println(Thread.currentThread().getName() + " closing consumer");
                   consumer.close();
                   System.out.println(Thread.currentThread().getName() + " closed consumer");
                }
-               catch (Exception e)
-               {
+               catch (Exception e) {
                   e.printStackTrace();
                }
             }
          });
 
-         threads.add(new Thread("SessionCloser")
-         {
-            public void run()
-            {
-               for (ServerSession sess : serverSessions)
-               {
+         threads.add(new Thread("SessionCloser") {
+            public void run() {
+               for (ServerSession sess : serverSessions) {
                   System.out.println("Thread " + Thread.currentThread().getName() + " starting");
-                  try
-                  {
+                  try {
                      // A session.close could sneak in through failover or some other scenarios.
                      // a call to RemotingConnection.fail wasn't replicating the issue.
                      // I needed to call Session.close() directly to replicate what was happening in production
                      sess.close(true);
                   }
-                  catch (Exception e)
-                  {
+                  catch (Exception e) {
                      e.printStackTrace();
                   }
                   System.out.println("Thread " + Thread.currentThread().getName() + " done");
                }
             }
          });
-//
-//         consumer.close();
-//
-//         threads.add(new Thread("ClientFailing")
-//         {
-//            public void run()
-//            {
-//               ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession();
-//               impl.getConnection().fail(new HornetQException("failure"));
-//            }
-//         });
-//
-
-
-         for (Thread t : threads)
-         {
+         //
+         //         consumer.close();
+         //
+         //         threads.add(new Thread("ClientFailing")
+         //         {
+         //            public void run()
+         //            {
+         //               ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession();
+         //               impl.getConnection().fail(new HornetQException("failure"));
+         //            }
+         //         });
+         //
+
+         for (Thread t : threads) {
             t.start();
          }
 
          Assert.assertTrue(latchEnter.await(10, TimeUnit.MINUTES));
          latchFlag.countDown();
 
-         for (Thread t: threads)
-         {
+         for (Thread t : threads) {
             t.join(5000);
             Assert.assertFalse(t.isAlive());
          }
@@ -250,19 +219,16 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
       MessageConsumer consumer = session.createConsumer(queue);
       HashMap<Integer, AtomicInteger> mapCount = new HashMap<Integer, AtomicInteger>();
 
-      while (true)
-      {
-         TextMessage message = (TextMessage)consumer.receiveNoWait();
-         if (message == null)
-         {
+      while (true) {
+         TextMessage message = (TextMessage) consumer.receiveNoWait();
+         if (message == null) {
             break;
          }
 
          Integer value = message.getIntProperty("i");
 
          AtomicInteger count = mapCount.get(value);
-         if (count == null)
-         {
+         if (count == null) {
             count = new AtomicInteger(0);
             mapCount.put(message.getIntProperty("i"), count);
          }
@@ -271,16 +237,13 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
       }
 
       boolean failed = false;
-      for (int i = 0; i < numberOfMessages; i++)
-      {
+      for (int i = 0; i < numberOfMessages; i++) {
          AtomicInteger count = mapCount.get(i);
-         if (count == null)
-         {
+         if (count == null) {
             System.out.println("Message " + i + " not received");
             failed = true;
          }
-         else if (count.get() > 1)
-         {
+         else if (count.get() > 1) {
             System.out.println("Message " + i + " received " + count.get() + " times");
             failed = true;
          }
@@ -290,6 +253,5 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase
 
       connection.close();
 
-
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
index ce08f0e..f7f439d 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
@@ -1727,7 +1727,7 @@ public class JMSBridgeTest extends BridgeTestBase {
 
    @Test
    public void testSetTMClass() throws Exception {
-      TransactionManagerLocatorImpl.tm = new DummyTransactionManager();
+      TransactionManagerLocatorImpl.setTransactionManager(new DummyTransactionManager());
 
       JMSBridgeImpl bridge = null;
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
index c883f47..7523b98 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java
@@ -23,14 +23,15 @@ import org.apache.activemq.artemis.service.extensions.transactions.TransactionMa
 
 public class TransactionManagerLocatorImpl implements TransactionManagerLocator {
 
-   public static TransactionManager tm = null;
+   private static TransactionManager tm = null;
 
    @Override
    public TransactionManager getTransactionManager() {
+      // Hands back whatever setTransactionManager last installed; null when nothing is installed.
       return tm;
    }
 
-   public void setTransactionManager(TransactionManager transactionManager) {
+   public static void setTransactionManager(TransactionManager transactionManager) {
       tm = transactionManager;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
new file mode 100644
index 0000000..739578e
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
@@ -0,0 +1,412 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.jms.ra;
+
+import javax.jms.Message;
+import javax.resource.ResourceException;
+import javax.resource.spi.UnavailableException;
+import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.extras.jms.bridge.TransactionManagerLocatorImpl;
+import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simulates several messages being received over multiple instances with reconnects during the process.
+ */
+public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase {
+
+   final ConcurrentHashMap<Integer, AtomicInteger> mapCounter = new ConcurrentHashMap<Integer, AtomicInteger>();
+
+   volatile ActiveMQResourceAdapter resourceAdapter;
+
+   ServerLocator nettyLocator;
+
+   @Before
+   public void setUp() throws Exception {
+      nettyLocator = createNettyNonHALocator();
+      nettyLocator.setRetryInterval(10);
+      nettyLocator.setReconnectAttempts(-1);
+      mapCounter.clear();
+      resourceAdapter = null;
+      super.setUp();
+      createQueue(true, "outQueue");
+      DummyTMLocator.startTM();
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      DummyTMLocator.stopTM();
+      super.tearDown();
+   }
+
+   protected boolean usePersistence() {
+      return true;
+   }
+
+   @Override
+   public boolean useSecurity() {
+      return false;
+   }
+
+   @Test
+   public void testReconnectMDBNoMessageLoss() throws Exception {
+      AddressSettings settings = new AddressSettings();
+      settings.setRedeliveryDelay(1000);
+      settings.setMaxDeliveryAttempts(-1);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", settings);
+      ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+      resourceAdapter = qResourceAdapter;
+
+      //      qResourceAdapter.setTransactionManagerLocatorClass(DummyTMLocator.class.getName());
+      //      qResourceAdapter.setTransactionManagerLocatorMethod("getTM");
+
+      MyBootstrapContext ctx = new MyBootstrapContext();
+
+      qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
+      qResourceAdapter.start(ctx);
+
+      final int NUMBER_OF_SESSIONS = 10;
+
+      ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+      spec.setMaxSession(NUMBER_OF_SESSIONS);
+      spec.setTransactionTimeout(1);
+      spec.setReconnectAttempts(-1);
+      spec.setConfirmationWindowSize(-1);
+      spec.setReconnectInterval(1000);
+      spec.setCallTimeout(1000L);
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+      spec.setConsumerWindowSize(1024 * 1024);
+
+      TestEndpointFactory endpointFactory = new TestEndpointFactory(true);
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+      Assert.assertEquals(1, resourceAdapter.getActivations().values().size());
+
+      final int NUMBER_OF_MESSAGES = 3000;
+
+      Thread producer = new Thread() {
+         public void run() {
+            try {
+               ServerLocator locator = createInVMLocator(0);
+               ClientSessionFactory factory = locator.createSessionFactory();
+               ClientSession session = factory.createSession(false, false);
+
+               ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+
+               for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+
+                  ClientMessage message = session.createMessage(true);
+
+                  message.getBodyBuffer().writeString("teststring " + i);
+                  message.putIntProperty("i", i);
+
+                  clientProducer.send(message);
+
+                  if (i % 100 == 0) {
+                     session.commit();
+                  }
+               }
+               session.commit();
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+            }
+
+         }
+      };
+
+      producer.start();
+
+      final AtomicBoolean metaDataFailed = new AtomicBoolean(false);
+
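+      // This thread keeps failing the connection of a randomly chosen RA session while messages flow.
+      // If the handlers behave correctly under XA, no message is lost or duplicated and the test passes.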
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      Thread buggerThread = new Thread() {
+         @Override
+         public void run() {
+            while (running.get()) {
+               try {
+                  Thread.sleep(RandomUtil.randomInterval(100, 200));
+               }
+               catch (InterruptedException intex) {
+                  intex.printStackTrace();
+                  // restore the interrupt flag before giving up so it is not lost
+                  Thread.currentThread().interrupt();
+                  return;
+               }
+
+               // only sessions carrying the "resource-adapter" meta-data are candidates
+               List<ServerSession> serverSessions = new LinkedList<>();
+
+               for (ServerSession session : server.getSessions()) {
+                  if (session.getMetaData("resource-adapter") != null) {
+                     serverSessions.add(session);
+                  }
+               }
+
+               System.err.println("Contains " + serverSessions.size() + " RA sessions");
+
+               // the flag mirrors the latest poll only: a later poll with the expected count clears it
+               boolean countMismatch = serverSessions.size() != NUMBER_OF_SESSIONS;
+               if (countMismatch) {
+                  System.err.println("the server was supposed to have " + NUMBER_OF_SESSIONS + " RA Sessions but it contained " + serverSessions.size() + " according to the meta-data");
+               }
+               metaDataFailed.set(countMismatch);
+
+               if (serverSessions.size() > 0) {
+                  int randomBother = RandomUtil.randomInterval(0, serverSessions.size() - 1);
+                  System.out.println("bugging session " + randomBother);
+
+                  RemotingConnection connection = serverSessions.get(randomBother).getRemotingConnection();
+                  connection.fail(new ActiveMQException("failed at random " + randomBother));
+               }
+            }
+
+         }
+      };
+
+      buggerThread.start();
+
+      ServerLocator locator = createInVMLocator(0);
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer("jms.queue.outQueue");
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         ClientMessage message = consumer.receive(5000);
+         if (message == null) {
+            break;
+         }
+
+         Assert.assertNotNull(message);
+         message.acknowledge();
+
+         Integer value = message.getIntProperty("i");
+         AtomicInteger mapCount = new AtomicInteger(1);
+
+         mapCount = mapCounter.putIfAbsent(value, mapCount);
+
+         if (mapCount != null) {
+            mapCount.incrementAndGet();
+         }
+
+         if (i % 200 == 0) {
+            System.out.println("received " + i);
+            session.commit();
+         }
+      }
+
+      session.commit();
+      Assert.assertNull(consumer.receiveImmediate());
+
+      boolean failed = false;
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         AtomicInteger atomicInteger = mapCounter.get(Integer.valueOf(i));
+
+         if (atomicInteger == null) {
+            System.out.println("didn't receive message with i=" + i);
+            failed = true;
+         }
+         else if (atomicInteger.get() > 1) {
+            System.out.println("message with i=" + i + " received " + atomicInteger.get() + " times");
+            failed = true;
+         }
+      }
+
+      running.set(false);
+
+      buggerThread.join();
+      producer.join();
+
+      Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get());
+
+      Assert.assertFalse(failed);
+
+      System.out.println("Received " + NUMBER_OF_MESSAGES + " messages");
+
+      qResourceAdapter.stop();
+
+      session.close();
+   }
+
+   protected class TestEndpointFactory implements MessageEndpointFactory {
+
+      private final boolean isDeliveryTransacted;
+
+      public TestEndpointFactory(boolean deliveryTransacted) {
+         isDeliveryTransacted = deliveryTransacted;
+      }
+
+      public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
+         TestEndpoint retEnd = new TestEndpoint();
+         if (xaResource != null) {
+            retEnd.setXAResource(xaResource);
+         }
+         return retEnd;
+      }
+
+      public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+         return isDeliveryTransacted;
+      }
+   }
+
+   public class TestEndpoint extends DummyMessageEndpoint {
+
+      ClientSessionFactory factory;
+      ClientSession endpointSession;
+      ClientProducer producer;
+
+      Transaction currentTX;
+
+      public TestEndpoint() {
+         super(null);
+         try {
+            factory = nettyLocator.createSessionFactory();
+            //            buggingList.add(factory);
+            endpointSession = factory.createSession(true, false, false);
+            producer = endpointSession.createProducer("jms.queue.outQueue");
+         }
+         catch (Throwable e) {
+            throw new RuntimeException(e);
+         }
+      }
+
+      @Override
+      public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
+         super.beforeDelivery(method);
+         try {
+            DummyTMLocator.tm.begin();
+            currentTX = DummyTMLocator.tm.getTransaction();
+            currentTX.enlistResource(xaResource);
+         }
+         catch (Throwable e) {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+
+      /**
+       * Forwards the delivered message to "jms.queue.outQueue" using this endpoint's own
+       * session, enlisted in the transaction begun by {@link #beforeDelivery(Method)}.
+       * Only the "i" property is copied to the outgoing message.
+       * <p>
+       * Failures are not propagated to the caller: the stack trace is printed and the
+       * current transaction is marked rollback-only instead.
+       *
+       * @param message the message being delivered to this endpoint
+       */
+      @Override
+      public void onMessage(Message message) {
+         super.onMessage(message);
+
+         try {
+            // the outgoing send joins the transaction that covers the incoming delivery
+            currentTX.enlistResource(endpointSession);
+
+            ClientMessage message1 = endpointSession.createMessage(true);
+            message1.putIntProperty("i", message.getIntProperty("i"));
+            producer.send(message1);
+
+            currentTX.delistResource(endpointSession, XAResource.TMSUCCESS);
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+
+            try {
+               // flag the transaction so the commit attempted in afterDelivery cannot
+               // make a partially forwarded message visible
+               currentTX.setRollbackOnly();
+            }
+            catch (Exception ex) {
+               // the transaction could not even be flagged; report it instead of hiding it
+               ex.printStackTrace();
+            }
+         }
+      }
+
+      @Override
+      public void afterDelivery() throws ResourceException {
+         try {
+            DummyTMLocator.tm.commit();
+         }
+         catch (Throwable e) {
+            // NOTE(review): commit failures are dropped silently, presumably because they are expected while connections are being failed; confirm
+         }
+         super.afterDelivery();
+      }
+   }
+
+   public static class DummyTMLocator {
+
+      public static TransactionManagerImple tm;
+
+      public static void stopTM() {
+         try {
+            TransactionManagerLocatorImpl.setTransactionManager(null);
+            TransactionReaper.terminate(true);
+            TxControl.disable(true);
+         }
+         catch (Exception e) {
+            e.printStackTrace();
+         }
+         tm = null;
+      }
+
+      public static void startTM() {
+         tm = new TransactionManagerImple();
+         TransactionManagerLocatorImpl.setTransactionManager(tm);
+         TxControl.enable();
+      }
+
+      public TransactionManager getTM() {
+         return tm;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java
index f5895c1..75dc976 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java
@@ -138,7 +138,6 @@ public abstract class ActiveMQRATestBase extends JMSTestBase {
 
       public void onMessage(Message message) {
          lastMessage = (ActiveMQMessage) message;
-         System.err.println(message);
       }
 
       public void reset(CountDownLatch latch) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
index 7448393..3278a64 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
@@ -17,9 +17,14 @@
 package org.apache.activemq.artemis.tests.integration.remoting;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -110,6 +115,97 @@ public class ReconnectTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testMetadataAfterReconnectionNetty() throws Exception {
+      internalMetadataAfterRetry(true);
+   }
+
+   @Test
+   public void testMetadataAfterReconnectionInVM() throws Exception {
+      internalMetadataAfterRetry(false);
+   }
+
+   /**
+    * Runs 100 connect / fail / reconnect rounds, checking on every round that exactly one
+    * server session carries the "meta1" meta-data both before and after the reconnection.
+    *
+    * @param isNetty forwarded to createServer and createFactory
+    */
+   public void internalMetadataAfterRetry(final boolean isNetty) throws Exception {
+      final int pingPeriod = 1000;
+
+      ActiveMQServer server = createServer(false, isNetty);
+      server.start();
+
+      ClientSessionInternal session = null;
+      try {
+         for (int i = 0; i < 100; i++) {
+            ServerLocator locator = createFactory(isNetty);
+            locator.setClientFailureCheckPeriod(pingPeriod);
+            locator.setRetryInterval(1);
+            locator.setRetryIntervalMultiplier(1d);
+            locator.setReconnectAttempts(-1);
+            locator.setConfirmationWindowSize(-1);
+            ClientSessionFactory factory = createSessionFactory(locator);
+
+            session = (ClientSessionInternal) factory.createSession();
+            session.addMetaData("meta1", "meta1");
+
+            ServerSession[] sessions = countMetadata(server, "meta1", 1);
+            Assert.assertEquals(1, sessions.length);
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            session.addFailoverListener(new FailoverEventListener() {
+               @Override
+               public void failoverEvent(FailoverEventType eventType) {
+                  if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
+                     latch.countDown();
+                  }
+               }
+            });
+
+            // fail the connection from the server side, then wait for FAILOVER_COMPLETED on the client
+            sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!"));
+            Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+            sessions = countMetadata(server, "meta1", 1);
+            Assert.assertEquals(1, sessions.length);
+
+            locator.close();
+         }
+      }
+      finally {
+         try {
+            if (session != null) {
+               session.close();
+            }
+         }
+         catch (Throwable ignored) {
+            // best effort only; a failure to close must not mask the test outcome
+         }
+         server.stop();
+      }
+   }
+
+   /** Polls the server up to 10 times, sleeping 100 ms between tries, for sessions holding the given meta-data key. */
+   private ServerSession[] countMetadata(ActiveMQServer server, String parameter, int expected) throws Exception {
+      List<ServerSession> matching = new LinkedList<ServerSession>();
+      int attempt = 0;
+      while (attempt < 10 && matching.size() != expected) {
+         matching.clear();
+         for (ServerSession candidate : server.getSessions()) {
+            if (candidate.getMetaData(parameter) != null) {
+               matching.add(candidate);
+            }
+         }
+         if (matching.size() != expected) {
+            Thread.sleep(100);
+         }
+         attempt++;
+      }
+      return matching.toArray(new ServerSession[matching.size()]);
+   }
+
+   @Test
    public void testInterruptReconnectNetty() throws Exception {
       internalTestInterruptReconnect(true, false);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java
index d468617..cb4ba0a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java
@@ -133,7 +133,9 @@ public class JMSTestBase extends ActiveMQTestBase {
 
       mbeanServer = MBeanServerFactory.createMBeanServer();
 
-      Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()).addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()).
+         addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)).
+         setTransactionTimeoutScanPeriod(100);
 
       server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence()));
       jmsServer = new JMSServerManagerImpl(server);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
index d467616..45f443a 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
@@ -782,8 +782,7 @@ public class AcknowledgementTest extends JMSTestCase {
 
       messageReceived = (TextMessage)consumer.receiveNoWait();
 
-      if (messageReceived != null)
-      {
+      if (messageReceived != null) {
          System.out.println("Message received " + messageReceived.getText());
       }
       Assert.assertNull(messageReceived);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index b79cb7f..3bfd628 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -108,6 +108,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
 
       }
 
+      @Override
+      public boolean isEffective() {
+         return false;
+      }
+
       public boolean hasTimedOut(long currentTime, int defaultTimeout) {
          return false;
       }


Mime
View raw message