activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4636 - tidy up commit failure case to redirect via IOExceptionHandler - failover still suppressed the commit on recovery - resulting in rollback exception the client due to indoubt commit
Date Wed, 26 Mar 2014 16:20:33 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 75eb814ca -> 7a0168a4f


https://issues.apache.org/jira/browse/AMQ-4636 - tidy up commit failure case to redirect via
IOExceptionHandler - failover still suppressed the commit on recovery - resulting in rollback
exception the client due to indoubt commit


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7a0168a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a0168a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a0168a4

Branch: refs/heads/trunk
Commit: 7a0168a4f5368183d912c3e94bfe98bb59cb9a74
Parents: 75eb814
Author: gtully <gary.tully@gmail.com>
Authored: Wed Mar 26 16:19:46 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Mar 26 16:19:46 2014 +0000

----------------------------------------------------------------------
 .../activemq/store/jdbc/TransactionContext.java |  41 ++++---
 .../org/apache/activemq/bugs/AMQ4636Test.java   | 117 ++++++++++++++-----
 2 files changed, 114 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7a0168a4/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index 6a933b0..8b4ac97 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -177,10 +177,12 @@ public class TransactionContext {
             }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("Commit failed: ", e);
-            
-            this.rollback(); 
-            
-            throw IOExceptionSupport.create(e);
+            try {
+                doRollback();
+            } catch (Exception ignored) {}
+            IOException ioe = IOExceptionSupport.create(e);
+            persistenceAdapter.getBrokerService().handleIOException(ioe);
+            throw ioe;
         } finally {
             inTx = false;
             close();
@@ -192,20 +194,7 @@ public class TransactionContext {
             throw new IOException("Not started.");
         }
         try {
-            if (addMessageStatement != null) {
-                addMessageStatement.close();
-                addMessageStatement = null;
-            }
-            if (removedMessageStatement != null) {
-                removedMessageStatement.close();
-                removedMessageStatement = null;
-            }
-            if (updateLastAckStatement != null) {
-                updateLastAckStatement.close();
-                updateLastAckStatement = null;
-            }
-            connection.rollback();
-
+            doRollback();
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("Rollback failed: ", e);
             throw IOExceptionSupport.create(e);
@@ -215,6 +204,22 @@ public class TransactionContext {
         }
     }
 
+    private void doRollback() throws SQLException {
+        if (addMessageStatement != null) {
+            addMessageStatement.close();
+            addMessageStatement = null;
+        }
+        if (removedMessageStatement != null) {
+            removedMessageStatement.close();
+            removedMessageStatement = null;
+        }
+        if (updateLastAckStatement != null) {
+            updateLastAckStatement.close();
+            updateLastAckStatement = null;
+        }
+        connection.rollback();
+    }
+
     public PreparedStatement getAddMessageStatement() {
         return addMessageStatement;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7a0168a4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
index 419bfed..4373d49 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
@@ -18,6 +18,7 @@ package org.apache.activemq.bugs;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.concurrent.CountDownLatch;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -27,9 +28,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
-import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
 import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
@@ -37,8 +39,12 @@ import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
 import org.apache.activemq.store.jdbc.TransactionContext;
 import org.apache.activemq.util.IOHelper;
 import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.junit.Assert.fail;
 
 /**
  * Testing how the broker reacts when a SQL Exception is thrown from
@@ -46,35 +52,66 @@ import org.slf4j.LoggerFactory;
  * <p/>
  * see https://issues.apache.org/jira/browse/AMQ-4636
  */
-
-public class AMQ4636Test extends TestCase {
+public class AMQ4636Test {
 
     private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
     private static final Logger LOG = LoggerFactory
             .getLogger(AMQ4636Test.class);
     private String transportUrl = "tcp://0.0.0.0:0";
     private BrokerService broker;
-    private TestTransactionContext testTransactionContext;
+    EmbeddedDataSource embeddedDataSource;
+    CountDownLatch throwSQLException = new CountDownLatch(0);
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = createBroker();
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+        LOG.info("Broker started...");
+    }
 
-    protected BrokerService createBroker(boolean withJMX) throws Exception {
-        BrokerService broker = new BrokerService();
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            LOG.info("Stopping broker...");
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        try {
+            if (embeddedDataSource != null) {
+                // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup
+                embeddedDataSource.setShutdownDatabase("shutdown");
+                embeddedDataSource.getConnection();
+            }
+        } catch (Exception ignored) {
+        } finally {
+            embeddedDataSource.setShutdownDatabase(null);
+        }
+    }
 
-        broker.setUseJmx(withJMX);
+    protected BrokerService createBroker() throws Exception {
 
-        EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+        embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
         embeddedDataSource.setCreateDatabase("create");
+        embeddedDataSource.getConnection().close();
 
         //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
         // method that can be configured to throw a SQL exception on demand
         JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
         jdbc.setDataSource(embeddedDataSource);
-        testTransactionContext = new TestTransactionContext(jdbc);
 
         jdbc.setLockKeepAlivePeriod(1000l);
         LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
         leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
         jdbc.setLocker(leaseDatabaseLocker);
 
+        broker = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
         broker.setPersistenceAdapter(jdbc);
 
         broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
@@ -90,15 +127,21 @@ public class AMQ4636Test extends TestCase {
      * Expectation: SQLException triggers a connection shutdown and failover should kick
and try to redeliver the
      * message. SQLException should NOT be returned to client
      */
-
+    @Test
     public void testProducerWithDBShutdown() throws Exception {
 
-        broker = this.createBroker(false);
-        broker.deleteAllMessages();
-        broker.start();
-        broker.waitUntilStarted();
+        // failover but timeout in 1 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=1000";
+
+        this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
+
+        this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false);
 
-        LOG.info("***Broker started...");
+    }
+
+    @Test
+    public void testTransactedProducerCommitWithDBShutdown() throws Exception {
 
         // failover but timeout in 1 seconds so the test does not hang
         String failoverTransportURL = "failover:(" + transportUrl
@@ -106,8 +149,24 @@ public class AMQ4636Test extends TestCase {
 
         this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
 
-        this.sendMessage(MY_TEST_TOPIC, failoverTransportURL);
+        try {
+            this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true);
+            fail("Expect rollback after failover - inddoubt commit");
+        } catch (javax.jms.TransactionRolledBackException expectedInDoubt) {
+            LOG.info("Got rollback after failover failed commit", expectedInDoubt);
+        }
+    }
+
+    @Test
+    public void testTransactedProducerRollbackWithDBShutdown() throws Exception {
+
+        // failover but timeout in 1 seconds so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=1000";
+
+        this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
 
+        this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false);
     }
 
     public void createDurableConsumer(String topic,
@@ -135,7 +194,7 @@ public class AMQ4636Test extends TestCase {
         }
     }
 
-    public void sendMessage(String topic, String transportURL)
+    public void sendMessage(String topic, String transportURL, boolean transacted, boolean
commit)
             throws JMSException {
         Connection connection = null;
 
@@ -145,8 +204,8 @@ public class AMQ4636Test extends TestCase {
                     transportURL);
 
             connection = factory.createConnection();
-            Session session = connection.createSession(false,
-                    Session.AUTO_ACKNOWLEDGE);
+            Session session = connection.createSession(transacted,
+                    transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
             Destination destination = session.createTopic(topic);
             MessageProducer producer = session.createProducer(destination);
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -155,9 +214,17 @@ public class AMQ4636Test extends TestCase {
             LOG.info("*** send message to broker...");
 
             // trigger SQL exception in transactionContext
-            testTransactionContext.throwSQLException = true;
+            throwSQLException = new CountDownLatch(1);
             producer.send(m);
 
+            if (transacted) {
+                if (commit) {
+                    session.commit();
+                } else {
+                    session.rollback();
+                }
+            }
+
             LOG.info("*** Finished send message to broker");
 
         } finally {
@@ -174,29 +241,27 @@ public class AMQ4636Test extends TestCase {
     public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
 
         public TransactionContext getTransactionContext() throws IOException {
-            return testTransactionContext;
+            return new TestTransactionContext(this);
         }
     }
 
     public class TestTransactionContext extends TransactionContext {
 
-        public boolean throwSQLException;
-
         public TestTransactionContext(
                 JDBCPersistenceAdapter jdbcPersistenceAdapter)
                 throws IOException {
             super(jdbcPersistenceAdapter);
         }
 
+        @Override
         public void executeBatch() throws SQLException {
-            if (throwSQLException) {
+            if (throwSQLException.getCount() > 0) {
                 // only throw exception once
-                throwSQLException = false;
+                throwSQLException.countDown();
                 throw new SQLException("TEST SQL EXCEPTION");
             }
             super.executeBatch();
         }
-
     }
 
 }
\ No newline at end of file


Mime
View raw message