activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4426 - default to client ack, so that if a tx aborts we never auto ack or start a local transaction
Date Thu, 19 Sep 2013 15:27:52 GMT
Updated Branches:
  refs/heads/trunk f6f22df4f -> c38752221


https://issues.apache.org/jira/browse/AMQ-4426 - default to client ack, so that if a tx aborts
we never auto ack or start a local transaction


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c3875222
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c3875222
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c3875222

Branch: refs/heads/trunk
Commit: c387522217da4921d3274ff4f1ab93277dface6f
Parents: f6f22df
Author: gtully <gary.tully@gmail.com>
Authored: Thu Sep 19 16:26:07 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Sep 19 16:26:52 2013 +0100

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  2 +-
 .../org/apache/activemq/TransactionContext.java | 19 +++-
 .../apache/activemq/pool/XaConnectionPool.java  |  5 +-
 .../activemq/pool/XAConnectionPoolTest.java     | 98 ++++++++++++++++++++
 4 files changed, 116 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c3875222/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index aee663a..baf5233 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -820,7 +820,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                 for (MessageDispatch old : list) {
                     // ensure we don't filter this as a duplicate
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("on close, rollback: " + old.getMessage().getMessageId());
+                        LOG.debug("on close, rollback duplicate: " + old.getMessage().getMessageId());
                     }
                     session.connection.rollbackDuplicate(this, old.getMessage());
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/c3875222/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index b16fcba..7ff8702 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -398,8 +398,9 @@ public class TransactionContext implements XAResource {
                 beforeEnd();
             } catch (JMSException e) {
                 throw toXAException(e);
+            } finally {
+                setXid(null);
             }
-            setXid(null);
         } else if ((flags & TMSUCCESS) == TMSUCCESS) {
             // set to null if this is the current xid.
             // otherwise this could be an asynchronous success call
@@ -408,8 +409,9 @@ public class TransactionContext implements XAResource {
                     beforeEnd();
                 } catch (JMSException e) {
                     throw toXAException(e);
+                } finally {
+                    setXid(null);
                 }
-                setXid(null);
             }
         } else {
             throw new XAException(XAException.XAER_INVAL);
@@ -684,6 +686,7 @@ public class TransactionContext implements XAResource {
             this.connection.checkClosedOrFailed();
             this.connection.ensureConnectionInfoSent();
         } catch (JMSException e) {
+            disassociate();
             throw toXAException(e);
         }
 
@@ -699,6 +702,7 @@ public class TransactionContext implements XAResource {
                     LOG.debug("Started XA transaction: " + transactionId);
                 }
             } catch (JMSException e) {
+                disassociate();
                 throw toXAException(e);
             }
 
@@ -712,6 +716,7 @@ public class TransactionContext implements XAResource {
                         LOG.debug("Ended XA transaction: " + transactionId);
                     }
                 } catch (JMSException e) {
+                    disassociate();
                     throw toXAException(e);
                 }
 
@@ -729,12 +734,16 @@ public class TransactionContext implements XAResource {
 	        	}
             }
 
-            // dis-associate
-            associatedXid = null;
-            transactionId = null;
+            disassociate();
         }
     }
 
+    private void disassociate() {
+         // dis-associate
+         associatedXid = null;
+         transactionId = null;
+    }
+
     /**
      * Sends the given command. Also sends the command in case of interruption,
      * so that important commands like rollback and commit are never interrupted.

http://git-wip-us.apache.org/repos/asf/activemq/blob/c3875222/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
index d73dccd..e0699fe 100644
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
+++ b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
@@ -46,8 +46,9 @@ public class XaConnectionPool extends ConnectionPool {
         try {
             boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
             if (isXa) {
-                transacted = true;
-                ackMode = Session.SESSION_TRANSACTED;
+                // if the xa tx aborts inflight we don't want to auto create a local transaction or auto ack
+                transacted = false;
+                ackMode = Session.CLIENT_ACKNOWLEDGE;
             } else if (transactionManager != null) {
                 // cmt or transactionManager managed
                 transacted = false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/c3875222/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
index aef297b..56c4f60 100644
--- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
+++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
@@ -151,6 +151,104 @@ public class XAConnectionPoolTest extends TestSupport {
         connection.close();
     }
 
+    public void testAckModeOfPoolNonXAWithTM() throws Exception {
+        final Vector<Synchronization> syncs = new Vector<Synchronization>();
+        ActiveMQTopic topic = new ActiveMQTopic("test");
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+
+        // simple TM that is in a tx and will track syncs
+        pcf.setTransactionManager(new TransactionManager(){
+            @Override
+            public void begin() throws NotSupportedException, SystemException {
+            }
+
+            @Override
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+            }
+
+            @Override
+            public int getStatus() throws SystemException {
+                return Status.STATUS_ACTIVE;
+            }
+
+            @Override
+            public Transaction getTransaction() throws SystemException {
+                return new Transaction() {
+                    @Override
+                    public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
+                    }
+
+                    @Override
+                    public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
+                        return false;
+                    }
+
+                    @Override
+                    public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
+                        return false;
+                    }
+
+                    @Override
+                    public int getStatus() throws SystemException {
+                        return 0;
+                    }
+
+                    @Override
+                    public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
+                        syncs.add(synch);
+                    }
+
+                    @Override
+                    public void rollback() throws IllegalStateException, SystemException {
+                    }
+
+                    @Override
+                    public void setRollbackOnly() throws IllegalStateException, SystemException {
+                    }
+                };
+
+            }
+
+            @Override
+            public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
+            }
+
+            @Override
+            public void rollback() throws IllegalStateException, SecurityException, SystemException {
+            }
+
+            @Override
+            public void setRollbackOnly() throws IllegalStateException, SystemException {
+            }
+
+            @Override
+            public void setTransactionTimeout(int seconds) throws SystemException {
+            }
+
+            @Override
+            public Transaction suspend() throws SystemException {
+                return null;
+            }
+        });
+
+        TopicConnection connection = (TopicConnection) pcf.createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        assertEquals("client ack is enforce", Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode());
+        TopicPublisher publisher = session.createPublisher(topic);
+        publisher.publish(session.createMessage());
+
+        // simulate a commit
+        for (Synchronization sync : syncs) {
+            sync.beforeCompletion();
+        }
+        for (Synchronization sync : syncs) {
+            sync.afterCompletion(1);
+        }
+        connection.close();
+    }
+
     public void testInstanceOf() throws  Exception {
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
         assertTrue(pcf instanceof QueueConnectionFactory);


Mime
View raw message