activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: AMQ-5920 - make using a vt transaction configurable, a transaction negates concurrentstoreanddispatch and imposes local 2pc on mKahadb so needs to be off by default
Date Mon, 17 Aug 2015 21:15:46 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x 6668f7b14 -> 4c343ebd1


AMQ-5920 - make using a vt transaction configurable, a transaction negates concurrentstoreanddispatch
and imposes local 2pc on mKahadb so needs to be off by default

(cherry picked from commit ffdaeb2bd19b299613de7aebc809079bc2cd4416)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4c343ebd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4c343ebd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4c343ebd

Branch: refs/heads/activemq-5.12.x
Commit: 4c343ebd1246c6148392b9ab88700af5c5f87391
Parents: 6668f7b
Author: gtully <gary.tully@gmail.com>
Authored: Mon Aug 17 15:35:36 2015 +0100
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Aug 17 17:06:35 2015 -0400

----------------------------------------------------------------------
 .../activemq/broker/region/virtual/VirtualTopic.java   | 13 +++++++++++++
 .../broker/region/virtual/VirtualTopicInterceptor.java |  5 ++++-
 .../broker/virtual/VirtualTopicFanoutPerfTest.java     |  1 +
 3 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4c343ebd/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
index 95fa333..14ea3fe 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
@@ -43,6 +43,7 @@ public class VirtualTopic implements VirtualDestination {
     private boolean selectorAware = false;
     private boolean local = false;
     private boolean concurrentSend = false;
+    private boolean transactedSend = false;
 
     @Override
     public ActiveMQDestination getVirtualDestination() {
@@ -181,4 +182,16 @@ public class VirtualTopic implements VirtualDestination {
     public void setConcurrentSend(boolean concurrentSend) {
         this.concurrentSend = concurrentSend;
     }
+
+    public boolean isTransactedSend() {
+        return transactedSend;
+    }
+
+    /**
+     * When true, dispatch to matching destinations always uses a transaction.
+     * @param transactedSend
+     */
+    public void setTransactedSend(boolean transactedSend) {
+        this.transactedSend = transactedSend;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4c343ebd/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index 7967562..36c08e0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -43,6 +43,8 @@ public class VirtualTopicInterceptor extends DestinationFilter {
     private final String postfix;
     private final boolean local;
     private final boolean concurrentSend;
+    private final boolean transactedSend;
+
     private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
 
     public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
@@ -51,6 +53,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
         this.postfix = virtualTopic.getPostfix();
         this.local = virtualTopic.isLocal();
         this.concurrentSend = virtualTopic.isConcurrentSend();
+        this.transactedSend = virtualTopic.isTransactedSend();
     }
 
     public Topic getTopic() {
@@ -120,7 +123,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
 
     private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
         LocalTransactionId result = null;
-        if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
+        if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
             result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
             connectionContext.getBroker().beginTransaction(connectionContext, result);
             connectionContext.setTransaction(connectionContext.getTransactions().get(result));

http://git-wip-us.apache.org/repos/asf/activemq/blob/4c343ebd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
index 90cdeea..4ba82eb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java
@@ -55,6 +55,7 @@ public class VirtualTopicFanoutPerfTest {
                 for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) {
                     if (virtualDestination instanceof VirtualTopic) {
                         ((VirtualTopic) virtualDestination).setConcurrentSend(true);
+                        ((VirtualTopic) virtualDestination).setTransactedSend(true);
                 }
             }
         }


Mime
View raw message