activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [02/18] git commit: https://issues.apache.org/jira/browse/AMQ-5031 - add connection factory rmIdFromConnectionId attribute to allow resource manager identity to be mapped to the connection id, thus overriding the default broker identity association. usefu
Date Thu, 20 Mar 2014 18:43:58 GMT
https://issues.apache.org/jira/browse/AMQ-5031 - add connection factory rmIdFromConnectionId
attribute to allow resource manager identity to be mapped to the connection id, thus overriding
the default broker identity association. Useful if a TM does not end a start with the join
flags. We may want to implement join but it is not trivial to determine the existing associated
state and clear it.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b87a014
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b87a014
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b87a014

Branch: refs/heads/activemq-5.9
Commit: 8b87a01494c42a7f3458dd25350888254ce06670
Parents: b50080f
Author: gtully <gary.tully@gmail.com>
Authored: Thu Feb 6 15:17:13 2014 +0000
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Thu Mar 20 13:52:37 2014 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnection.java | 12 ++++++
 .../activemq/ActiveMQConnectionFactory.java     | 19 +++++++++-
 .../activemq/ra/LocalAndXATransaction.java      | 21 ++++++----
 .../ActiveMQXAConnectionFactoryTest.java        | 40 ++++++++++++++++++++
 4 files changed, 84 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8b87a014/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index a722e0b..d5f1c17 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -205,6 +205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
     private boolean messagePrioritySupported = true;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
+    private boolean rmIdFromConnectionId = false;
 
     private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
     private RejectedExecutionHandler rejectedTaskHandler = null;
@@ -1654,6 +1655,9 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
      * @throws JMSException
      */
     public String getResourceManagerId() throws JMSException {
+        if (isRmIdFromConnectionId()) {
+            return info.getConnectionId().getValue();
+        }
         waitForBrokerInfo();
         if (brokerInfo == null) {
             throw new JMSException("Connection failed before Broker info was received.");
@@ -2590,6 +2594,14 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
         this.nonBlockingRedelivery = nonBlockingRedelivery;
     }
 
+    public boolean isRmIdFromConnectionId() {
+        return rmIdFromConnectionId;
+    }
+
+    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
+        this.rmIdFromConnectionId = rmIdFromConnectionId;
+    }
+
     /**
      * Removes any TempDestinations that this connection has cached, ignoring
      * any exceptions generated because the destination is in use as they should

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b87a014/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 8495d84..35d4a69 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -179,6 +179,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
     private TaskRunnerFactory sessionTaskRunner;
     private RejectedExecutionHandler rejectedTaskHandler = null;
     protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection
in sub class
+    private boolean rmIdFromConnectionId = false;
 
     // /////////////////////////////////////////////
     //
@@ -221,7 +222,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
         }
     }
 
-    /**
+    /*boolean*
      * @param brokerURL
      * @return
      * @throws URISyntaxException
@@ -401,6 +402,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
         connection.setSessionTaskRunner(getSessionTaskRunner());
         connection.setRejectedTaskHandler(getRejectedTaskHandler());
         connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
+        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -821,6 +823,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
         props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
         props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
         props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
+        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
     }
 
     public boolean isUseCompression() {
@@ -1205,4 +1208,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
     public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval)
{
         this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
     }
+
+
+    public boolean isRmIdFromConnectionId() {
+        return rmIdFromConnectionId;
+    }
+
+    /**
+     * uses the connection id as the resource identity for XAResource.isSameRM
+     * ensuring join will only occur on a single connection
+     */
+    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
+        this.rmIdFromConnectionId = rmIdFromConnectionId;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b87a014/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
index e53cd5c..0f27393 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
@@ -24,11 +24,14 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.apache.activemq.TransactionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Used to provide a LocalTransaction and XAResource to a JMS session.
  */
 public class LocalAndXATransaction implements XAResource, LocalTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class);
 
     private final TransactionContext transactionContext;
     private boolean inManagedTx;
@@ -86,6 +89,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction
{
     }
 
     public void end(Xid arg0, int arg1) throws XAException {
+        LOG.debug("{} end {} with {}", new Object[]{this, arg0, arg1});
         try {
             transactionContext.end(arg0, arg1);
         } finally {
@@ -106,14 +110,16 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction
{
     }
 
     public boolean isSameRM(XAResource xaresource) throws XAException {
-        if (xaresource == null) {
-            return false;
-        }
-        // Do we have to unwrap?
-        if (xaresource instanceof LocalAndXATransaction) {
-            xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
+        boolean isSame = false;
+        if (xaresource != null) {
+            // Do we have to unwrap?
+            if (xaresource instanceof LocalAndXATransaction) {
+                xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
+            }
+            isSame = transactionContext.isSameRM(xaresource);
         }
-        return transactionContext.isSameRM(xaresource);
+        LOG.trace("{} isSameRM({}) = {}", new Object[]{this, xaresource, isSame});
+        return isSame;
     }
 
     public int prepare(Xid arg0) throws XAException {
@@ -133,6 +139,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction
{
     }
 
     public void start(Xid arg0, int arg1) throws XAException {
+        LOG.trace("{} start {} with {}", new Object[]{this, arg0, arg1});
         transactionContext.start(arg0, arg1);
         try {
             setInManagedTx(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b87a014/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
index 7da3035..5fd6e5c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
@@ -185,6 +185,46 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport
{
         }
     }
 
+    public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException
{
+
+        XAConnection connection1 = null;
+        XAConnection connection2 = null;
+        try {
+            ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true");
+            connection1 = (XAConnection)cf1.createConnection();
+            XASession session1 = connection1.createXASession();
+            XAResource resource1 = session1.getXAResource();
+
+            ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+            connection2 = (XAConnection)cf2.createConnection();
+            XASession session2 = connection2.createXASession();
+            XAResource resource2 = session2.getXAResource();
+
+            assertFalse(resource1.isSameRM(resource2));
+
+            // ensure identity is preserved
+            XASession session1a = connection1.createXASession();
+            assertTrue(resource1.isSameRM(session1a.getXAResource()));
+            session1.close();
+            session2.close();
+        } finally {
+            if (connection1 != null) {
+                try {
+                    connection1.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            if (connection2 != null) {
+                try {
+                    connection2.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
     public void testVanilaTransactionalProduceReceive() throws Exception {
 
         XAConnection connection1 = null;


Mime
View raw message