Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE72210D4D for ; Wed, 26 Mar 2014 16:20:38 +0000 (UTC) Received: (qmail 13198 invoked by uid 500); 26 Mar 2014 16:20:37 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 13111 invoked by uid 500); 26 Mar 2014 16:20:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 13104 invoked by uid 99); 26 Mar 2014 16:20:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Mar 2014 16:20:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A0575949E94; Wed, 26 Mar 2014 16:20:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-4636 - tidy up commit failure case to redirect via IOExceptionHandler - failover still suppressed the commit on recovery - resulting in a rollback exception to the client due to the in-doubt commit Date: Wed, 26 Mar 2014 16:20:33 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 75eb814ca -> 7a0168a4f https://issues.apache.org/jira/browse/AMQ-4636 - tidy up commit failure case to redirect via IOExceptionHandler - failover still suppressed the commit on recovery - resulting in a rollback exception to the client due to the in-doubt commit Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: 
http://git-wip-us.apache.org/repos/asf/activemq/commit/7a0168a4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a0168a4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a0168a4 Branch: refs/heads/trunk Commit: 7a0168a4f5368183d912c3e94bfe98bb59cb9a74 Parents: 75eb814 Author: gtully Authored: Wed Mar 26 16:19:46 2014 +0000 Committer: gtully Committed: Wed Mar 26 16:19:46 2014 +0000 ---------------------------------------------------------------------- .../activemq/store/jdbc/TransactionContext.java | 41 ++++--- .../org/apache/activemq/bugs/AMQ4636Test.java | 117 ++++++++++++++----- 2 files changed, 114 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7a0168a4/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java index 6a933b0..8b4ac97 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java @@ -177,10 +177,12 @@ public class TransactionContext { } } catch (SQLException e) { JDBCPersistenceAdapter.log("Commit failed: ", e); - - this.rollback(); - - throw IOExceptionSupport.create(e); + try { + doRollback(); + } catch (Exception ignored) {} + IOException ioe = IOExceptionSupport.create(e); + persistenceAdapter.getBrokerService().handleIOException(ioe); + throw ioe; } finally { inTx = false; close(); @@ -192,20 +194,7 @@ public class TransactionContext { throw new IOException("Not started."); } try { - if (addMessageStatement != null) { - addMessageStatement.close(); - addMessageStatement = null; - } - if 
(removedMessageStatement != null) { - removedMessageStatement.close(); - removedMessageStatement = null; - } - if (updateLastAckStatement != null) { - updateLastAckStatement.close(); - updateLastAckStatement = null; - } - connection.rollback(); - + doRollback(); } catch (SQLException e) { JDBCPersistenceAdapter.log("Rollback failed: ", e); throw IOExceptionSupport.create(e); @@ -215,6 +204,22 @@ public class TransactionContext { } } + private void doRollback() throws SQLException { + if (addMessageStatement != null) { + addMessageStatement.close(); + addMessageStatement = null; + } + if (removedMessageStatement != null) { + removedMessageStatement.close(); + removedMessageStatement = null; + } + if (updateLastAckStatement != null) { + updateLastAckStatement.close(); + updateLastAckStatement = null; + } + connection.rollback(); + } + public PreparedStatement getAddMessageStatement() { return addMessageStatement; } http://git-wip-us.apache.org/repos/asf/activemq/blob/7a0168a4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java index 419bfed..4373d49 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java @@ -18,6 +18,7 @@ package org.apache.activemq.bugs; import java.io.IOException; import java.sql.SQLException; +import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -27,9 +28,10 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; -import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import 
org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.store.jdbc.DataSourceServiceSupport; import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; @@ -37,8 +39,12 @@ import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; import org.apache.activemq.store.jdbc.TransactionContext; import org.apache.activemq.util.IOHelper; import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.fail; /** * Testing how the broker reacts when a SQL Exception is thrown from @@ -46,35 +52,66 @@ import org.slf4j.LoggerFactory; *

* see https://issues.apache.org/jira/browse/AMQ-4636 */ - -public class AMQ4636Test extends TestCase { +public class AMQ4636Test { private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC"; private static final Logger LOG = LoggerFactory .getLogger(AMQ4636Test.class); private String transportUrl = "tcp://0.0.0.0:0"; private BrokerService broker; - private TestTransactionContext testTransactionContext; + EmbeddedDataSource embeddedDataSource; + CountDownLatch throwSQLException = new CountDownLatch(0); + + @Before + public void startBroker() throws Exception { + broker = createBroker(); + broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + LOG.info("Broker started..."); + } - protected BrokerService createBroker(boolean withJMX) throws Exception { - BrokerService broker = new BrokerService(); + @After + public void stopBroker() throws Exception { + if (broker != null) { + LOG.info("Stopping broker..."); + broker.stop(); + broker.waitUntilStopped(); + } + try { + if (embeddedDataSource != null) { + // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup + embeddedDataSource.setShutdownDatabase("shutdown"); + embeddedDataSource.getConnection(); + } + } catch (Exception ignored) { + } finally { + embeddedDataSource.setShutdownDatabase(null); + } + } - broker.setUseJmx(withJMX); + protected BrokerService createBroker() throws Exception { - EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); + embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); embeddedDataSource.setCreateDatabase("create"); + embeddedDataSource.getConnection().close(); //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch() // method that can be configured to throw a SQL exception on demand JDBCPersistenceAdapter 
jdbc = new TestJDBCPersistenceAdapter(); jdbc.setDataSource(embeddedDataSource); - testTransactionContext = new TestTransactionContext(jdbc); jdbc.setLockKeepAlivePeriod(1000l); LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); jdbc.setLocker(leaseDatabaseLocker); + broker = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); broker.setPersistenceAdapter(jdbc); broker.setIoExceptionHandler(new JDBCIOExceptionHandler()); @@ -90,15 +127,21 @@ public class AMQ4636Test extends TestCase { * Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the * message. SQLException should NOT be returned to client */ - + @Test public void testProducerWithDBShutdown() throws Exception { - broker = this.createBroker(false); - broker.deleteAllMessages(); - broker.start(); - broker.waitUntilStarted(); + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false); - LOG.info("***Broker started..."); + } + + @Test + public void testTransactedProducerCommitWithDBShutdown() throws Exception { // failover but timeout in 1 seconds so the test does not hang String failoverTransportURL = "failover:(" + transportUrl @@ -106,8 +149,24 @@ public class AMQ4636Test extends TestCase { this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); - this.sendMessage(MY_TEST_TOPIC, failoverTransportURL); + try { + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true); + fail("Expect rollback after failover - inddoubt commit"); + } catch 
(javax.jms.TransactionRolledBackException expectedInDoubt) { + LOG.info("Got rollback after failover failed commit", expectedInDoubt); + } + } + + @Test + public void testTransactedProducerRollbackWithDBShutdown() throws Exception { + + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false); } public void createDurableConsumer(String topic, @@ -135,7 +194,7 @@ public class AMQ4636Test extends TestCase { } } - public void sendMessage(String topic, String transportURL) + public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit) throws JMSException { Connection connection = null; @@ -145,8 +204,8 @@ public class AMQ4636Test extends TestCase { transportURL); connection = factory.createConnection(); - Session session = connection.createSession(false, - Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(transacted, + transacted ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(topic); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -155,9 +214,17 @@ public class AMQ4636Test extends TestCase { LOG.info("*** send message to broker..."); // trigger SQL exception in transactionContext - testTransactionContext.throwSQLException = true; + throwSQLException = new CountDownLatch(1); producer.send(m); + if (transacted) { + if (commit) { + session.commit(); + } else { + session.rollback(); + } + } + LOG.info("*** Finished send message to broker"); } finally { @@ -174,29 +241,27 @@ public class AMQ4636Test extends TestCase { public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { public TransactionContext getTransactionContext() throws IOException { - return testTransactionContext; + return new TestTransactionContext(this); } } public class TestTransactionContext extends TransactionContext { - public boolean throwSQLException; - public TestTransactionContext( JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException { super(jdbcPersistenceAdapter); } + @Override public void executeBatch() throws SQLException { - if (throwSQLException) { + if (throwSQLException.getCount() > 0) { // only throw exception once - throwSQLException = false; + throwSQLException.countDown(); throw new SQLException("TEST SQL EXCEPTION"); } super.executeBatch(); } - } } \ No newline at end of file