activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5847
Date Tue, 16 Jun 2015 20:41:32 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 7b5c8be37 -> ed266835b


https://issues.apache.org/jira/browse/AMQ-5847

Add workarounds to allow transactional (TX) work to take place in multiple
sessions on the same connection. Future work is needed to properly support
the TXN capabilities defined in the AMQP spec and to check for violations
of expected behavior.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ed266835
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ed266835
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ed266835

Branch: refs/heads/master
Commit: ed266835b5aabfcb05e382f3056353a72347f158
Parents: 7b5c8be
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jun 16 16:41:18 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jun 16 16:41:18 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java | 20 ++++++++
 .../transport/amqp/protocol/AmqpReceiver.java   |  8 +--
 .../transport/amqp/protocol/AmqpSender.java     | 16 +++---
 .../transport/amqp/protocol/AmqpSession.java    | 13 +++++
 .../protocol/AmqpTransactionCoordinator.java    | 51 ++++++++++++--------
 .../amqp/JMSClientTransactionTest.java          | 32 ++++++++++++
 6 files changed, 108 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ed266835/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 577fcad..c04a61f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -60,11 +60,13 @@ import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.transport.InactivityIOException;
 import org.apache.activemq.transport.amqp.AmqpHeader;
 import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
@@ -142,10 +144,12 @@ public class AmqpConnection implements AmqpProtocolConverter {
     private final ConnectionInfo connectionInfo = new ConnectionInfo();
     private long nextSessionId;
     private long nextTempDestinationId;
+    private long nextTransactionId;
     private boolean closing;
     private boolean closedSocket;
     private AmqpAuthenticator authenticator;
 
+    private final Map<TransactionId, AmqpTransactionCoordinator> transactions = new
HashMap<TransactionId, AmqpTransactionCoordinator>();
     private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer,
ResponseHandler>();
     private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId =
new ConcurrentHashMap<ConsumerId, AmqpSender>();
 
@@ -667,6 +671,22 @@ public class AmqpConnection implements AmqpProtocolConverter {
         subscriptionsByConsumerId.remove(consumerId);
     }
 
+    void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator)
{
+        transactions.put(txId, coordinator);
+    }
+
+    void unregisterTransaction(TransactionId txId) {
+        transactions.remove(txId);
+    }
+
+    AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) {
+        return transactions.get(txId);
+    }
+
+    LocalTransactionId getNextTransactionId() {
+        return new LocalTransactionId(getConnectionId(), ++nextTransactionId);
+    }
+
     ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException
{
         ConsumerInfo result = null;
         RegionBroker regionBroker;

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed266835/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 6e52fec..e62ad04 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -31,6 +31,7 @@ import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.ResponseHandler;
 import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
@@ -205,9 +206,10 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
 
             final DeliveryState remoteState = delivery.getRemoteState();
             if (remoteState != null && remoteState instanceof TransactionalState)
{
-                TransactionalState s = (TransactionalState) remoteState;
-                long txid = toLong(s.getTxnId());
-                message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(),
txid));
+                TransactionalState txState = (TransactionalState) remoteState;
+                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(),
toLong(txState.getTxnId()));
+                session.enlist(txId);
+                message.setTransactionId(txId);
             }
 
             message.onSend();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed266835/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 1dd99d2..4cbf744 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -34,6 +34,7 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.ResponseHandler;
 import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
@@ -447,14 +448,13 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
             DeliveryState remoteState = delivery.getRemoteState();
             if (remoteState != null && remoteState instanceof TransactionalState)
{
-                TransactionalState s = (TransactionalState) remoteState;
-                long txid = toLong(s.getTxnId());
-                LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(),
txid);
-                ack.setTransactionId(localTxId);
-
-                // Store the message sent in this TX we might need to
-                // re-send on rollback
-                md.getMessage().setTransactionId(localTxId);
+                TransactionalState txState = (TransactionalState) remoteState;
+                TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(),
toLong(txState.getTxnId()));
+                ack.setTransactionId(txId);
+
+                // Store the message sent in this TX we might need to re-send on rollback
+                session.enlist(txId);
+                md.getMessage().setTransactionId(txId);
                 dispatchedInTx.addFirst(md);
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed266835/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index ca3a90f..20a8b9f 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -41,6 +41,7 @@ import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.AmqpProtocolException;
@@ -72,6 +73,7 @@ public class AmqpSession implements AmqpResource {
     private final Session protonSession;
     private final SessionId sessionId;
 
+    private boolean enlisted;
     private long nextProducerId = 0;
     private long nextConsumerId = 0;
 
@@ -122,6 +124,8 @@ public class AmqpSession implements AmqpResource {
         for (AmqpSender consumer : consumers.values()) {
             consumer.commit();
         }
+
+        enlisted = false;
     }
 
     /**
@@ -133,6 +137,8 @@ public class AmqpSession implements AmqpResource {
         for (AmqpSender consumer : consumers.values()) {
             consumer.rollback();
         }
+
+        enlisted = false;
     }
 
     /**
@@ -367,6 +373,13 @@ public class AmqpSession implements AmqpResource {
         connection.unregisterSender(consumerId);
     }
 
+    public void enlist(TransactionId txId) {
+        if (!enlisted) {
+            connection.getTxCoordinator(txId).enlist(this);
+            enlisted = true;
+        }
+    }
+
     //----- Configuration accessors ------------------------------------------//
 
     public AmqpConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed266835/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
index 576ce20..40bcda5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -20,6 +20,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
 import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionId;
@@ -54,7 +56,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
 
-    private long nextTransactionId;
+    private final Set<AmqpSession> txSessions = new HashSet<AmqpSession>();
 
     /**
      * Creates a new Transaction coordinator used to manage AMQP transactions.
@@ -82,7 +84,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
         }
 
         final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
-        ConnectionId connectionId = session.getConnection().getConnectionId();
+        final ConnectionId connectionId = session.getConnection().getConnectionId();
         final Object action = ((AmqpValue) message.getBody()).getValue();
 
         LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
@@ -92,35 +94,41 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
                 throw new Exception("don't know how to handle a declare /w a set GlobalId");
             }
 
-            long txid = getNextTransactionId();
-            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId,
txid), TransactionInfo.BEGIN);
-            sendToActiveMQ(txinfo, null);
-            LOG.trace("started transaction {}", txid);
+            LocalTransactionId txId = session.getConnection().getNextTransactionId();
+            TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
+            session.getConnection().registerTransaction(txId, this);
+            sendToActiveMQ(txInfo, null);
+            LOG.trace("started transaction {}", txId.getValue());
 
             Declared declared = new Declared();
-            declared.setTxnId(new Binary(toBytes(txid)));
+            declared.setTxnId(new Binary(toBytes(txId.getValue())));
             delivery.disposition(declared);
             delivery.settle();
         } else if (action instanceof Discharge) {
-            Discharge discharge = (Discharge) action;
-            long txid = toLong(discharge.getTxnId());
-
+            final Discharge discharge = (Discharge) action;
+            final LocalTransactionId txId = new LocalTransactionId(connectionId, toLong(discharge.getTxnId()));
             final byte operation;
+
             if (discharge.getFail()) {
-                LOG.trace("rollback transaction {}", txid);
+                LOG.trace("rollback transaction {}", txId.getValue());
                 operation = TransactionInfo.ROLLBACK;
             } else {
-                LOG.trace("commit transaction {}", txid);
+                LOG.trace("commit transaction {}", txId.getValue());
                 operation = TransactionInfo.COMMIT_ONE_PHASE;
             }
 
-            if (operation == TransactionInfo.ROLLBACK) {
-                session.rollback();
-            } else {
-                session.commit();
+            for (AmqpSession txSession : txSessions) {
+                if (operation == TransactionInfo.ROLLBACK) {
+                    txSession.rollback();
+                } else {
+                    txSession.commit();
+                }
             }
 
-            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId,
txid), operation);
+            txSessions.clear();
+            session.getConnection().unregisterTransaction(txId);
+
+            TransactionInfo txinfo = new TransactionInfo(connectionId, txId, operation);
             sendToActiveMQ(txinfo, new ResponseHandler() {
                 @Override
                 public void onResponse(AmqpProtocolConverter converter, Response response)
throws IOException {
@@ -132,6 +140,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
                     } else {
                         delivery.disposition(Accepted.getInstance());
                     }
+
                     LOG.debug("TX: {} settling {}", operation, action);
                     delivery.settle();
                     session.pumpProtonToSocket();
@@ -157,10 +166,6 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver
{
         }
     }
 
-    private long getNextTransactionId() {
-        return ++nextTransactionId;
-    }
-
     @Override
     public ActiveMQDestination getDestination() {
         return null;
@@ -169,4 +174,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
     @Override
     public void setDestination(ActiveMQDestination destination) {
     }
+
+    public void enlist(AmqpSession session) {
+        txSessions.add(session);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ed266835/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 508638e..560edda 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -44,6 +44,38 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
     private final int MSG_COUNT = 1000;
 
     @Test(timeout = 60000)
+    public void testProduceOneConsumeOneInTx() throws Exception {
+        connection = createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination queue = session.createQueue(getTestName());
+        MessageProducer messageProducer = session.createProducer(queue);
+
+        messageProducer.send(session.createMessage());
+        session.rollback();
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(0, queueView.getQueueSize());
+
+        messageProducer.send(session.createMessage());
+        session.commit();
+
+        assertEquals(1, queueView.getQueueSize());
+
+        MessageConsumer messageConsumer = session.createConsumer(queue);
+        assertNotNull(messageConsumer.receive(5000));
+        session.rollback();
+
+        assertEquals(1, queueView.getQueueSize());
+
+        assertNotNull(messageConsumer.receive(5000));
+        session.commit();
+
+        assertEquals(0, queueView.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
     public void testSingleConsumedMessagePerTxCase() throws Exception {
         connection = createConnection();
         connection.start();


Mime
View raw message