activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r995119 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/transaction/ test/java/org/apache/activemq/store/jdbc/
Date Wed, 08 Sep 2010 16:27:45 GMT
Author: gtully
Date: Wed Sep  8 16:27:44 2010
New Revision: 995119

URL: http://svn.apache.org/viewvc?rev=995119&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2880 - variant of patch applied with
test case, added one and two phase variants, context needed to deal with failure as did xa
transaction

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=995119&r1=995118&r2=995119&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Wed Sep  8 16:27:44 2010
@@ -520,6 +520,19 @@ public class TransactionContext implemen
             }
 
         } catch (JMSException e) {
+            LOG.warn("commit of: " + x + " failed with: " + e, e);
+            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
+            if (l != null && !l.isEmpty()) {
+                for (TransactionContext ctx : l) {
+                    try {
+                        ctx.afterRollback();
+                    } catch (Throwable ignored) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("failed to firing afterRollback callbacks commit failure,
txid: " + x + ", context: " + ctx, ignored);
+                        }
+                    }
+                }
+            }
             throw toXAException(e);
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=995119&r1=995118&r2=995119&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
Wed Sep  8 16:27:44 2010
@@ -63,21 +63,36 @@ public class XATransaction extends Trans
             checkForPreparedState(onePhase);
             doPrePrepare();
             setStateFinished();
-            transactionStore.commit(getTransactionId(), false, preCommitTask,postCommitTask);
-            waitPostCommitDone(postCommitTask);
+            storeCommit(getTransactionId(), false, preCommitTask, postCommitTask);
             break;
         case PREPARED_STATE:
             // 2 phase commit, work done.
             // We would record commit here.
             setStateFinished();
-            transactionStore.commit(getTransactionId(), true, preCommitTask,postCommitTask);
-            waitPostCommitDone(postCommitTask);
+            storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
             break;
         default:
             illegalStateTransition("commit");
         }
     }
 
+    private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable
postCommit)
+            throws XAException, IOException {
+        try {
+            transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask);
+            waitPostCommitDone(postCommitTask);
+        } catch (XAException xae) {
+            throw xae;
+        } catch (Throwable t) {
+            LOG.warn("Store COMMIT FAILED: ", t);
+            rollback();
+            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
+            xae.errorCode = XAException.XA_RBOTHER;
+            xae.initCause(t);
+            throw xae;
+        }
+    }
+
     private void illegalStateTransition(String callName) throws XAException {
         XAException xae = new XAException("Cannot call " + callName + " now.");
         xae.errorCode = XAException.XAER_PROTO;
@@ -131,6 +146,11 @@ public class XATransaction extends Trans
             transactionStore.rollback(getTransactionId());
             doPostRollback();
             break;
+        case FINISHED_STATE:
+            // failure to commit
+            transactionStore.rollback(getTransactionId());
+            doPostRollback();
+            break;
         default:
             throw new XAException("Invalid state");
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java?rev=995119&r1=995118&r2=995119&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
Wed Sep  8 16:27:44 2010
@@ -48,19 +48,30 @@ public class JDBCCommitExceptionTest ext
 
     private static final Log LOG = LogFactory.getLog(JDBCCommitExceptionTest.class);
 
-    private static final int messagesExpected = 10;
-    private ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+    protected static final int messagesExpected = 10;
+    protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
             "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);

-    private BrokerService broker;
-    private EmbeddedDataSource dataSource;
-    private java.sql.Connection dbConnection;
-    private MyPersistenceAdapter jdbc;
+    protected BrokerService broker;
+    protected EmbeddedDataSource dataSource;
+    protected java.sql.Connection dbConnection;
+    protected BrokenPersistenceAdapter jdbc;
 
-    public void testSqlException() throws Exception {
+
+    public void setUp() throws Exception {
         broker = createBroker();
         broker.start();
+    }
 
 
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    public void testSqlException() throws Exception {
+        doTestSqlException();
+    }
+
+    public void doTestSqlException() throws Exception {
         sendMessages(messagesExpected);
         int messagesReceived = receiveMessages(messagesExpected);
 
@@ -144,10 +155,9 @@ public class JDBCCommitExceptionTest ext
     }
 
     protected BrokerService createBroker() throws Exception {
-        Properties p = System.getProperties();
 
         BrokerService broker = new BrokerService();
-        jdbc = new MyPersistenceAdapter();
+        jdbc = new BrokenPersistenceAdapter();
 
         dataSource = new EmbeddedDataSource();
         dataSource.setDatabaseName("target/derbyDb");
@@ -164,27 +174,6 @@ public class JDBCCommitExceptionTest ext
         return broker;
     }
 
-    class MyPersistenceAdapter extends JDBCPersistenceAdapter {
-
-        private  final Log LOG = LogFactory.getLog(MyPersistenceAdapter.class);
-
-        private boolean shouldBreak = false;
-
-        @Override
-        public void commitTransaction(ConnectionContext context) throws IOException {
-            if ( shouldBreak ) {
-                LOG.warn("Throwing exception on purpose");
-                throw new IOException("Breaking on purpose");
-            }
-            LOG.debug("in commitTransaction");
-            super.commitTransaction(context);
-        }
-
-        public void setShouldBreak(boolean shouldBreak) {
-            this.shouldBreak = shouldBreak;
-        }
-    }
-
 }
 
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java?rev=995119&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
Wed Sep  8 16:27:44 2010
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.jdbc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+// https://issues.apache.org/activemq/browse/AMQ-2880
+public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
+    private static final Log LOG = LogFactory.getLog(JDBCXACommitExceptionTest.class);
+
+    private long txGenerator = System.currentTimeMillis();
+
+    protected ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(
+            "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);

+
+    boolean onePhase = true;
+
+    public void testTwoPhaseSqlException() throws Exception {
+        onePhase = false;
+        doTestSqlException();
+    }
+
+    @Override
+    protected int receiveMessages(int messagesExpected) throws Exception {
+        XAConnection connection = factory.createXAConnection();
+        connection.start();
+        XASession session = connection.createXASession();
+
+        jdbc.setShouldBreak(true);
+
+        // first try and receive these messages, they'll continually fail
+        receiveMessages(messagesExpected, session, onePhase);
+
+        jdbc.setShouldBreak(false);
+
+        // now that the store is sane, try and get all the messages sent
+        return receiveMessages(messagesExpected, session, onePhase);
+    }
+
+    protected int receiveMessages(int messagesExpected, XASession session, boolean onePhase)
throws Exception {
+        int messagesReceived = 0;
+
+        for (int i=0; i<messagesExpected; i++) {
+            Destination destination = session.createQueue("TEST");
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            XAResource resource = session.getXAResource();
+            resource.recover(XAResource.TMSTARTRSCAN);
+            resource.recover(XAResource.TMNOFLAGS);
+
+            Xid tid = createXid();
+
+            Message message = null;
+            try {
+                LOG.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected);
+                resource.start(tid, XAResource.TMNOFLAGS);
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                resource.end(tid, XAResource.TMSUCCESS);
+                if (message != null) {
+                    if (onePhase) {
+                        resource.commit(tid, true);
+                    } else {
+                        resource.prepare(tid);
+                        resource.commit(tid, false);
+                    }
+                    messagesReceived++;
+                }
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+
+                try {
+                    LOG.debug("Rolling back transaction (just in case, no need to do this
as it is implicit in a commit failure) " + tid);
+                    resource.rollback(tid);
+                }
+                catch (XAException ex) {
+                    try {
+                        LOG.debug("Caught exception during rollback: " + ex + " forgetting
transaction " + tid);
+                        resource.forget(tid);
+                    }
+                    catch (XAException ex1) {
+                        LOG.debug("rollback/forget failed: " + ex1.errorCode);
+                    }
+                }
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            }
+        }
+        return messagesReceived;
+    }
+
+    public Xid createXid() throws IOException {
+        
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+
+    }
+
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message