activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5080 - fix up rar transacted delivery and redelivery processing, xarecovery and transaction completion afer failover - using failover maxReconnectAttempts=0 to avoid blocking periodic recovery and neg
Date Mon, 03 Mar 2014 13:30:31 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 2360fb859 -> e8818fafe


https://issues.apache.org/jira/browse/AMQ-5080 - fix up rar transacted delivery and redelivery
processing, xarecovery and transaction completion afer failover - using failover maxReconnectAttempts=0
to avoid blocking periodic recovery and negate replay of aborted transaction state


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e8818faf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e8818faf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e8818faf

Branch: refs/heads/trunk
Commit: e8818fafea0c46fb9fc3029b27f5740f55616eef
Parents: 2360fb8
Author: gtully <gary.tully@gmail.com>
Authored: Mon Mar 3 13:22:30 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Mar 3 13:22:30 2014 +0000

----------------------------------------------------------------------
 .../activemq/broker/ProducerBrokerExchange.java |  15 +-
 .../activemq/broker/TransportConnection.java    |   2 +
 .../org/apache/activemq/ActiveMQConnection.java |   6 +
 .../org/apache/activemq/ActiveMQSession.java    |  49 ++++++-
 .../org/apache/activemq/TransactionContext.java |  38 +++--
 .../activemq/command/XATransactionId.java       |  45 +++++-
 .../activemq/ra/ActiveMQConnectionFactory.java  |   4 +-
 .../activemq/ra/ActiveMQEndpointWorker.java     |   4 +-
 .../activemq/ra/ActiveMQManagedConnection.java  |  20 +--
 .../activemq/ra/ActiveMQResourceAdapter.java    | 146 ++++++++++++++++---
 .../activemq/ra/LocalAndXATransaction.java      |   6 +-
 .../apache/activemq/ra/ServerSessionImpl.java   |   9 +-
 .../activemq/ra/ServerSessionPoolImpl.java      |  19 ++-
 .../ra/ActiveMQConnectionFactoryTest.java       |   6 +
 14 files changed, 306 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
index b3b383e..bf1d21e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
@@ -143,15 +143,22 @@ public class ProducerBrokerExchange {
                 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
                 if (producerSequenceId <= lastStoredForMessageProducer) {
                     canDispatch = false;
-                    LOG.debug("suppressing duplicate message send [{}] from network producer
with producerSequence [{}] less than last stored: {}", new Object[]{
+                    LOG.warn("suppressing duplicate message send [{}] from network producer
with producerSequence [{}] less than last stored: {}", new Object[]{
                             (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()),
producerSequenceId, lastStoredForMessageProducer
                     });
                 }
             } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
                 canDispatch = false;
-                LOG.debug("suppressing duplicated message send [{}] with producerSequenceId
[{}] less than last stored: {}", new Object[]{
-                        (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()),
producerSequenceId, lastSendSequenceNumber
-                });
+                if (messageSend.isInTransaction()) {
+                    LOG.warn("suppressing duplicated message send [{}] with producerSequenceId
[{}] <= last stored: {}", new Object[]{
+                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()),
producerSequenceId, lastSendSequenceNumber
+                    });
+                } else {
+                    LOG.debug("suppressing duplicated message send [{}] with producerSequenceId
[{}] <= last stored: {}", new Object[]{
+                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()),
producerSequenceId, lastSendSequenceNumber
+                    });
+
+                }
             } else {
                 // track current so we can suppress duplicates later in the stream
                 lastSendSequenceNumber.set(producerSequenceId);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 65d044b..557b88c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -503,6 +503,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor
{
         ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
         if (consumerExchange != null) {
             broker.acknowledge(consumerExchange, ack);
+        } else if (ack.isInTransaction()) {
+            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index d5f1c17..3f17c1b 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -1849,6 +1849,10 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
         dispatchers.remove(consumerId);
     }
 
+    public boolean hasDispatcher(ConsumerId consumerId) {
+        return dispatchers.containsKey(consumerId);
+    }
+
     /**
      * @param o - the command to consume
      */
@@ -1878,6 +1882,8 @@ public class ActiveMQConnection implements Connection, TopicConnection,
QueueCon
                                 md.setMessage(msg);
                             }
                             dispatcher.dispatch(md);
+                        } else {
+                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
                         }
                         return null;
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 47ed980..1f9ef32 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -649,7 +649,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
         }
     }
 
+    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
     void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
+        clearRequestsCounter.incrementAndGet();
         executor.clearMessagesInProgress();
         // we are called from inside the transport reconnection logic which involves us
         // clearing all the connections' consumers dispatch and delivered lists. So rather
@@ -860,9 +862,25 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
         while ((messageDispatch = executor.dequeueNoWait()) != null) {
             final MessageDispatch md = messageDispatch;
             ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-            if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message))
{
-                // TODO: Ack it without delivery to client
-                continue;
+
+            MessageAck earlyAck = null;
+            if (message.isExpired()) {
+                earlyAck = new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1);
+            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
+                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
+                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
+                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
+                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
+            }
+            if (earlyAck != null) {
+                try {
+                    asyncSendPacket(earlyAck);
+                } catch (Throwable t) {
+                    LOG.error("error dispatching ack: {} ", earlyAck, t);
+                    connection.onClientInternalException(t);
+                } finally {
+                    continue;
+                }
             }
 
             if (isClientAcknowledge()||isIndividualAcknowledge()) {
@@ -886,16 +904,36 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
                 if (ack.getTransactionId() != null) {
                     getTransactionContext().addSynchronization(new Synchronization() {
 
-                        @Override
+                        final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE
? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
                         public void beforeEnd() throws Exception {
-                            asyncSendPacket(ack);
+                            // validate our consumer so we don't push stale acks that get
ignored
+                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId()))
{
+                                LOG.debug("forcing rollback - {} consumer no longer active
on {}", ack, connection);
+                                throw new TransactionRolledBackException("consumer " + ack.getConsumerId()
+ " no longer active on " + connection);
+                            }
+                            LOG.trace("beforeEnd ack {}", ack);
+                            sendAck(ack);
                         }
 
                         @Override
                         public void afterRollback() throws Exception {
+                            LOG.trace("rollback {}", ack, new Throwable("here"));
                             md.getMessage().onMessageRolledBack();
                             // ensure we don't filter this as a duplicate
                             connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
+
+                            // don't redeliver if we have been interrupted b/c the broker
will redeliver on reconnect
+                            if (clearRequestsCounter.get() > clearRequestCount) {
+                                LOG.debug("No redelivery of {} on rollback of {} due to failover
of {}", md, ack.getTransactionId(), connection.getTransport());
+                                return;
+                            }
+
+                            // validate our consumer so we don't push stale acks that get
ignored or redeliver what will be redispatched
+                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId()))
{
+                                LOG.debug("No local redelivery of {} on rollback of {} because
consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
+                                return;
+                            }
+
                             RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                             int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                             if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
@@ -932,6 +970,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession,
Sta
                     });
                 }
 
+                LOG.trace("{} onMessage({})", this, message.getMessageId());
                 messageListener.onMessage(message);
 
             } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 43aacf7..e780783 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -40,6 +40,7 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transport.failover.FailoverTransport;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.slf4j.Logger;
@@ -71,9 +72,8 @@ public class TransactionContext implements XAResource {
     private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS
=
     		new HashMap<TransactionId, List<TransactionContext>>();
 
-    private final ActiveMQConnection connection;
+    private ActiveMQConnection connection;
     private final LongSequenceGenerator localTransactionIdGenerator;
-    private final ConnectionId connectionId;
     private List<Synchronization> synchronizations;
 
     // To track XA transactions.
@@ -82,10 +82,14 @@ public class TransactionContext implements XAResource {
     private LocalTransactionEventListener localTransactionEventListener;
     private int beforeEndIndex;
 
+    // for RAR recovery
+    public TransactionContext() {
+        localTransactionIdGenerator = null;
+    }
+
     public TransactionContext(ActiveMQConnection connection) {
         this.connection = connection;
         this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
-        this.connectionId = connection.getConnectionInfo().getConnectionId();
     }
 
     public boolean isInXATransaction() {
@@ -231,7 +235,7 @@ public class TransactionContext implements XAResource {
         if (transactionId == null) {
             synchronizations = null;
             beforeEndIndex = 0;
-            this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
+            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.BEGIN);
             this.connection.ensureConnectionInfoSent();
             this.connection.asyncSendPacket(info);
@@ -646,6 +650,13 @@ public class TransactionContext implements XAResource {
         TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
         try {
             this.connection.checkClosedOrFailed();
+            final FailoverTransport failoverTransport = this.connection.getTransport().narrow(FailoverTransport.class);
+            if (failoverTransport != null && !failoverTransport.isConnected()) {
+                // otherwise call will block on reconnect forfeting any app level periodic
check
+                XAException xaException = new XAException("Failover transport not connected:
" + this.getConnection().getTransport());
+                xaException.errorCode = XAException.XAER_RMERR;
+                throw xaException;
+            }
             this.connection.ensureConnectionInfoSent();
 
             DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
@@ -657,6 +668,7 @@ public class TransactionContext implements XAResource {
                 answer = new XATransactionId[data.length];
                 System.arraycopy(data, 0, answer, 0, data.length);
             }
+            LOG.trace("recover({})={}", flag, answer);
             return answer;
         } catch (JMSException e) {
             throw toXAException(e);
@@ -676,7 +688,7 @@ public class TransactionContext implements XAResource {
     // Helper methods.
     //
     // ///////////////////////////////////////////////////////////
-    private String getResourceManagerId() throws JMSException {
+    protected String getResourceManagerId() throws JMSException {
         return this.connection.getResourceManagerId();
     }
 
@@ -695,11 +707,11 @@ public class TransactionContext implements XAResource {
             associatedXid = xid;
             transactionId = new XATransactionId(xid);
 
-            TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
+            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.BEGIN);
             try {
                 this.connection.asyncSendPacket(info);
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Started XA transaction: " + transactionId);
+                    LOG.debug("{} started XA transaction {} ", this, transactionId);
                 }
             } catch (JMSException e) {
                 disassociate();
@@ -709,11 +721,11 @@ public class TransactionContext implements XAResource {
         } else {
 
             if (transactionId != null) {
-                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
+                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId,
TransactionInfo.END);
                 try {
                     syncSendPacketWithInterruptionHandling(info);
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Ended XA transaction: " + transactionId);
+                        LOG.debug("{} ended XA transaction {}", this, transactionId);
                     }
                 } catch (JMSException e) {
                     disassociate();
@@ -800,6 +812,14 @@ public class TransactionContext implements XAResource {
         return connection;
     }
 
+
+    // for RAR xa recovery where xaresource connection is per request
+    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
+        ActiveMQConnection existing = this.connection;
+        this.connection = connection;
+        return existing;
+    }
+
     public void cleanup() {
         associatedXid = null;
         transactionId = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
b/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
index 5f786e5..84fea7a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java
@@ -102,19 +102,52 @@ public class XATransactionId extends TransactionId implements Xid, Comparable
{
         if (transactionKey == null) {
             StringBuffer s = new StringBuffer();
             s.append("XID:[" + formatId + ",globalId=");
-            for (int i = 0; i < globalTransactionId.length; i++) {
-                s.append(Integer.toHexString(globalTransactionId[i]));
-            }
+            s.append(stringForm(formatId, globalTransactionId));
             s.append(",branchId=");
-            for (int i = 0; i < branchQualifier.length; i++) {
-                s.append(Integer.toHexString(branchQualifier[i]));
-            }
+            s.append(stringForm(formatId, branchQualifier));
             s.append("]");
             transactionKey = s.toString();
         }
         return transactionKey;
     }
 
+    private String stringForm(int format, byte[] uid) {
+        StringBuffer s = new StringBuffer();
+        switch (format) {
+            case 131077:  // arjuna
+                stringFormArj(s, uid);
+                break;
+            default: // aries
+                stringFormDefault(s, uid);
+        }
+        return s.toString();
+    }
+
+    private void stringFormDefault(StringBuffer s, byte[] uid) {
+        for (int i = 0; i < uid.length; i++) {
+            s.append(Integer.toHexString(uid[i]));
+        }
+    }
+
+    private void stringFormArj(StringBuffer s, byte[] uid) {
+        try {
+            DataByteArrayInputStream byteArrayInputStream = new DataByteArrayInputStream(uid);
+            s.append(Long.toString(byteArrayInputStream.readLong(), 16));
+            s.append(':');
+            s.append(Long.toString(byteArrayInputStream.readLong(), 16));
+            s.append(':');
+
+            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
+            s.append(':');
+            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
+            s.append(':');
+            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
+
+        } catch (Exception ignored) {
+            stringFormDefault(s, uid);
+        }
+    }
+
     public String toString() {
         return getTransactionKey();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java
index 4ef3684..234fc30 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java
@@ -98,7 +98,9 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
                 throw (JMSException)e.getCause();
             }
             LOG.debug("Connection could not be created:", e);
-            throw new JMSException(e.getMessage());
+            JMSException jmsException = new JMSException(e.getMessage());
+            jmsException.setLinkedException(e);
+            throw jmsException;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
index 1e12751..ac35959 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
@@ -235,7 +235,7 @@ public class ActiveMQEndpointWorker {
                 c.close();
             }
         } catch (JMSException e) {
-            //
+            LOG.trace("failed to close c {}", c, e);
         }
     }
 
@@ -249,7 +249,7 @@ public class ActiveMQEndpointWorker {
                 cc.close();
             }
         } catch (JMSException e) {
-            //
+            LOG.trace("failed to close cc {}", cc, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
index f8caf09..a694c12 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java
@@ -201,13 +201,15 @@ public class ActiveMQManagedConnection implements ManagedConnection,
ExceptionLi
             return;
         }
 
-        cleanup();
-
         try {
-            physicalConnection.close();
-            destroyed = true;
-        } catch (JMSException e) {
-            LOG.info("Error occurred during close of a JMS connection.", e);
+            cleanup();
+        } finally {
+            try {
+                physicalConnection.close();
+                destroyed = true;
+            } catch (JMSException e) {
+                LOG.trace("Error occurred during close of a JMS connection.", e);
+            }
         }
     }
 
@@ -233,10 +235,10 @@ public class ActiveMQManagedConnection implements ManagedConnection,
ExceptionLi
             physicalConnection.cleanup();
         } catch (JMSException e) {
             throw new ResourceException("Could cleanup the ActiveMQ connection: " + e, e);
+        } finally {
+            // defer transaction cleanup till after close so that close is aware of the current
tx
+            localAndXATransaction.cleanup();
         }
-        // defer transaction cleanup till after close so that close is aware of the current
tx
-        localAndXATransaction.cleanup();
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index 68b2178..32fdb13 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -16,22 +16,20 @@
  */
 package org.apache.activemq.ra;
 
-import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashMap;
 
-import javax.jms.Connection;
 import javax.jms.JMSException;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
 import javax.resource.NotSupportedException;
 import javax.resource.ResourceException;
 import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.BootstrapContext;
 import javax.resource.spi.ResourceAdapterInternalException;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 
+import javax.transaction.xa.Xid;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
@@ -39,6 +37,8 @@ import org.apache.activemq.TransactionContext;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Knows how to connect to one ActiveMQ server. It can then activate endpoints
@@ -50,7 +50,7 @@ import org.apache.activemq.util.ServiceSupport;
  * 
  */
 public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter
{
-
+    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
     private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers
= new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
 
     private BootstrapContext bootstrapContext;
@@ -233,21 +233,129 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport
implement
      */
     public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException
{
         try {
-            return new XAResource[]{(XAResource)
-                    java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[]{XAResource.class},
-                            new java.lang.reflect.InvocationHandler () {
-                                @Override
-                                public Object invoke(Object proxy, Method method, Object[]
args) throws Throwable {
-                                    ActiveMQConnection connection = makeConnection();
-                                    try {
-                                        return method.invoke(new TransactionContext(connection),
args);
-                                    } finally {
-                                        try {
-                                            connection.close();
-                                        } catch (Throwable ignore) {}
-                                    }
+            return new XAResource[]{
+                    new TransactionContext() {
+
+                        @Override
+                        public boolean isSameRM(XAResource xaresource) throws XAException
{
+                            ActiveMQConnection original = null;
+                            try {
+                                original = setConnection(newConnection());
+                                boolean result = super.isSameRM(xaresource);
+                                LOG.trace("{}.recover({})={}", getConnection(), xaresource,
result);
+                                return result;
+
+                            } catch (JMSException e) {
+                                LOG.trace("isSameRM({}) failed", xaresource, e);
+                                XAException xaException = new XAException(e.getMessage());
+                                throw xaException;
+                            } finally {
+                                closeConnection(original);
+                            }
+                        }
+
+                        @Override
+                        protected String getResourceManagerId() throws JMSException {
+                            ActiveMQConnection original = null;
+                            try {
+                                original = setConnection(newConnection());
+                                return super.getResourceManagerId();
+                            } finally {
+                                closeConnection(original);
+                            }
+                        }
+
+                        @Override
+                        public void commit(Xid xid, boolean onePhase) throws XAException
{
+                            ActiveMQConnection original = null;
+                            try {
+                                setConnection(newConnection());
+                                super.commit(xid, onePhase);
+                                LOG.trace("{}.commit({},{})", getConnection(), xid);
+
+                            } catch (JMSException e) {
+                                LOG.trace("{}.commit({},{}) failed", getConnection(), xid,
onePhase, e);
+                                throwXAException(e);
+                            } finally {
+                                closeConnection(original);
+                            }
+                        }
+
+                        @Override
+                        public void rollback(Xid xid) throws XAException {
+                            ActiveMQConnection original = null;
+                            try {
+                                original = setConnection(newConnection());
+                                super.rollback(xid);
+                                LOG.trace("{}.rollback({})", getConnection(), xid);
+
+                            } catch (JMSException e) {
+                                LOG.trace("{}.rollback({}) failed", getConnection(), xid,
e);
+                                throwXAException(e);
+                            } finally {
+                               closeConnection(original);
+                            }
+                        }
+
+                        @Override
+                        public Xid[] recover(int flags) throws XAException {
+                            Xid[] result = new Xid[]{};
+                            ActiveMQConnection original = null;
+                            try {
+                                original = setConnection(newConnection());
+                                result = super.recover(flags);
+                                LOG.trace("{}.recover({})={}", getConnection(), flags, result);
+
+                            } catch (JMSException e) {
+                                LOG.trace("{}.recover({}) failed", getConnection(), flags,
e);
+                                throwXAException(e);
+                            } finally {
+                                closeConnection(original);
+                            }
+                            return result;
+                        }
+
+                        @Override
+                        public void forget(Xid xid) throws XAException {
+                            ActiveMQConnection original = null;
+                            try {
+                                original = setConnection(newConnection());
+                                super.forget(xid);
+                                LOG.trace("{}.forget({})", getConnection(), xid);
+
+                            } catch (JMSException e) {
+                                LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
+                                throwXAException(e);
+                            } finally {
+                                closeConnection(original);
+                            }
+                        }
+
+                        private void throwXAException(JMSException e) throws XAException
{
+                            XAException xaException = new XAException(e.getMessage());
+                            xaException.errorCode = XAException.XAER_RMFAIL;
+                            throw xaException;
+                        }
+
+                        private ActiveMQConnection newConnection() throws JMSException {
+                            ActiveMQConnection connection = makeConnection();
+                            connection.start();
+                            return connection;
+                        }
+
+                        private void closeConnection(ActiveMQConnection original) {
+                            ActiveMQConnection connection = getConnection();
+                            if (connection != null) {
+                                try {
+                                    connection.close();
+                                } catch (JMSException ignored) {
+
+                                } finally {
+                                    setConnection(original);
                                 }
-                            })};
+                            }
+                        }
+                    }};
 
         } catch (Exception e) {
             throw new ResourceException(e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
index c6d91ef..f93ee0f 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java
@@ -33,13 +33,17 @@ import org.slf4j.LoggerFactory;
 public class LocalAndXATransaction implements XAResource, LocalTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class);
 
-    private final TransactionContext transactionContext;
+    private TransactionContext transactionContext;
     private boolean inManagedTx;
 
     public LocalAndXATransaction(TransactionContext transactionContext) {
         this.transactionContext = transactionContext;
     }
 
+    public void setTransactionContext(TransactionContext transactionContext) {
+        this.transactionContext = transactionContext;
+    }
+
     public void setInManagedTx(boolean inManagedTx) throws JMSException {
         this.inManagedTx = inManagedTx;
         if (!inManagedTx) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
index 27c75b1..a4382ee 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
@@ -168,15 +168,15 @@ public class ServerSessionImpl implements ServerSession, InboundContext,
Work, D
                 if ( session.isRunning() ) {
                     session.run();
                 } else {
-                    log.debug("JMS Session is no longer running (maybe due to loss of connection?),
marking ServerSesison as stale");
+                    log.debug("JMS Session {} with unconsumed {} is no longer running (maybe
due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size());
                     stale = true;
                 }
             } catch (Throwable e) {
                 stale = true;
                 if ( log.isDebugEnabled() ) {
-                    log.debug("Endpoint failed to process message.", e);
+                    log.debug("Endpoint {} failed to process message.", session, e);
                 } else if ( log.isInfoEnabled() ) {
-                    log.info("Endpoint failed to process message. Reason: " + e.getMessage());
                   
+                    log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(),
session);
                 }
             } finally {
                 InboundContextSupport.unregister(this);
@@ -190,6 +190,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext,
Work, D
                     }
                     if (!session.hasUncomsumedMessages()) {
                         runningFlag = false;
+                        log.debug("Session has no unconsumed message, returning to pool");
                         pool.returnToPool(this);
                         break;
                     }
@@ -255,7 +256,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext,
Work, D
      */
     @Override
     public String toString() {
-        return "ServerSessionImpl:" + serverSessionId;
+        return "ServerSessionImpl:" + serverSessionId + "{" + session +"}";
     }
 
     public void close() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
index 25de03d..393ed35 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
@@ -227,11 +227,24 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
         try {
             ActiveMQSession session = (ActiveMQSession)ss.getSession();
             List l = session.getUnconsumedMessages();
-            for (Iterator i = l.iterator(); i.hasNext();) {
-                dispatchToSession((MessageDispatch)i.next());
+            if (!l.isEmpty()) {
+                ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
+                if (connection != null) {
+                    for (Iterator i = l.iterator(); i.hasNext();) {
+                        MessageDispatch md = (MessageDispatch)i.next();
+                        if (connection.hasDispatcher(md.getConsumerId())) {
+                            dispatchToSession(md);
+                            LOG.trace("on remove of {} redispatch of {}", session, md);
+                        } else {
+                            LOG.trace("on remove not redispatching {}, dispatcher no longer
present on {}", md, session.getConnection());
+                        }
+                    }
+                } else {
+                    LOG.trace("on remove of {} not redispatching while disconnected", session);
+                }
             }
         } catch (Throwable t) {
-            LOG.error("Error redispatching unconsumed messages from stale session", t);
+            LOG.error("Error redispatching unconsumed messages from stale server session
{}", ss, t);
         }
         ss.close();
         synchronized (closing) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
index 2191148..e511a12 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java
@@ -117,5 +117,11 @@ public class ActiveMQConnectionFactoryTest extends TestCase {
         assertEquals("one resource", 1, resources.length);
 
         assertEquals("no pending transactions", 0, resources[0].recover(100).length);
+
+        // validate equality
+        XAResource[] resource2 = ra.getXAResources(null);
+        assertEquals("one resource", 1, resource2.length);
+        assertTrue("isSameRM true", resources[0].isSameRM(resource2[0]));
+        assertFalse("no tthe same instance", resources[0].equals(resource2[0]));
     }
 }


Mime
View raw message