Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D802F18D01 for ; Wed, 11 Nov 2015 16:21:35 +0000 (UTC) Received: (qmail 55602 invoked by uid 500); 11 Nov 2015 16:21:35 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 55492 invoked by uid 500); 11 Nov 2015 16:21:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 55339 invoked by uid 99); 11 Nov 2015 16:21:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 16:21:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 66926E0BD1; Wed, 11 Nov 2015 16:21:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbertram@apache.org To: commits@activemq.apache.org Date: Wed, 11 Nov 2015 16:21:37 -0000 Message-Id: <7c70517c3f8f4cea882e8845960d58d7@git.apache.org> In-Reply-To: <4c832538598f49aea31409dfd90afddb@git.apache.org> References: <4c832538598f49aea31409dfd90afddb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] activemq-artemis git commit: ARTEMIS-302 - Improving XA Resilience ARTEMIS-302 - Improving XA Resilience Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7bbd17cd Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7bbd17cd Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7bbd17cd Branch: refs/heads/master Commit: 7bbd17cd3749e1e7a7c153eccfc20a8d5ddf9cf0 Parents: a21a447 Author: Clebert Suconic Authored: Tue Nov 10 15:26:44 2015 -0500 Committer: Clebert Suconic Committed: Wed Nov 11 09:50:58 2015 -0500 ---------------------------------------------------------------------- .../core/client/impl/ClientSessionImpl.java | 12 +- .../core/impl/ActiveMQSessionContext.java | 7 +- .../core/impl/wireformat/MessagePacket.java | 6 + .../spi/core/remoting/SessionContext.java | 2 + .../core/ServerSessionPacketHandler.java | 15 + .../core/server/impl/ServerConsumerImpl.java | 6 +- .../core/server/impl/ServerSessionImpl.java | 40 +- .../artemis/core/transaction/Transaction.java | 2 + .../core/transaction/impl/TransactionImpl.java | 61 ++- .../byteman/ConcurrentDeliveryCancelTest.java | 148 +++---- .../tests/extras/jms/bridge/JMSBridgeTest.java | 2 +- .../bridge/TransactionManagerLocatorImpl.java | 5 +- ...MDBMultipleHandlersServerDisconnectTest.java | 412 +++++++++++++++++++ .../integration/ra/ActiveMQRATestBase.java | 1 - .../integration/remoting/ReconnectTest.java | 96 +++++ .../artemis/tests/util/JMSTestBase.java | 4 +- .../artemis/jms/tests/AcknowledgementTest.java | 3 +- .../core/postoffice/impl/BindingsImplTest.java | 5 + 18 files changed, 684 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 93618af..6134784 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -49,6 +49,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.XidCodecSupport; public final class ClientSessionImpl implements ClientSessionInternal, FailureListener { @@ -57,7 +58,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final ClientSessionFactoryInternal sessionFactory; - private final String name; + private String name; private final String username; @@ -857,6 +858,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi boolean reattached = sessionContext.reattachOnNewConnection(backupConnection); if (!reattached) { + + // We change the name of the Session, otherwise the server could close it while we are still sending the recreate + // in certain failure scenarios + // For instance the fact we didn't change the name of the session after failover or reconnect + // was the reason allowing multiple Sessions to be closed simultaneously breaking concurrency + this.name = UUIDGenerator.getInstance().generateStringUUID(); + + sessionContext.resetName(name); + for (ClientConsumerInternal consumer : cloneConsumers()) { consumer.clearAtFailover(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index d8dc125..1bac653 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -112,7 +112,7 @@ public class ActiveMQSessionContext extends SessionContext { private final Channel sessionChannel; private final int serverVersion; private int confirmationWindow; - private final String name; + private String name; protected Channel getSessionChannel() { return sessionChannel; @@ -122,6 +122,11 @@ public class ActiveMQSessionContext extends SessionContext { return name; } + public void resetName(String name) { + this.name = name; + } + + protected int getConfirmationWindow() { return confirmationWindow; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index 4ed86ba..160e9bc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -34,4 +34,10 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI return message; } + public String toString() { + return this.getParentString() + ",message=" + message + "]"; + + } + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 3f1cc14..1bdaffc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -59,6 +59,8 @@ public abstract class SessionContext { this.session = session; } + public abstract void resetName(String name); + /** * it will eather reattach or reconnect, preferably reattaching it. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 0cd4472..904a8bc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -115,6 +115,8 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; public class ServerSessionPacketHandler implements ChannelHandler { + private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); + private final ServerSession session; private final StorageManager storageManager; @@ -193,6 +195,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { boolean closeChannel = false; boolean requiresResponse = false; + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::handlePacket," + packet); + } + try { try { switch (type) { @@ -522,6 +528,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { final Packet response, final boolean flush, final boolean closeChannel) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::scheduling response::" + response); + } + storageManager.afterCompleteOperations(new IOCallback() { public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); @@ -529,6 +539,11 @@ public class ServerSessionPacketHandler implements ChannelHandler { ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage)); doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); + + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("ServerSessionPacketHandler::response sent::" + response); + } + } public void done() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 2bafa1f..8cc9912 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -379,8 +379,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void close(final boolean failed) throws Exception { - if (isTrace) - { + if (isTrace) { ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); } @@ -405,8 +404,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { while (iter.hasNext()) { MessageReference ref = iter.next(); - if (isTrace) - { + if (isTrace) { ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " cancelling reference " + ref); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 7c09a4c..3cd1d7b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.impl; import javax.transaction.xa.XAException; import javax.transaction.xa.Xid; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -39,11 +38,11 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -57,10 +56,10 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityStore; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -996,7 +995,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(tx.getXid().toString(), xid.toString()); try { - if (tx.getState() != Transaction.State.PREPARED) { + if (!tx.isEffective()) { // we don't want to rollback anything prepared here if (tx.getXid() != null) { resourceManager.removeTransaction(tx.getXid()); @@ -1025,27 +1024,24 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } public synchronized void xaFailed(final Xid xid) throws Exception { - if (tx != null) { - final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid(); + Transaction theTX = resourceManager.getTransaction(xid); - throw new ActiveMQXAException(XAException.XAER_PROTO, msg); + if (theTX == null) { + theTX = newTransaction(xid); + resourceManager.putTransaction(xid, theTX); } - else { - - tx = newTransaction(xid); - tx.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation")); - if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("xastart into tx= " + tx); - } - - boolean added = resourceManager.putTransaction(xid, tx); - - if (!added) { - final String msg = "Cannot start, there is already a xid " + tx.getXid(); + if (theTX.isEffective()) { + ActiveMQServerLogger.LOGGER.debug("Client failed with Xid " + xid + " but the server already had it prepared"); + tx = null; + } + else { + theTX.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation")); + tx = theTX; + } - throw new ActiveMQXAException(XAException.XAER_DUPID, msg); - } + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("xastart into tx= " + tx); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java index 7d6151a..eb1ab3c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java @@ -33,6 +33,8 @@ public interface Transaction { ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY } + boolean isEffective(); + void prepare() throws Exception; void commit() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index e991454..5b7d255 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.transaction.impl; import javax.transaction.xa.Xid; - import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -104,6 +103,11 @@ public class TransactionImpl implements Transaction { // Transaction implementation // ----------------------------------------------------------- + public boolean isEffective() { + return state == State.PREPARED || state == State.COMMITTED; + + } + public void setContainsPersistent() { containsPersistent = true; } @@ -142,6 +146,10 @@ public class TransactionImpl implements Transaction { storageManager.readLock(); try { synchronized (timeoutLock) { + if (isEffective()) { + ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has already been prepared or committed before, just ignoring the prepare call"); + return; + } if (state == State.ROLLBACK_ONLY) { if (exception != null) { // this TX will never be rolled back, @@ -197,6 +205,11 @@ public class TransactionImpl implements Transaction { public void commit(final boolean onePhase) throws Exception { synchronized (timeoutLock) { + if (state == State.COMMITTED) { + // I don't think this could happen, but just in case + ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been committed before, just ignoring the commit call"); + return; + } if (state == State.ROLLBACK_ONLY) { rollback(); @@ -248,15 +261,21 @@ public class TransactionImpl implements Transaction { */ protected void doCommit() throws Exception { if (containsPersistent || xid != null && state == State.PREPARED) { - + // ^^ These are the scenarios where we require a storage.commit + // for anything else we won't use the journal storageManager.commit(id); - - state = State.COMMITTED; } + + state = State.COMMITTED; } public void rollback() throws Exception { synchronized (timeoutLock) { + if (state == State.ROLLEDBACK) { + // I don't think this could happen, but just in case + ActiveMQServerLogger.LOGGER.debug("XID " + xid + " has been rolledBack before, just ignoring the rollback call", new Exception("trace")); + return; + } if (xid != null) { if (state != State.PREPARED && state != State.ACTIVE && state != State.ROLLBACK_ONLY) { throw new IllegalStateException("Transaction is in invalid state " + state); @@ -290,17 +309,21 @@ public class TransactionImpl implements Transaction { } public void suspend() { - if (state != State.ACTIVE) { - throw new IllegalStateException("Can only suspend active transaction"); + synchronized (timeoutLock) { + if (state != State.ACTIVE) { + throw new IllegalStateException("Can only suspend active transaction"); + } + state = State.SUSPENDED; } - state = State.SUSPENDED; } public void resume() { - if (state != State.SUSPENDED) { - throw new IllegalStateException("Can only resume a suspended transaction"); + synchronized (timeoutLock) { + if (state != State.SUSPENDED) { + throw new IllegalStateException("Can only resume a suspended transaction"); + } + state = State.ACTIVE; } - state = State.ACTIVE; } public Transaction.State getState() { @@ -316,12 +339,19 @@ public class TransactionImpl implements Transaction { } public void markAsRollbackOnly(final ActiveMQException exception1) { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { - ActiveMQServerLogger.LOGGER.debug("Marking Transaction " + this.id + " as rollback only"); - } - state = State.ROLLBACK_ONLY; + synchronized (timeoutLock) { + if (isEffective()) { + ActiveMQServerLogger.LOGGER.debug("Trying to mark transaction " + this.id + " xid=" + this.xid + " as rollbackOnly but it was already effective (prepared or committed!)"); + return; + } - this.exception = exception1; + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { + ActiveMQServerLogger.LOGGER.debug("Marking Transaction " + this.id + " as rollback only"); + } + state = State.ROLLBACK_ONLY; + + this.exception = exception1; + } } public synchronized void addOperation(final TransactionOperation operation) { @@ -425,6 +455,7 @@ public class TransactionImpl implements Transaction { return "TransactionImpl [xid=" + xid + ", id=" + id + + ", xid=" + xid + ", state=" + state + ", createTime=" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java index c00985f..4629be3 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ConcurrentDeliveryCancelTest.java @@ -49,51 +49,40 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; -/** This test will force two consumers to be waiting on close and introduce a race I saw in a production system */ +/** + * This test will force two consumers to be waiting on close and introduce a race I saw in a production system + */ @RunWith(BMUnitRunner.class) -public class ConcurrentDeliveryCancelTest extends JMSTestBase -{ +public class ConcurrentDeliveryCancelTest extends JMSTestBase { // used to wait the thread to align at the same place and create the race private static final ReusableLatch latchEnter = new ReusableLatch(2); // used to start private static final ReusableLatch latchFlag = new ReusableLatch(1); - public static void enterCancel() - { + public static void enterCancel() { latchEnter.countDown(); - try - { + try { latchFlag.await(); } - catch (Exception ignored) - { + catch (Exception ignored) { } } - public static void resetLatches(int numberOfThreads) - { + public static void resetLatches(int numberOfThreads) { latchEnter.setCount(numberOfThreads); latchFlag.setCount(1); } @Test - @BMRules - ( - rules = - { - @BMRule - ( - name = "enterCancel-holdThere", - targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl", - targetMethod = "close", - targetLocation = "ENTRY", - action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();" - ) - } - ) - public void testConcurrentCancels() throws Exception - { + @BMRules( + rules = {@BMRule( + name = "enterCancel-holdThere", + targetClass = "org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl", + targetMethod = "close", + targetLocation = "ENTRY", + action = "org.apache.activemq.artemis.tests.extras.byteman.ConcurrentDeliveryCancelTest.enterCancel();")}) + public void testConcurrentCancels() throws Exception { server.getAddressSettingsRepository().clear(); AddressSettings settings = new AddressSettings(); @@ -103,10 +92,8 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase cf.setReconnectAttempts(0); cf.setRetryInterval(10); - System.out.println("....."); - for (ServerSession srvSess : server.getSessions()) - { + for (ServerSession srvSess : server.getSessions()) { System.out.println(srvSess); } @@ -120,8 +107,7 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < numberOfMessages; i++) - { + for (int i = 0; i < numberOfMessages; i++) { TextMessage msg = session.createTextMessage("message " + i); msg.setIntProperty("i", i); producer.send(msg); @@ -131,24 +117,22 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase connection.close(); } - for (int i = 0; i < 100; i++) - { + for (int i = 0; i < 100; i++) { XAConnectionFactory xacf = ActiveMQJMSClient.createConnectionFactory("tcp://localhost:61616", "test"); final XAConnection connection = xacf.createXAConnection(); final XASession theSession = connection.createXASession(); - ((ActiveMQSession)theSession).getCoreSession().addMetaData("theSession", "true"); + ((ActiveMQSession) theSession).getCoreSession().addMetaData("theSession", "true"); connection.start(); final MessageConsumer consumer = theSession.createConsumer(queue); - XidImpl xid = newXID(); + XidImpl xid = newXID(); theSession.getXAResource().start(xid, XAResource.TMNOFLAGS); theSession.getXAResource().setTransactionTimeout(1); // I'm setting a small timeout just because I'm lazy to call end myself - for (int msg = 0; msg < 11; msg++) - { + for (int msg = 0; msg < 11; msg++) { Assert.assertNotNull(consumer.receiveNoWait()); } @@ -157,83 +141,68 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase final List serverSessions = new LinkedList(); // We will force now the failure simultaneously from several places - for (ServerSession srvSess : server.getSessions()) - { - if (srvSess.getMetaData("theSession") != null) - { + for (ServerSession srvSess : server.getSessions()) { + if (srvSess.getMetaData("theSession") != null) { System.out.println(srvSess); serverSessions.add(srvSess); } } - resetLatches(2); // from Transactional reaper List threads = new LinkedList(); - threads.add(new Thread("ConsumerCloser") - { - public void run() - { - try - { + threads.add(new Thread("ConsumerCloser") { + public void run() { + try { System.out.println(Thread.currentThread().getName() + " closing consumer"); consumer.close(); System.out.println(Thread.currentThread().getName() + " closed consumer"); } - catch (Exception e) - { + catch (Exception e) { e.printStackTrace(); } } }); - threads.add(new Thread("SessionCloser") - { - public void run() - { - for (ServerSession sess : serverSessions) - { + threads.add(new Thread("SessionCloser") { + public void run() { + for (ServerSession sess : serverSessions) { System.out.println("Thread " + Thread.currentThread().getName() + " starting"); - try - { + try { // A session.close could sneak in through failover or some other scenarios. // a call to RemotingConnection.fail wasn't replicating the issue. // I needed to call Session.close() directly to replicate what was happening in production sess.close(true); } - catch (Exception e) - { + catch (Exception e) { e.printStackTrace(); } System.out.println("Thread " + Thread.currentThread().getName() + " done"); } } }); -// -// consumer.close(); -// -// threads.add(new Thread("ClientFailing") -// { -// public void run() -// { -// ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession(); -// impl.getConnection().fail(new HornetQException("failure")); -// } -// }); -// - - - for (Thread t : threads) - { + // + // consumer.close(); + // + // threads.add(new Thread("ClientFailing") + // { + // public void run() + // { + // ClientSessionInternal impl = (ClientSessionInternal) ((HornetQSession)theSession).getCoreSession(); + // impl.getConnection().fail(new HornetQException("failure")); + // } + // }); + // + + for (Thread t : threads) { t.start(); } Assert.assertTrue(latchEnter.await(10, TimeUnit.MINUTES)); latchFlag.countDown(); - for (Thread t: threads) - { + for (Thread t : threads) { t.join(5000); Assert.assertFalse(t.isAlive()); } @@ -250,19 +219,16 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase MessageConsumer consumer = session.createConsumer(queue); HashMap mapCount = new HashMap(); - while (true) - { - TextMessage message = (TextMessage)consumer.receiveNoWait(); - if (message == null) - { + while (true) { + TextMessage message = (TextMessage) consumer.receiveNoWait(); + if (message == null) { break; } Integer value = message.getIntProperty("i"); AtomicInteger count = mapCount.get(value); - if (count == null) - { + if (count == null) { count = new AtomicInteger(0); mapCount.put(message.getIntProperty("i"), count); } @@ -271,16 +237,13 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase } boolean failed = false; - for (int i = 0; i < numberOfMessages; i++) - { + for (int i = 0; i < numberOfMessages; i++) { AtomicInteger count = mapCount.get(i); - if (count == null) - { + if (count == null) { System.out.println("Message " + i + " not received"); failed = true; } - else if (count.get() > 1) - { + else if (count.get() > 1) { System.out.println("Message " + i + " received " + count.get() + " times"); failed = true; } @@ -290,6 +253,5 @@ public class ConcurrentDeliveryCancelTest extends JMSTestBase connection.close(); - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java index ce08f0e..f7f439d 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java @@ -1727,7 +1727,7 @@ public class JMSBridgeTest extends BridgeTestBase { @Test public void testSetTMClass() throws Exception { - TransactionManagerLocatorImpl.tm = new DummyTransactionManager(); + TransactionManagerLocatorImpl.setTransactionManager(new DummyTransactionManager()); JMSBridgeImpl bridge = null; try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java index c883f47..7523b98 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/TransactionManagerLocatorImpl.java @@ -23,14 +23,15 @@ import org.apache.activemq.artemis.service.extensions.transactions.TransactionMa public class TransactionManagerLocatorImpl implements TransactionManagerLocator { - public static TransactionManager tm = null; + private static TransactionManager tm = null; @Override public TransactionManager getTransactionManager() { + new Exception("trace").printStackTrace(); return tm; } - public void setTransactionManager(TransactionManager transactionManager) { + public static void setTransactionManager(TransactionManager transactionManager) { tm = transactionManager; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java new file mode 100644 index 0000000..739578e --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java @@ -0,0 +1,412 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.jms.ra; + +import javax.jms.Message; +import javax.resource.ResourceException; +import javax.resource.spi.UnavailableException; +import javax.resource.spi.endpoint.MessageEndpoint; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; +import java.lang.reflect.Method; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.arjuna.ats.arjuna.coordinator.TransactionReaper; +import com.arjuna.ats.arjuna.coordinator.TxControl; +import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.extras.jms.bridge.TransactionManagerLocatorImpl; +import org.apache.activemq.artemis.tests.integration.ra.ActiveMQRATestBase; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simulates several messages being received over multiple instances with reconnects during the process. + */ +public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase { + + final ConcurrentHashMap mapCounter = new ConcurrentHashMap(); + + volatile ActiveMQResourceAdapter resourceAdapter; + + ServerLocator nettyLocator; + + @Before + public void setUp() throws Exception { + nettyLocator = createNettyNonHALocator(); + nettyLocator.setRetryInterval(10); + nettyLocator.setReconnectAttempts(-1); + mapCounter.clear(); + resourceAdapter = null; + super.setUp(); + createQueue(true, "outQueue"); + DummyTMLocator.startTM(); + } + + @After + public void tearDown() throws Exception { + DummyTMLocator.stopTM(); + super.tearDown(); + } + + protected boolean usePersistence() { + return true; + } + + @Override + public boolean useSecurity() { + return false; + } + + @Test + public void testReconnectMDBNoMessageLoss() throws Exception { + AddressSettings settings = new AddressSettings(); + settings.setRedeliveryDelay(1000); + settings.setMaxDeliveryAttempts(-1); + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", settings); + ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter(); + resourceAdapter = qResourceAdapter; + + // qResourceAdapter.setTransactionManagerLocatorClass(DummyTMLocator.class.getName()); + // qResourceAdapter.setTransactionManagerLocatorMethod("getTM"); + + MyBootstrapContext ctx = new MyBootstrapContext(); + + qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY); + qResourceAdapter.start(ctx); + + final int NUMBER_OF_SESSIONS = 10; + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + spec.setMaxSession(NUMBER_OF_SESSIONS); + spec.setTransactionTimeout(1); + spec.setReconnectAttempts(-1); + spec.setConfirmationWindowSize(-1); + spec.setReconnectInterval(1000); + spec.setCallTimeout(1000L); + spec.setResourceAdapter(qResourceAdapter); + spec.setUseJNDI(false); + spec.setDestinationType("javax.jms.Queue"); + spec.setDestination(MDBQUEUE); + spec.setConsumerWindowSize(1024 * 1024); + + TestEndpointFactory endpointFactory = new TestEndpointFactory(true); + qResourceAdapter.endpointActivation(endpointFactory, spec); + + Assert.assertEquals(1, resourceAdapter.getActivations().values().size()); + + final int NUMBER_OF_MESSAGES = 3000; + + Thread producer = new Thread() { + public void run() { + try { + ServerLocator locator = createInVMLocator(0); + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(false, false); + + ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + + ClientMessage message = session.createMessage(true); + + message.getBodyBuffer().writeString("teststring " + i); + message.putIntProperty("i", i); + + clientProducer.send(message); + + if (i % 100 == 0) { + session.commit(); + } + } + session.commit(); + } + catch (Exception e) { + e.printStackTrace(); + } + + } + }; + + producer.start(); + + final AtomicBoolean metaDataFailed = new AtomicBoolean(false); + + // This thread will keep bugging the handlers. + // if they behave well with XA, the test pass! + final AtomicBoolean running = new AtomicBoolean(true); + + Thread buggerThread = new Thread() { + public void run() { + while (running.get()) { + try { + Thread.sleep(RandomUtil.randomInterval(100, 200)); + } + catch (InterruptedException intex) { + intex.printStackTrace(); + return; + } + + List serverSessions = new LinkedList<>(); + + for (ServerSession session : server.getSessions()) { + if (session.getMetaData("resource-adapter") != null) { + serverSessions.add(session); + } + } + + System.err.println("Contains " + serverSessions.size() + " RA sessions"); + + if (serverSessions.size() != NUMBER_OF_SESSIONS) { + System.err.println("the server was supposed to have " + NUMBER_OF_SESSIONS + " RA Sessions but it only contained accordingly to the meta-data"); + metaDataFailed.set(true); + } + else if (serverSessions.size() == NUMBER_OF_SESSIONS) { + // it became the same after some reconnect? which would be acceptable + metaDataFailed.set(false); + } + + if (serverSessions.size() > 0) { + + int randomBother = RandomUtil.randomInterval(0, serverSessions.size() - 1); + System.out.println("bugging session " + randomBother); + + RemotingConnection connection = serverSessions.get(randomBother).getRemotingConnection(); + + connection.fail(new ActiveMQException("failed at random " + randomBother)); + } + } + + } + }; + + buggerThread.start(); + + ServerLocator locator = createInVMLocator(0); + ClientSessionFactory factory = locator.createSessionFactory(); + ClientSession session = factory.createSession(false, false); + session.start(); + + ClientConsumer consumer = session.createConsumer("jms.queue.outQueue"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + ClientMessage message = consumer.receive(5000); + if (message == null) { + break; + } + + Assert.assertNotNull(message); + message.acknowledge(); + + Integer value = message.getIntProperty("i"); + AtomicInteger mapCount = new AtomicInteger(1); + + mapCount = mapCounter.putIfAbsent(value, mapCount); + + if (mapCount != null) { + mapCount.incrementAndGet(); + } + + if (i % 200 == 0) { + System.out.println("received " + i); + session.commit(); + } + } + + session.commit(); + Assert.assertNull(consumer.receiveImmediate()); + + boolean failed = false; + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AtomicInteger atomicInteger = mapCounter.get(Integer.valueOf(i)); + + if (atomicInteger == null) { + System.out.println("didn't receive message with i=" + i); + failed = true; + } + else if (atomicInteger.get() > 1) { + System.out.println("message with i=" + i + " received " + atomicInteger.get() + " times"); + failed = true; + } + } + + running.set(false); + + buggerThread.join(); + producer.join(); + + Assert.assertFalse("There was meta-data failures, some sessions didn't reconnect properly", metaDataFailed.get()); + + Assert.assertFalse(failed); + + System.out.println("Received " + NUMBER_OF_MESSAGES + " messages"); + + qResourceAdapter.stop(); + + session.close(); + } + + protected class TestEndpointFactory implements MessageEndpointFactory { + + private final boolean isDeliveryTransacted; + + public TestEndpointFactory(boolean deliveryTransacted) { + isDeliveryTransacted = deliveryTransacted; + } + + public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException { + TestEndpoint retEnd = new TestEndpoint(); + if (xaResource != null) { + retEnd.setXAResource(xaResource); + } + return retEnd; + } + + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return isDeliveryTransacted; + } + } + + public class TestEndpoint extends DummyMessageEndpoint { + + ClientSessionFactory factory; + ClientSession endpointSession; + ClientProducer producer; + + Transaction currentTX; + + public TestEndpoint() { + super(null); + try { + factory = nettyLocator.createSessionFactory(); + // buggingList.add(factory); + endpointSession = factory.createSession(true, false, false); + producer = endpointSession.createProducer("jms.queue.outQueue"); + } + catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException { + super.beforeDelivery(method); + try { + DummyTMLocator.tm.begin(); + currentTX = DummyTMLocator.tm.getTransaction(); + currentTX.enlistResource(xaResource); + } + catch (Throwable e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public void onMessage(Message message) { + // try + // { + // System.out.println(Thread.currentThread().getName() + "**** onMessage enter " + message.getIntProperty("i")); + // } + // catch (Exception e) + // { + // } + + Integer value = 0; + + try { + value = message.getIntProperty("i"); + } + catch (Exception e) { + + } + + super.onMessage(message); + + try { + currentTX.enlistResource(endpointSession); + ClientMessage message1 = endpointSession.createMessage(true); + message1.putIntProperty("i", message.getIntProperty("i")); + producer.send(message1); + currentTX.delistResource(endpointSession, XAResource.TMSUCCESS); + } + catch (Exception e) { + e.printStackTrace(); + try { + currentTX.setRollbackOnly(); + } + catch (Exception ex) { + } + e.printStackTrace(); + // throw new RuntimeException(e); + } + } + + @Override + public void afterDelivery() throws ResourceException { + try { + DummyTMLocator.tm.commit(); + // currentTX.commit(); + } + catch (Throwable e) { + } + super.afterDelivery(); + } + } + + public static class DummyTMLocator { + + public static TransactionManagerImple tm; + + public static void stopTM() { + try { + TransactionManagerLocatorImpl.setTransactionManager(null); + TransactionReaper.terminate(true); + TxControl.disable(true); + } + catch (Exception e) { + e.printStackTrace(); + } + tm = null; + } + + public static void startTM() { + tm = new TransactionManagerImple(); + TransactionManagerLocatorImpl.setTransactionManager(tm); + TxControl.enable(); + } + + public TransactionManager getTM() { + return tm; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java index f5895c1..75dc976 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRATestBase.java @@ -138,7 +138,6 @@ public abstract class ActiveMQRATestBase extends JMSTestBase { public void onMessage(Message message) { lastMessage = (ActiveMQMessage) message; - System.err.println(message); } public void reset(CountDownLatch latch) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java index 7448393..3278a64 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java @@ -17,9 +17,14 @@ package org.apache.activemq.artemis.tests.integration.remoting; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.FailoverEventListener; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.core.server.ServerSession; import org.junit.Test; import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -110,6 +115,97 @@ public class ReconnectTest extends ActiveMQTestBase { } @Test + public void testMetadataAfterReconnectionNetty() throws Exception { + internalMetadataAfterRetry(true); + } + + @Test + public void testMetadataAfterReconnectionInVM() throws Exception { + internalMetadataAfterRetry(false); + } + + public void internalMetadataAfterRetry(final boolean isNetty) throws Exception { + final int pingPeriod = 1000; + + ActiveMQServer server = createServer(false, isNetty); + + server.start(); + + ClientSessionInternal session = null; + + try { + for (int i = 0; i < 100; i++) { + ServerLocator locator = createFactory(isNetty); + locator.setClientFailureCheckPeriod(pingPeriod); + locator.setRetryInterval(1); + locator.setRetryIntervalMultiplier(1d); + locator.setReconnectAttempts(-1); + locator.setConfirmationWindowSize(-1); + ClientSessionFactory factory = createSessionFactory(locator); + + session = (ClientSessionInternal) factory.createSession(); + + session.addMetaData("meta1", "meta1"); + + ServerSession[] sessions = countMetadata(server, "meta1", 1); + Assert.assertEquals(1, sessions.length); + + final AtomicInteger count = new AtomicInteger(0); + + final CountDownLatch latch = new CountDownLatch(1); + + session.addFailoverListener(new FailoverEventListener() { + @Override + public void failoverEvent(FailoverEventType eventType) { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + } + }); + + sessions[0].getRemotingConnection().fail(new ActiveMQException("failure!")); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + sessions = countMetadata(server, "meta1", 1); + + Assert.assertEquals(1, sessions.length); + + locator.close(); + } + } + finally { + try { + session.close(); + } + catch (Throwable e) { + } + + server.stop(); + } + + } + + private ServerSession[] countMetadata(ActiveMQServer server, String parameter, int expected) throws Exception { + List sessionList = new LinkedList(); + + for (int i = 0; i < 10 && sessionList.size() != expected; i++) { + sessionList.clear(); + for (ServerSession sess : server.getSessions()) { + if (sess.getMetaData(parameter) != null) { + sessionList.add(sess); + } + } + + if (sessionList.size() != expected) { + Thread.sleep(100); + } + } + + return sessionList.toArray(new ServerSession[sessionList.size()]); + } + + @Test public void testInterruptReconnectNetty() throws Exception { internalTestInterruptReconnect(true, false); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java index d468617..cb4ba0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/JMSTestBase.java @@ -133,7 +133,9 @@ public class JMSTestBase extends ActiveMQTestBase { mbeanServer = MBeanServerFactory.createMBeanServer(); - Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()).addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + Configuration config = createDefaultConfig(true).setSecurityEnabled(useSecurity()). + addConnectorConfiguration("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)). + setTransactionTimeoutScanPeriod(100); server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, usePersistence())); jmsServer = new JMSServerManagerImpl(server); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java index d467616..45f443a 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java @@ -782,8 +782,7 @@ public class AcknowledgementTest extends JMSTestCase { messageReceived = (TextMessage)consumer.receiveNoWait(); - if (messageReceived != null) - { + if (messageReceived != null) { System.out.println("Message received " + messageReceived.getText()); } Assert.assertNull(messageReceived); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7bbd17cd/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index b79cb7f..3bfd628 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -108,6 +108,11 @@ public class BindingsImplTest extends ActiveMQTestBase { } + @Override + public boolean isEffective() { + return false; + } + public boolean hasTimedOut(long currentTime, int defaultTimeout) { return false; }