activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5031 - add connection factory rmIdFromConnectionId attribute to allow resource manager identity to be mapped to the connection id, thus overriding the default broker identity association. useful if a
Date Thu, 06 Feb 2014 15:19:06 GMT
Updated Branches:
  refs/heads/trunk 117f86e10 -> 221a751d0


https://issues.apache.org/jira/browse/AMQ-5031 - add connection factory rmIdFromConnectionId
attribute to allow resource manager identity to be mapped to the connection id, thus overriding
the default broker identity association. Useful if a TM does not end a start with the join
flags. We may want to implement join, but it is not trivial to determine the existing associated
state and clear it.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/221a751d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/221a751d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/221a751d

Branch: refs/heads/trunk
Commit: 221a751d0a67e8d1f240d30a2f56c71a31d7627c
Parents: 117f86e
Author: gtully <gary.tully@gmail.com>
Authored: Thu Feb 6 15:17:13 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Feb 6 15:18:30 2014 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQConnection.java | 12 ++++++
 .../activemq/ActiveMQConnectionFactory.java     | 19 +++++++++-
 .../activemq/ra/LocalAndXATransaction.java      | 21 ++++++----
 .../ActiveMQXAConnectionFactoryTest.java        | 40 ++++++++++++++++++++
 4 files changed, 84 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/221a751d/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index a722e0b..d5f1c17 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -205,6 +205,7 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
     private boolean messagePrioritySupported = true;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
+    private boolean rmIdFromConnectionId = false;
 
     private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
     private RejectedExecutionHandler rejectedTaskHandler = null;
@@ -1654,6 +1655,9 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
      * @throws JMSException
      */
     public String getResourceManagerId() throws JMSException {
+        if (isRmIdFromConnectionId()) {
+            return info.getConnectionId().getValue();
+        }
         waitForBrokerInfo();
         if (brokerInfo == null) {
             throw new JMSException("Connection failed before Broker info was received.");
@@ -2590,6 +2594,14 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
         this.nonBlockingRedelivery = nonBlockingRedelivery;
     }
 
+    public boolean isRmIdFromConnectionId() {
+        return rmIdFromConnectionId;
+    }
+
+    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
+        this.rmIdFromConnectionId = rmIdFromConnectionId;
+    }
+
     /**
      * Removes any TempDestinations that this connection has cached, ignoring
      * any exceptions generated because the destination is in use as they should

http://git-wip-us.apache.org/repos/asf/activemq/blob/221a751d/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 8495d84..35d4a69 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -179,6 +179,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
     private TaskRunnerFactory sessionTaskRunner;
     private RejectedExecutionHandler rejectedTaskHandler = null;
     protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection
in sub class
+    private boolean rmIdFromConnectionId = false;
 
     // /////////////////////////////////////////////
     //
@@ -221,7 +222,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
         }
     }
 
-    /**
+    /*boolean*
      * @param brokerURL
      * @return
      * @throws URISyntaxException
@@ -401,6 +402,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
         connection.setSessionTaskRunner(getSessionTaskRunner());
         connection.setRejectedTaskHandler(getRejectedTaskHandler());
         connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
+        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -821,6 +823,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
         props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
         props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
         props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
+        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
     }
 
     public boolean isUseCompression() {
@@ -1205,4 +1208,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements
Conne
     public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval)
{
         this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
     }
+
+
+    public boolean isRmIdFromConnectionId() {
+        return rmIdFromConnectionId;
+    }
+
+    /**
+     * uses the connection id as the resource identity for XAResource.isSameRM
+     * ensuring join will only occur on a single connection
+     */
+    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
+        this.rmIdFromConnectionId = rmIdFromConnectionId;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/221a751d/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
index e53cd5c..0f27393 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
@@ -24,11 +24,14 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
 import org.apache.activemq.TransactionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Used to provide a LocalTransaction and XAResource to a JMS session.
  */
 public class LocalAndXATransaction implements XAResource, LocalTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class);
 
     private final TransactionContext transactionContext;
     private boolean inManagedTx;
@@ -86,6 +89,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction
{
     }
 
     public void end(Xid arg0, int arg1) throws XAException {
+        LOG.debug("{} end {} with {}", new Object[]{this, arg0, arg1});
         try {
             transactionContext.end(arg0, arg1);
         } finally {
@@ -106,14 +110,16 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction
{
     }
 
     public boolean isSameRM(XAResource xaresource) throws XAException {
-        if (xaresource == null) {
-            return false;
-        }
-        // Do we have to unwrap?
-        if (xaresource instanceof LocalAndXATransaction) {
-            xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
+        boolean isSame = false;
+        if (xaresource != null) {
+            // Do we have to unwrap?
+            if (xaresource instanceof LocalAndXATransaction) {
+                xaresource = ((LocalAndXATransaction)xaresource).transactionContext;
+            }
+            isSame = transactionContext.isSameRM(xaresource);
         }
-        return transactionContext.isSameRM(xaresource);
+        LOG.trace("{} isSameRM({}) = {}", new Object[]{this, xaresource, isSame});
+        return isSame;
     }
 
     public int prepare(Xid arg0) throws XAException {
@@ -133,6 +139,7 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction
{
     }
 
     public void start(Xid arg0, int arg1) throws XAException {
+        LOG.trace("{} start {} with {}", new Object[]{this, arg0, arg1});
         transactionContext.start(arg0, arg1);
         try {
             setInManagedTx(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/221a751d/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
index 7da3035..5fd6e5c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
@@ -185,6 +185,46 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport
{
         }
     }
 
+    public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException
{
+
+        XAConnection connection1 = null;
+        XAConnection connection2 = null;
+        try {
+            ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true");
+            connection1 = (XAConnection)cf1.createConnection();
+            XASession session1 = connection1.createXASession();
+            XAResource resource1 = session1.getXAResource();
+
+            ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+            connection2 = (XAConnection)cf2.createConnection();
+            XASession session2 = connection2.createXASession();
+            XAResource resource2 = session2.getXAResource();
+
+            assertFalse(resource1.isSameRM(resource2));
+
+            // ensure identity is preserved
+            XASession session1a = connection1.createXASession();
+            assertTrue(resource1.isSameRM(session1a.getXAResource()));
+            session1.close();
+            session2.close();
+        } finally {
+            if (connection1 != null) {
+                try {
+                    connection1.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            if (connection2 != null) {
+                try {
+                    connection2.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
     public void testVanilaTransactionalProduceReceive() throws Exception {
 
         XAConnection connection1 = null;


Mime
View raw message