Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BBCD8200BCC for ; Tue, 29 Nov 2016 13:05:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BABBD160B15; Tue, 29 Nov 2016 12:05:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DC767160B05 for ; Tue, 29 Nov 2016 13:05:14 +0100 (CET) Received: (qmail 11247 invoked by uid 500); 29 Nov 2016 12:05:14 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 11238 invoked by uid 99); 29 Nov 2016 12:05:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Nov 2016 12:05:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E0FE3E02AB; Tue, 29 Nov 2016 12:05:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: <7dfc6679810d4c36b54e46df8ac2421e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: [no jira] unit test that exercises failover with xa and missing replys Date: Tue, 29 Nov 2016 12:05:13 +0000 (UTC) archived-at: Tue, 29 Nov 2016 12:05:15 -0000 Repository: activemq Updated Branches: refs/heads/master dad629e88 -> cf57559f1 [no jira] unit test that exercises failover with xa and missing replys Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cf57559f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cf57559f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cf57559f Branch: refs/heads/master Commit: cf57559f1c95bdcf8f58937eb883111887d31192 Parents: dad629e Author: gtully Authored: Tue Nov 29 12:04:24 2016 +0000 Committer: gtully Committed: Tue Nov 29 12:04:24 2016 +0000 ---------------------------------------------------------------------- .../failover/FailoverXATransactionTest.java | 220 +++++++++++++++++++ 1 file changed, 220 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/cf57559f/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java new file mode 100644 index 0000000..bce5e09 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverXATransactionTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.util.TestUtils; +import org.junit.After; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.XAConnection; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; + +public class FailoverXATransactionTest { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverXATransactionTest.class); + private static final String QUEUE_NAME = "Failover.WithXaTx"; + private static final String TRANSPORT_URI = "tcp://localhost:0"; + private String url; + BrokerService broker; + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup); + broker.start(); + } + + public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = createBroker(deleteAllMessagesOnStartup, bindAddress); + broker.start(); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); + } + + public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { + broker = new BrokerService(); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.addConnector(bindAddress); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setUsePrefetchExtension(false); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + + url = broker.getTransportConnectors().get(0).getConnectUri().toString(); + + return broker; + } + + @org.junit.Test + public void testFailoverSendPrepareReplyLost() throws Exception { + + broker = createBroker(true); + + final AtomicBoolean first = new AtomicBoolean(false); + broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public int prepareTransaction(final ConnectionContext context, + TransactionId xid) throws Exception { + int result = super.prepareTransaction(context, xid); + if (first.compareAndSet(false, true)) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker on prepare"); + try { + context.getConnection().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + return result; + } + } + }); + broker.start(); + + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + url + ")"); + XAConnection connection = cf.createXAConnection(); + connection.start(); + final XASession session = connection.createXASession(); + Queue destination = session.createQueue(QUEUE_NAME); + + Xid xid = TestUtils.createXid(); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + produceMessage(session, destination); + session.getXAResource().end(xid, XAResource.TMSUCCESS); + + try { + session.getXAResource().prepare(xid); + } catch (Exception expected) { + expected.printStackTrace(); + } + + try { + session.getXAResource().rollback(xid); + } catch (Exception expected) { + expected.printStackTrace(); + } + + connection.close(); + + assertEquals(0, broker.getAdminView().getTotalMessageCount()); + } + + @org.junit.Test + public void testFailoverSendCommitReplyLost() throws Exception { + + broker = createBroker(true); + + final AtomicBoolean first = new AtomicBoolean(false); + broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public void commitTransaction(final ConnectionContext context, + TransactionId xid, boolean onePhase) throws Exception { + super.commitTransaction(context, xid, onePhase); + if (first.compareAndSet(false, true)) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker on prepare"); + try { + context.getConnection().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + broker.start(); + + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + url + ")"); + XAConnection connection = cf.createXAConnection(); + connection.start(); + final XASession session = connection.createXASession(); + Queue destination = session.createQueue(QUEUE_NAME); + + Xid xid = TestUtils.createXid(); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + produceMessage(session, destination); + session.getXAResource().end(xid, XAResource.TMSUCCESS); + + try { + session.getXAResource().prepare(xid); + } catch (Exception expected) { + expected.printStackTrace(); + } + + try { + session.getXAResource().commit(xid, false); + } catch (Exception expected) { + expected.printStackTrace(); + } + + connection.close(); + + assertEquals(1, broker.getAdminView().getTotalMessageCount()); + } + + private void produceMessage(final Session producerSession, Queue destination) + throws JMSException { + MessageProducer producer = producerSession.createProducer(destination); + TextMessage message = producerSession.createTextMessage("Test message"); + producer.send(message); + producer.close(); + } + +}