qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: QPIDJMS-271 Cache the transaction state for the active transaction
Date Thu, 09 Mar 2017 16:51:12 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 6e1c49d72 -> 253c8d34b


QPIDJMS-271 Cache the transaction state for the active transaction

Use a cached value for messages sent or acknowledged in a transaction. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/253c8d34
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/253c8d34
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/253c8d34

Branch: refs/heads/master
Commit: 253c8d34b33fce0e82b05869f0631e5fc2a90dc6
Parents: 6e1c49d
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Mar 9 11:50:42 2017 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Mar 9 11:50:42 2017 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java     | 12 ++++++------
 .../jms/provider/amqp/AmqpFixedProducer.java     |  6 +-----
 .../provider/amqp/AmqpTransactionContext.java    | 19 +++++++++++++++++++
 3 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/253c8d34/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index b682c73..99e9057 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -43,7 +43,6 @@ import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.slf4j.Logger;
@@ -294,10 +293,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
 
                     Binary txnId = session.getTransactionContext().getAmqpTransactionId();
                     if (txnId != null) {
-                        TransactionalState txState = new TransactionalState();
-                        txState.setOutcome(Accepted.getInstance());
-                        txState.setTxnId(txnId);
-                        delivery.disposition(txState);
+                        delivery.disposition(session.getTransactionContext().getTxnAcceptState());
                         delivery.settle();
                         session.getTransactionContext().registerTxConsumer(this);
                     }
@@ -586,6 +582,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
     protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
         int count;
 
+        // Attempt to preemptively size the buffer for the incoming delivery.
+        if (incomingBuffer.capacity() < incoming.available()) {
+            incomingBuffer.capacity(incoming.available());
+        }
+
         while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(),
incomingBuffer.writableBytes())) > 0) {
             incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
             if (!incomingBuffer.isWritable()) {
@@ -680,7 +681,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo,
Receiver
                 current.settle();
             }
         }
-
     }
 
     //----- Inner class used to report on deferred close ---------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/253c8d34/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 4297147..73ee264 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -33,7 +33,6 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
@@ -133,10 +132,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
         if (session.isTransacted()) {
             AmqpTransactionContext context = session.getTransactionContext();
-            Binary amqpTxId = context.getAmqpTransactionId();
-            TransactionalState state = new TransactionalState();
-            state.setTxnId(amqpTxId);
-            delivery.disposition(state);
+            delivery.disposition(context.getTxnEnrolledState());
             context.registerTxProducer(this);
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/253c8d34/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 1a43aef..1f012e2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -31,6 +31,8 @@ import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpTransactionCoordinatorBuilder;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +51,8 @@ public class AmqpTransactionContext implements AmqpResourceParent {
     private final Map<JmsProducerId, AmqpProducer> txProducers = new HashMap<>();
 
     private JmsTransactionId current;
+    private TransactionalState cachedAcceptedState;
+    private TransactionalState cachedTransactedState;
     private AmqpTransactionCoordinator coordinator;
 
     /**
@@ -73,12 +77,19 @@ public class AmqpTransactionContext implements AmqpResourceParent {
             @Override
             public void onSuccess() {
                 current = txId;
+                cachedAcceptedState = new TransactionalState();
+                cachedAcceptedState.setOutcome(Accepted.getInstance());
+                cachedAcceptedState.setTxnId(getAmqpTransactionId());
+                cachedTransactedState = new TransactionalState();
+                cachedTransactedState.setTxnId(getAmqpTransactionId());
                 request.onSuccess();
             }
 
             @Override
             public void onFailure(Throwable result) {
                 current = null;
+                cachedAcceptedState = null;
+                cachedTransactedState = null;
                 request.onFailure(result);
             }
 
@@ -178,6 +189,14 @@ public class AmqpTransactionContext implements AmqpResourceParent {
         return session;
     }
 
+    public TransactionalState getTxnAcceptState() {
+        return cachedAcceptedState;
+    }
+
+    public TransactionalState getTxnEnrolledState() {
+        return cachedTransactedState;
+    }
+
     public JmsTransactionId getTransactionId() {
         return current;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message