activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-302 - more work about improving resilience of MDBs and XA
Date Tue, 17 Nov 2015 15:10:05 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 06b0c3119 -> 5b824e1bc


ARTEMIS-302 - more work about improving resilience of MDBs and XA


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b1d50761
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b1d50761
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b1d50761

Branch: refs/heads/master
Commit: b1d5076108de3c93aaacc246c4228ad052148a04
Parents: 06b0c31
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Nov 16 16:06:08 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Nov 16 18:11:44 2015 -0500

----------------------------------------------------------------------
 .../core/client/impl/ClientSessionImpl.java     |  11 +-
 .../core/client/impl/ClientSessionInternal.java |   2 +
 .../core/client/impl/DelegatingSession.java     |   4 +
 .../ra/inflow/ActiveMQMessageHandler.java       |  30 +++--
 .../artemis/core/server/impl/QueueImpl.java     |  13 +-
 .../core/server/impl/ServerConsumerImpl.java    |  46 ++++---
 .../core/server/impl/ServerSessionImpl.java     |  35 ++++-
 ...MDBMultipleHandlersServerDisconnectTest.java | 135 ++++++++++++++-----
 .../cluster/failover/FailoverTest.java          |   4 +-
 9 files changed, 206 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 6134784..0a24022 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -532,6 +532,10 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
       rollbackOnly = false;
    }
 
+   public void markRollbackOnly() {
+      rollbackOnly = true;
+   }
+
    public ClientMessage createMessage(final byte type,
                                       final boolean durable,
                                       final long expiration,
@@ -1036,7 +1040,12 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
 
       // we should never throw rollback if we have already prepared
       if (rollbackOnly) {
-         ActiveMQClientLogger.LOGGER.commitAfterFailover();
+         if (onePhase) {
+            throw new XAException(XAException.XAER_RMFAIL);
+         }
+         else {
+            ActiveMQClientLogger.LOGGER.commitAfterFailover();
+         }
       }
 
       // Note - don't need to flush acks since the previous end would have

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index 06d6024..cd697c0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -93,6 +93,8 @@ public interface ClientSessionInternal extends ClientSession {
 
    void resetIfNeeded() throws ActiveMQException;
 
+   void markRollbackOnly();
+
    /**
     * This is used internally to control and educate the user
     * about using the thread boundaries properly.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
index 2d6f4a4..4d72dc9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
@@ -141,6 +141,10 @@ public class DelegatingSession implements ClientSessionInternal {
       session.close();
    }
 
+   public void markRollbackOnly() {
+      session.markRollbackOnly();
+   }
+
    public void commit() throws ActiveMQException {
       session.commit();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index f6a8535..82eb50d 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -38,9 +38,9 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.ra.ActiveMQRALogger;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.ra.ActiveMQRALogger;
 import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
 import org.apache.activemq.artemis.service.extensions.ServiceUtils;
 import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapper;
@@ -292,6 +292,11 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
          if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm
!= null) {
             tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
          }
+
+         if (trace) {
+            ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling beforeDelivery
on message " + message);
+         }
+
          endpoint.beforeDelivery(ActiveMQActivation.ONMESSAGE);
          beforeDelivery = true;
          msg.doBeforeReceive();
@@ -299,13 +304,17 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
          //In the transacted case the message must be acked *before* onMessage is called
 
          if (transacted) {
-            message.acknowledge();
+            message.individualAcknowledge();
          }
 
          ((MessageListener) endpoint).onMessage(msg);
 
          if (!transacted) {
-            message.acknowledge();
+            message.individualAcknowledge();
+         }
+
+         if (trace) {
+            ActiveMQRALogger.LOGGER.trace("HornetQMessageHandler::calling afterDelivery on
message " + message);
          }
 
          try {
@@ -313,6 +322,10 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
          }
          catch (ResourceException e) {
             ActiveMQRALogger.LOGGER.unableToCallAfterDelivery(e);
+            // If we get here, The TX was already rolled back
+            // However we must do some stuff now to make sure the client message buffer is
cleared
+            // so we mark this as rollbackonly
+            session.markRollbackOnly();
             return;
          }
          if (useLocalTx) {
@@ -340,13 +353,6 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
                }
                catch (Exception e1) {
                   ActiveMQRALogger.LOGGER.warn("unnable to clear the transaction", e1);
-                  try {
-                     session.rollback();
-                  }
-                  catch (ActiveMQException e2) {
-                     ActiveMQRALogger.LOGGER.warn("Unable to rollback", e2);
-                     return;
-                  }
                }
             }
 
@@ -369,6 +375,10 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
                ActiveMQRALogger.LOGGER.unableToRollbackTX();
             }
          }
+
+         // This is to make sure we will issue a rollback after failures
+         // so that would cleanup consumer buffers among other things
+         session.markRollbackOnly();
       }
       finally {
          try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ee60a14..a5b3622 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -45,8 +45,8 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
@@ -58,10 +58,10 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -1548,7 +1548,7 @@ public class QueueImpl implements Queue {
                      Binding binding = postOffice.getBinding(originalMessageQueue);
 
                      if (binding != null && binding instanceof LocalQueueBinding)
{
-                        targetQueue = ((LocalQueueBinding)binding).getID();
+                        targetQueue = ((LocalQueueBinding) binding).getID();
                         queues.put(originalMessageQueue, targetQueue);
                      }
                   }
@@ -1562,12 +1562,10 @@ public class QueueImpl implements Queue {
 
                }
 
-
             }
          }
       });
 
-
    }
 
    public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority)
throws Exception {
@@ -2025,6 +2023,9 @@ public class QueueImpl implements Queue {
    private void internalAddRedistributor(final Executor executor) {
       // create the redistributor only once if there are no local consumers
       if (consumerSet.isEmpty() && redistributor == null) {
+         if (isTrace) {
+            ActiveMQServerLogger.LOGGER.trace("QueueImpl::Adding redistributor on queue "
+ this.toString());
+         }
          redistributor = new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE);
 
          consumerList.add(new ConsumerHolder(redistributor));
@@ -2103,7 +2104,7 @@ public class QueueImpl implements Queue {
                      final MessageReference ref,
                      final boolean expiry,
                      final boolean rejectDuplicate,
-                     final long ... queueIDs) throws Exception {
+                     final long... queueIDs) throws Exception {
       ServerMessage copyMessage = makeCopy(ref, expiry);
 
       copyMessage.setAddress(toAddress);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 8cc9912..ae30549 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -380,7 +380,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
    @Override
    public void close(final boolean failed) throws Exception {
       if (isTrace) {
-         ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" +  this + " being closed
with failed=" + failed, new Exception("trace"));
+         ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed
with failed=" + failed, new Exception("trace"));
       }
 
       callback.removeReadyListener(this);
@@ -405,7 +405,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
          MessageReference ref = iter.next();
 
          if (isTrace) {
-            ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" +  this + " cancelling
reference " + ref);
+            ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling
reference " + ref);
          }
 
          ref.getQueue().cancel(tx, ref, true);
@@ -662,14 +662,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
          MessageReference ref;
          do {
-            ref = deliveringRefs.poll();
+            synchronized (lock) {
+               ref = deliveringRefs.poll();
+            }
 
             if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
                ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx +
", consumer=" + this);
             }
 
             if (ref == null) {
-               throw ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
+               ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id,
messageID, messageQueue.getName());
+               if (tx != null) {
+                  tx.markAsRollbackOnly(ils);
+               }
+               throw ils;
             }
 
             ackReference(tx, ref);
@@ -719,7 +725,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
       MessageReference ref = removeReferenceByID(messageID);
 
       if (ref == null) {
-         throw new IllegalStateException("Cannot find ref to ack " + messageID);
+         ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id,
messageID, messageQueue.getName());
+         if (tx != null) {
+            tx.markAsRollbackOnly(ils);
+         }
+         throw ils;
       }
 
       ackReference(tx, ref);
@@ -752,23 +762,29 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
       // Expiries can come in out of sequence with respect to delivery order
 
-      Iterator<MessageReference> iter = deliveringRefs.iterator();
+      synchronized (lock) {
+         // This is an optimization, if the reference is the first one, we just poll it.
+         if (deliveringRefs.peek().getMessage().getMessageID() == messageID) {
+            return deliveringRefs.poll();
+         }
+
+         Iterator<MessageReference> iter = deliveringRefs.iterator();
 
-      MessageReference ref = null;
+         MessageReference ref = null;
 
-      while (iter.hasNext()) {
-         MessageReference theRef = iter.next();
+         while (iter.hasNext()) {
+            MessageReference theRef = iter.next();
 
-         if (theRef.getMessage().getMessageID() == messageID) {
-            iter.remove();
+            if (theRef.getMessage().getMessageID() == messageID) {
+               iter.remove();
 
-            ref = theRef;
+               ref = theRef;
 
-            break;
+               break;
+            }
          }
+         return ref;
       }
-
-      return ref;
    }
 
    public void readyForWriting(final boolean ready) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3cd1d7b..16ec608 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -668,18 +669,22 @@ public class ServerSessionImpl implements ServerSession, FailureListener
{
    }
 
    public void acknowledge(final long consumerID, final long messageID) throws Exception
{
-      ServerConsumer consumer = consumers.get(consumerID);
-
-      if (consumer == null) {
-         throw ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID);
-      }
+      ServerConsumer consumer = findConsumer(consumerID);
 
       if (tx != null && tx.getState() == State.ROLLEDBACK) {
          // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just
          // have these messages to be stuck on the limbo until the server is restarted
          // The tx has already timed out, so we need to ack and rollback immediately
          Transaction newTX = newTransaction();
-         consumer.acknowledge(newTX, messageID);
+         try {
+            consumer.acknowledge(newTX, messageID);
+         }
+         catch (Exception e) {
+            // just ignored
+            // will log it just in case
+            ActiveMQServerLogger.LOGGER.debug("Ignored exception while acking messageID "
+ messageID +
+                                                 " on a rolledback TX", e);
+         }
          newTX.rollback();
       }
       else {
@@ -687,9 +692,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener
{
       }
    }
 
-   public void individualAcknowledge(final long consumerID, final long messageID) throws
Exception {
+   private ServerConsumer findConsumer(long consumerID) throws Exception {
       ServerConsumer consumer = consumers.get(consumerID);
 
+      if (consumer == null) {
+         Transaction currentTX = tx;
+         ActiveMQIllegalStateException exception = ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID);
+
+         if (currentTX != null) {
+            currentTX.markAsRollbackOnly(exception);
+         }
+
+         throw exception;
+      }
+      return consumer;
+   }
+
+   public void individualAcknowledge(final long consumerID, final long messageID) throws
Exception {
+      ServerConsumer consumer = findConsumer(consumerID);
+
       if (tx != null && tx.getState() == State.ROLLEDBACK) {
          // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just
          // have these messages to be stuck on the limbo until the server is restarted

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
index 739578e..d500f6b 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
@@ -14,12 +14,20 @@ package org.apache.activemq.artemis.tests.extras.jms.ra;
 
 import javax.jms.Message;
 import javax.resource.ResourceException;
+import javax.resource.spi.LocalTransactionException;
 import javax.resource.spi.UnavailableException;
 import javax.resource.spi.endpoint.MessageEndpoint;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.reflect.Method;
 import java.util.LinkedList;
 import java.util.List;
@@ -37,12 +45,13 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
 import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.tests.extras.jms.bridge.TransactionManagerLocatorImpl;
 import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.junit.After;
@@ -61,6 +70,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
 
    ServerLocator nettyLocator;
 
+   private volatile boolean playTXTimeouts = true;
+   private volatile boolean playServerClosingSession = true;
+   private volatile boolean playServerClosingConsumer = true;
+
    @Before
    public void setUp() throws Exception {
       nettyLocator = createNettyNonHALocator();
@@ -91,7 +104,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
    @Test
    public void testReconnectMDBNoMessageLoss() throws Exception {
       AddressSettings settings = new AddressSettings();
-      settings.setRedeliveryDelay(1000);
+      settings.setRedeliveryDelay(100);
       settings.setMaxDeliveryAttempts(-1);
       server.getAddressSettingsRepository().clear();
       server.getAddressSettingsRepository().addMatch("#", settings);
@@ -125,8 +138,9 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
       qResourceAdapter.endpointActivation(endpointFactory, spec);
 
       Assert.assertEquals(1, resourceAdapter.getActivations().values().size());
+      ActiveMQActivation activation = resourceAdapter.getActivations().values().toArray(new
ActiveMQActivation[1])[0];
 
-      final int NUMBER_OF_MESSAGES = 3000;
+      final int NUMBER_OF_MESSAGES = 1000;
 
       Thread producer = new Thread() {
          public void run() {
@@ -178,18 +192,12 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
                   return;
                }
 
-               List<ServerSession> serverSessions = new LinkedList<>();
-
-               for (ServerSession session : server.getSessions()) {
-                  if (session.getMetaData("resource-adapter") != null) {
-                     serverSessions.add(session);
-                  }
-               }
+               List<ServerSession> serverSessions = lookupServerSessions("resource-adapter");
 
                System.err.println("Contains " + serverSessions.size() + " RA sessions");
 
                if (serverSessions.size() != NUMBER_OF_SESSIONS) {
-                  System.err.println("the server was supposed to have " + NUMBER_OF_SESSIONS
+ " RA Sessions but it only contained accordingly to the meta-data");
+                  System.err.println("the server was supposed to have " + NUMBER_OF_MESSAGES
+ " RA Sessions but it only contained accordingly to the meta-data");
                   metaDataFailed.set(true);
                }
                else if (serverSessions.size() == NUMBER_OF_SESSIONS) {
@@ -197,12 +205,29 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
                   metaDataFailed.set(false);
                }
 
-               if (serverSessions.size() > 0) {
+               if (playServerClosingSession && serverSessions.size() > 0) {
 
                   int randomBother = RandomUtil.randomInterval(0, serverSessions.size() -
1);
                   System.out.println("bugging session " + randomBother);
 
-                  RemotingConnection connection = serverSessions.get(randomBother).getRemotingConnection();
+                  ServerSession serverSession = serverSessions.get(randomBother);
+
+                  if (playServerClosingConsumer && RandomUtil.randomBoolean()) {
+                     // will play this randomly, only half of the times
+                     for (ServerConsumer consumer : serverSession.getServerConsumers()) {
+                        try {
+                           // Simulating a rare race that could happen in production
+                           // where the consumer is closed while things are still happening
+                           consumer.close(true);
+                           Thread.sleep(100);
+                        }
+                        catch (Exception e) {
+                           e.printStackTrace();
+                        }
+                     }
+                  }
+
+                  RemotingConnection connection = serverSession.getRemotingConnection();
 
                   connection.fail(new ActiveMQException("failed at random " + randomBother));
                }
@@ -221,11 +246,21 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
       ClientConsumer consumer = session.createConsumer("jms.queue.outQueue");
 
       for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
-         ClientMessage message = consumer.receive(5000);
+         ClientMessage message = consumer.receive(60000);
          if (message == null) {
             break;
          }
 
+         if (i == NUMBER_OF_MESSAGES * 0.90) {
+            System.out.println("Disabled failures at " + i);
+            playTXTimeouts = false;
+            playServerClosingSession = false;
+            playServerClosingConsumer = false;
+
+         }
+
+         System.out.println("Received " + i + " messages");
+
          Assert.assertNotNull(message);
          message.acknowledge();
 
@@ -247,16 +282,19 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
       session.commit();
       Assert.assertNull(consumer.receiveImmediate());
 
+      StringWriter writer = new StringWriter();
+      PrintWriter out = new PrintWriter(writer);
+
       boolean failed = false;
       for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
          AtomicInteger atomicInteger = mapCounter.get(Integer.valueOf(i));
 
          if (atomicInteger == null) {
-            System.out.println("didn't receive message with i=" + i);
+            out.println("didn't receive message with i=" + i);
             failed = true;
          }
          else if (atomicInteger.get() > 1) {
-            System.out.println("message with i=" + i + " received " + atomicInteger.get()
+ " times");
+            out.println("message with i=" + i + " received " + atomicInteger.get() + " times");
             failed = true;
          }
       }
@@ -266,15 +304,34 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
       buggerThread.join();
       producer.join();
 
-      Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly",
metaDataFailed.get());
+      qResourceAdapter.stop();
+
+      session.close();
+
+      if (failed) {
+         for (int i = 0; i < 10; i++) {
+            System.out.println("----------------------------------------------------");
+         }
+         System.out.println(writer.toString());
+      }
 
       Assert.assertFalse(failed);
 
       System.out.println("Received " + NUMBER_OF_MESSAGES + " messages");
 
-      qResourceAdapter.stop();
+      Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly",
metaDataFailed.get());
 
-      session.close();
+   }
+
+   private List<ServerSession> lookupServerSessions(String parameter) {
+      List<ServerSession> serverSessions = new LinkedList<ServerSession>();
+
+      for (ServerSession session : server.getSessions()) {
+         if (session.getMetaData(parameter) != null) {
+            serverSessions.add(session);
+         }
+      }
+      return serverSessions;
    }
 
    protected class TestEndpointFactory implements MessageEndpointFactory {
@@ -330,17 +387,10 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
          catch (Throwable e) {
             throw new RuntimeException(e.getMessage(), e);
          }
+
       }
 
       public void onMessage(Message message) {
-         //         try
-         //         {
-         //            System.out.println(Thread.currentThread().getName() + "**** onMessage
enter " + message.getIntProperty("i"));
-         //         }
-         //         catch (Exception e)
-         //         {
-         //         }
-
          Integer value = 0;
 
          try {
@@ -355,9 +405,15 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
          try {
             currentTX.enlistResource(endpointSession);
             ClientMessage message1 = endpointSession.createMessage(true);
-            message1.putIntProperty("i", message.getIntProperty("i"));
+            message1.putIntProperty("i", value);
             producer.send(message1);
             currentTX.delistResource(endpointSession, XAResource.TMSUCCESS);
+
+            if (playTXTimeouts) {
+               if (RandomUtil.randomInterval(0, 5) == 3) {
+                  Thread.sleep(2000);
+               }
+            }
          }
          catch (Exception e) {
             e.printStackTrace();
@@ -373,11 +429,26 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
 
       @Override
       public void afterDelivery() throws ResourceException {
+         // This is a copy & paste of what the Application server would do here
          try {
-            DummyTMLocator.tm.commit();
-            //            currentTX.commit();
+            if (currentTX.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
+               DummyTMLocator.tm.rollback();
+            }
+            else {
+               DummyTMLocator.tm.commit();
+            }
          }
-         catch (Throwable e) {
+         catch (HeuristicMixedException e) {
+            throw new LocalTransactionException(e);
+         }
+         catch (SystemException e) {
+            throw new LocalTransactionException(e);
+         }
+         catch (HeuristicRollbackException e) {
+            throw new LocalTransactionException(e);
+         }
+         catch (RollbackException e) {
+            throw new LocalTransactionException(e);
          }
          super.afterDelivery();
       }
@@ -389,7 +460,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
 
       public static void stopTM() {
          try {
-            TransactionManagerLocatorImpl.setTransactionManager(null);
             TransactionReaper.terminate(true);
             TxControl.disable(true);
          }
@@ -401,7 +471,6 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
 
       public static void startTM() {
          tm = new TransactionManagerImple();
-         TransactionManagerLocatorImpl.setTransactionManager(tm);
          TxControl.enable();
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b1d50761/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 9801a83..42d5211 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -1170,7 +1170,7 @@ public class FailoverTest extends FailoverTestBase {
       crash(session);
 
       try {
-         session.commit(xid, true);
+         session.commit(xid, false);
 
          Assert.fail("Should throw exception");
       }
@@ -1374,7 +1374,7 @@ public class FailoverTest extends FailoverTestBase {
       crash(session2);
 
       try {
-         session2.commit(xid, true);
+         session2.commit(xid, false);
 
          Assert.fail("Should throw exception");
       }


Mime
View raw message