Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A3D0518CB1 for ; Sat, 19 Mar 2016 05:08:13 +0000 (UTC) Received: (qmail 40179 invoked by uid 500); 19 Mar 2016 05:08:10 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 40031 invoked by uid 500); 19 Mar 2016 05:08:10 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 37305 invoked by uid 99); 19 Mar 2016 05:08:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Mar 2016 05:08:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EC630E00FF; Sat, 19 Mar 2016 05:08:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Sat, 19 Mar 2016 05:09:08 -0000 Message-Id: <26c1be4034e647349a609707a152f2a5@git.apache.org> In-Reply-To: <097bf1c283774e2ea2098fcd9461446c@git.apache.org> References: <097bf1c283774e2ea2098fcd9461446c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [62/67] [abbrv] activemq-artemis git commit: fix AMQ1924Test fix AMQ1924Test Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f1205444 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f1205444 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f1205444 Branch: refs/heads/refactor-openwire Commit: f12054443c70378b965f6c2acc80ec3580afcbf1 Parents: d8b5431 Author: Howard Gao Authored: Tue Mar 15 23:11:39 2016 +0800 Committer: Clebert Suconic Committed: Sat Mar 19 01:07:37 2016 -0400 ---------------------------------------------------------------------- .../openwire/OpenWireProtocolManager.java | 5 ++ .../core/server/impl/ServerConsumerImpl.java | 64 +++++++++++++++----- .../transport/failover/AMQ1925Test.java | 54 +++++++++-------- 3 files changed, 82 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f1205444/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index abfcca5..3cb1215 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -488,6 +488,11 @@ public class OpenWireProtocolManager implements ProtocolManager, Cl if (txSession != null) { txSession.rollback(info); } + else if (info.getTransactionId().isLocalTransaction()) { + //during a broker restart, recovered local transaction may not be registered + //in that case we ignore and let the tx removed silently by connection. + //see AMQ1925Test.testAMQ1925_TXBegin + } else { throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f1205444/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b2ca0df..cb2cd38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -89,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private Object protocolContext; + private final ActiveMQServer server; + private SlowConsumerDetectionListener slowConsumerListener; /** @@ -153,8 +155,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final SessionCallback callback, final boolean preAcknowledge, final boolean strictUpdateDeliveryCount, - final ManagementService managementService) throws Exception { - this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null); + final ManagementService managementService, + final ActiveMQServer server) throws Exception { + this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, true, null, server); } public ServerConsumerImpl(final long id, @@ -169,7 +172,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final boolean strictUpdateDeliveryCount, final ManagementService managementService, final boolean supportLargeMessage, - final Integer credits) throws Exception { + final Integer credits, + final ActiveMQServer server) throws Exception { this.id = id; this.filter = filter; @@ -214,6 +218,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { availableCredits.set(credits); } } + + this.server = server; } @Override @@ -398,7 +404,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } finally { lockDelivery.readLock().unlock(); + callback.afterDelivery(); } + } @Override @@ -583,12 +591,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { synchronized (lock) { - lockDelivery.writeLock().lock(); + boolean locked = lockDelivery(); + + // This is to make sure nothing would sneak to the client while started = false + // the client will stop the session and perform a rollback in certain cases. + // in case something sneaks to the client you could get to messaging delivering forever until + // you restart the server try { this.started = browseOnly || started; } finally { - lockDelivery.writeLock().unlock(); + if (locked) { + lockDelivery.writeLock().unlock(); + } } } @@ -598,21 +613,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } - @Override - public void setTransferring(final boolean transferring) { - synchronized (lock) { - this.transferring = transferring; - } - - // This is to make sure that the delivery process has finished any pending delivery - // otherwise a message may sneak in on the client while we are trying to stop the consumer + private boolean lockDelivery() { try { - lockDelivery.writeLock().lock(); + if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) { + ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); + if (server != null) { + server.threadDump(); + } + return false; + } + return true; } - finally { - lockDelivery.writeLock().unlock(); + catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return false; } + } + @Override + public void setTransferring(final boolean transferring) { + synchronized (lock) { + // This is to make sure that the delivery process has finished any pending delivery + // otherwise a message may sneak in on the client while we are trying to stop the consumer + boolean locked = lockDelivery(); + try { + this.transferring = transferring; + } + finally { + if (locked) { + lockDelivery.writeLock().unlock(); + } + } + } // Outside the lock if (transferring) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f1205444/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java index 3d75905..564fd86 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java @@ -33,8 +33,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; -import javax.management.MBeanServer; -import javax.management.MBeanServerFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.api.core.SimpleString; @@ -235,32 +233,39 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); boolean restartDone = false; - for (int i = 0; i < MESSAGE_COUNT; i++) { - Message message = consumer.receive(5000); - Assert.assertNotNull(message); + try { + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); - if (i == 222 && !restartDone) { - // Simulate broker failure & restart - bs.stop(); - bs = createNewServer(); - bs.start(); - restartDone = true; - } + if (i == 222 && !restartDone) { + // Simulate broker failure & restart + bs.stop(); + bs = createNewServer(); + bs.start(); + restartDone = true; + } - Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); - try { - session.commit(); - } - catch (TransactionRolledBackException expectedOnOccasion) { - log.info("got rollback: " + expectedOnOccasion); - i--; + Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER)); + try { + session.commit(); + } + catch (TransactionRolledBackException expectedOnOccasion) { + log.info("got rollback: " + expectedOnOccasion); + i--; + } } + Assert.assertNull(consumer.receive(500)); + } + catch (Exception eee) { + log.error("got exception", eee); + throw eee; + } + finally { + consumer.close(); + session.close(); + connection.close(); } - Assert.assertNull(consumer.receive(500)); - - consumer.close(); - session.close(); - connection.close(); assertQueueEmpty(); Assert.assertNull("no exception on connection listener: " + exception, exception); @@ -368,7 +373,6 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis } catch (Exception e) { log.error(e); } - } public void onException(JMSException exception) {