activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: make perpared xa transactions visible in kahadb persistenceadapter view mbean
Date Mon, 03 Mar 2014 13:56:23 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk e8818fafe -> 69c0d399f


make perpared xa transactions visible in kahadb persistenceadapter view mbean


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/69c0d399
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/69c0d399
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/69c0d399

Branch: refs/heads/trunk
Commit: 69c0d399fb374700b1b7351671fcaf897a1d3e16
Parents: e8818fa
Author: gtully <gary.tully@gmail.com>
Authored: Mon Mar 3 13:55:27 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Mar 3 13:55:27 2014 +0000

----------------------------------------------------------------------
 .../broker/jmx/PersistenceAdapterViewMBean.java |  2 +-
 .../activemq/store/kahadb/MessageDatabase.java  | 27 ++++++++++++++++++--
 .../activemq/broker/XARecoveryBrokerTest.java   | 23 ++++++++++++++---
 3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/69c0d399/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
index e99fef2..b860e9c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java
@@ -21,7 +21,7 @@ public interface PersistenceAdapterViewMBean {
     @MBeanInfo("Name of this persistence adapter.")
     String getName();
 
-    @MBeanInfo("Current inflight local transactions.")
+    @MBeanInfo("Inflight transactions.")
     String getTransactions();
 
     @MBeanInfo("Current data.")

http://git-wip-us.apache.org/repos/asf/activemq/blob/69c0d399/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 4775f1b..78e26a9 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -563,6 +563,18 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                 }
             }
         }
+        synchronized (preparedTransactions) {
+            if (!preparedTransactions.isEmpty()) {
+                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet())
{
+                    TranInfo info = new TranInfo();
+                    info.id = entry.getKey();
+                    for (Operation operation : entry.getValue()) {
+                        info.track(operation);
+                    }
+                    infos.add(info);
+                }
+            }
+        }
         return infos.toString();
     }
 
@@ -2290,6 +2302,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
     @SuppressWarnings("rawtypes")
     protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions
= new LinkedHashMap<TransactionId, List<Operation>>();
     protected final Set<String> ackedAndPrepared = new HashSet<String>();
+    protected final Set<String> rolledBackAcks = new HashSet<String>();
 
     // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome
is rollback,
     // till then they are skipped by the store.
@@ -2305,12 +2318,16 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         }
     }
 
-    public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException
{
+    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws
IOException {
         if (acks != null) {
             this.indexLock.writeLock().lock();
             try {
                 for (MessageAck ack : acks) {
-                    ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
+                    final String id = ack.getLastMessageId().toProducerKey();
+                    ackedAndPrepared.remove(id);
+                    if (rollback) {
+                        rolledBackAcks.add(id);
+                    }
                 }
             } finally {
                 this.indexLock.writeLock().unlock();
@@ -2933,6 +2950,12 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
             return lastGetPriority;
         }
 
+        public boolean alreadyDispatched(Long sequence) {
+            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition
>= sequence) ||
+                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition
>= sequence) ||
+                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition
>= sequence);
+        }
+
         class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
             Iterator<Entry<Long, MessageKeys>>currentIterator;
             final Iterator<Entry<Long, MessageKeys>>highIterator;

http://git-wip-us.apache.org/repos/asf/activemq/blob/69c0d399/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index edab489..fb570b2 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -25,10 +25,13 @@ import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import junit.framework.Test;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
 import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.*;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.JMXSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +80,14 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         DataArrayResponse dar = (DataArrayResponse)response;
         assertEquals(4, dar.getData().length);
 
+        // view prepared in kahadb view
+        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
+            String txFromView = kahadbView.getTransactions();
+            LOG.info("Tx view fromm PA:" + txFromView);
+            assertTrue("xid with our dud format in transaction string " + txFromView, txFromView.contains("XID:[55,"));
+        }
+
         // restart the broker.
         restartBroker();
 
@@ -125,6 +136,12 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         }
     }
 
+    private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String name) throws
MalformedObjectNameException, JMSException {
+       return (PersistenceAdapterViewMBean)broker.getManagementContext().newProxyInstance(
+               BrokerMBeanSupport.createPersistenceAdapterName(broker.getBrokerObjectName().toString(),
name),
+               PersistenceAdapterViewMBean.class, true);
+    }
+
     private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId
xid) throws MalformedObjectNameException, JMSException {
 
         ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,Xid="
+
@@ -216,7 +233,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
 
         // Commit the prepared transactions.
         for (int i = 0; i < dar.getData().length; i++) {
-            connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));
+            connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId)
dar.getData()[i]));
         }
 
         // We should get the committed transactions.
@@ -304,7 +321,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
 
         // Commit the prepared transactions.
         for (int i = 0; i < dar.getData().length; i++) {
-            connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)
dar.getData()[i]));
+            connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId)
dar.getData()[i]));
         }
 
         // We should get the committed transactions.
@@ -1057,7 +1074,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
             }
             MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
             ack.setTransactionId(txid);
-            connection.send(ack);
+            connection.request(ack);
         }
 
         // Don't commit


Mime
View raw message