activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r923699 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ main/java/org/apache/activemq/t...
Date Tue, 16 Mar 2010 11:46:50 GMT
Author: dejanb
Date: Tue Mar 16 11:46:50 2010
New Revision: 923699

URL: http://svn.apache.org/viewvc?rev=923699&view=rev
Log:
merging 923300 and 923694 - https://issues.apache.org/activemq/browse/AMQ-2594 - jdbc order of messages

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
      - copied unchanged from r923300, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
      - copied unchanged from r923300, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
      - copied unchanged from r923300, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/resources/log4j.properties

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Tue Mar 16 11:46:50 2010
@@ -171,7 +171,9 @@ public abstract class AbstractStoreCurso
             if (cacheEnabled) {
                 cacheEnabled=false;
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size);
+                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
+                            + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
+                            + " current node seqId: " + node.getMessageId().getBrokerSequenceId());
                 }
                 // sync with store on disabling the cache
                 if (lastCachedId != null) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Tue Mar 16 11:46:50 2010
@@ -34,11 +34,11 @@ public interface JDBCAdapter {
 
     void doDropTables(TransactionContext c) throws SQLException, IOException;
 
-    void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
byte[] data, long expiration) throws SQLException, IOException;
+    void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data, long expiration) throws SQLException, IOException;
 
-    void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination
destination, long expirationTime, String messageRef) throws SQLException, IOException;
+    void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId,
ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException,
IOException;
 
-    byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
+    byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
 
     String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
 
@@ -58,7 +58,7 @@ public interface JDBCAdapter {
 
     SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName) throws SQLException, IOException;
 
-    long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException;
+    long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException;
 
     void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws
SQLException, IOException;
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Tue Mar 16 11:46:50 2010
@@ -44,9 +44,10 @@ public class JDBCMessageStore extends Ab
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
-    protected AtomicLong lastMessageId = new AtomicLong(-1);
-    protected ActiveMQMessageAudit audit;
+    protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
 
+    protected ActiveMQMessageAudit audit;
+    
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
         super(destination);
         this.persistenceAdapter = persistenceAdapter;
@@ -67,6 +68,8 @@ public class JDBCMessageStore extends Ab
             return;
         }
         
+        long sequenceId = persistenceAdapter.getNextSequenceId();
+        
         // Serialize the Message..
         byte data[];
         try {
@@ -78,8 +81,8 @@ public class JDBCMessageStore extends Ab
 
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
-        try {
-            adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
+        try {      
+            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@@ -92,7 +95,7 @@ public class JDBCMessageStore extends Ab
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-            adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef);
+            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId,
destination, expirationTime, messageRef);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
@@ -102,13 +105,10 @@ public class JDBCMessageStore extends Ab
     }
 
     public Message getMessage(MessageId messageId) throws IOException {
-
-        long id = messageId.getBrokerSequenceId();
-
         // Get a connection and pull the message out of the DB
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            byte data[] = adapter.doGetMessage(c, id);
+            byte data[] = adapter.doGetMessage(c, messageId);
             if (data == null) {
                 return null;
             }
@@ -143,7 +143,8 @@ public class JDBCMessageStore extends Ab
     }
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
{
-        long seq = ack.getLastMessageId().getBrokerSequenceId();
+    	
+    	long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
 
         // Get a connection and remove the message from the DB
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -225,14 +226,14 @@ public class JDBCMessageStore extends Ab
         TransactionContext c = persistenceAdapter.getTransactionContext();
 
         try {
-            adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned,
new JDBCMessageRecoveryListener() {
+            adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned,
new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception
{
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         listener.recoverMessage(msg);
-                        lastMessageId.set(sequenceId);
+                        lastStoreSequenceId.set(sequenceId);
                         return true;
                     }
                     return false;
@@ -259,13 +260,38 @@ public class JDBCMessageStore extends Ab
      * @see org.apache.activemq.store.MessageStore#resetBatching()
      */
     public void resetBatching() {
-        lastMessageId.set(-1);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
+        }
+        lastStoreSequenceId.set(-1);
 
     }
 
     @Override
     public void setBatch(MessageId messageId) {
-        lastMessageId.set(messageId.getBrokerSequenceId());
+        long storeSequenceId = -1;
+        try {
+            storeSequenceId = getStoreSequenceIdForMessageId(messageId);
+        } catch (IOException ignoredAsAlreadyLogged) {
+            // reset batch in effect with default -1 value
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
+        }
+        lastStoreSequenceId.set(storeSequenceId);
     }
 
+    private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
+        long result = -1;
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            result = adapter.getStoreSequenceId(c, messageId);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return result;
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Tue Mar 16 11:46:50 2010
@@ -46,6 +46,7 @@ import org.apache.activemq.store.memory.
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -93,6 +94,8 @@ public class JDBCPersistenceAdapter exte
     protected boolean enableAudit=true;
     protected int auditRecoveryDepth = 1024;
     protected ActiveMQMessageAudit audit;
+    
+    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
 
     public JDBCPersistenceAdapter() {
     }
@@ -152,6 +155,28 @@ public class JDBCPersistenceAdapter exte
             }
     	}
     }
+    
+    public void initSequenceIdGenerator() {
+        TransactionContext c = null;
+        try {
+            c = getTransactionContext();
+            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener()
{
+                public void messageId(MessageId id) {
+                    audit.isDuplicate(id);
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Failed to reload store message audit for JDBC persistence adapter",
e);
+        } finally {
+            if (c != null) {
+                try {
+                    c.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+        
+    }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
{
         MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination,
audit);
@@ -655,5 +680,11 @@ public class JDBCPersistenceAdapter exte
     public void setAuditRecoveryDepth(int auditRecoveryDepth) {
         this.auditRecoveryDepth = auditRecoveryDepth;
     }
+
+    public long getNextSequenceId() {
+        synchronized(sequenceGenerator) {
+            return sequenceGenerator.getNextSequenceId();
+        }
+    }
     
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Tue Mar 16 11:46:50 2010
@@ -46,10 +46,10 @@ public class JDBCTopicMessageStore exten
     }
 
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId) throws IOException {
-        long seq = messageId.getBrokerSequenceId();
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
+        	long seq = adapter.getStoreSequenceId(c, messageId);
             adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Tue Mar 16 11:46:50 2010
@@ -134,7 +134,7 @@ public class Statements {
 
     public String getFindMessageStatement() {
         if (findMessageStatement == null) {
-            findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?";
+            findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
         }
         return findMessageStatement;
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Tue Mar 16 11:46:50 2010
@@ -23,11 +23,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
-import java.util.TreeSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -57,7 +54,6 @@ public class DefaultJDBCAdapter implemen
     private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
     protected Statements statements;
     protected boolean batchStatments = true;
-    private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException
{
         s.setBytes(index, data);
@@ -167,7 +163,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination
destination, byte[] data,
+    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data,
             long expiration) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
@@ -177,7 +173,7 @@ public class DefaultJDBCAdapter implemen
                     c.setAddMessageStatement(s);
                 }
             }
-            s.setLong(1, messageID.getBrokerSequenceId());
+            s.setLong(1, sequence);
             s.setString(2, messageID.getProducerId().toString());
             s.setLong(3, messageID.getProducerSequenceId());
             s.setString(4, destination.getQualifiedName());
@@ -197,7 +193,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination
destination,
+    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID,
ActiveMQDestination destination,
             long expirationTime, String messageRef) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
@@ -225,7 +221,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException {
+    public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
@@ -243,12 +239,13 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException
{
+    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException
{
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
-            s.setLong(1, seq);
+            s.setString(1, id.getProducerId().toString());
+            s.setLong(2, id.getProducerSequenceId());
             rs = s.executeQuery();
             if (!rs.next()) {
                 return null;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
Tue Mar 16 11:46:50 2010
@@ -67,18 +67,20 @@ public class LocalTransaction extends Tr
 
         setState(Transaction.FINISHED_STATE);
         context.getTransactions().remove(xid);
-        transactionStore.commit(getTransactionId(), false);
+        synchronized (transactionStore) {
+            transactionStore.commit(getTransactionId(), false);
 
-        try {
-            fireAfterCommit();
-        } catch (Throwable e) {
-            // I guess this could happen. Post commit task failed
-            // to execute properly.
-            LOG.warn("POST COMMIT FAILED: ", e);
-            XAException xae = new XAException("POST COMMIT FAILED");
-            xae.errorCode = XAException.XAER_RMERR;
-            xae.initCause(e);
-            throw xae;
+            try {
+                fireAfterCommit();
+            } catch (Throwable e) {
+                // I guess this could happen. Post commit task failed
+                // to execute properly.
+                LOG.warn("POST COMMIT FAILED: ", e);
+                XAException xae = new XAException("POST COMMIT FAILED");
+                xae.errorCode = XAException.XAER_RMERR;
+                xae.initCause(e);
+                throw xae;
+            }
         }
     }
 
@@ -90,16 +92,18 @@ public class LocalTransaction extends Tr
         }
         setState(Transaction.FINISHED_STATE);
         context.getTransactions().remove(xid);
-        transactionStore.rollback(getTransactionId());
+        synchronized (transactionStore) {
+           transactionStore.rollback(getTransactionId());
 
-        try {
-            fireAfterRollback();
-        } catch (Throwable e) {
-            LOG.warn("POST ROLLBACK FAILED: ", e);
-            XAException xae = new XAException("POST ROLLBACK FAILED");
-            xae.errorCode = XAException.XAER_RMERR;
-            xae.initCause(e);
-            throw xae;
+            try {
+                fireAfterRollback();
+            } catch (Throwable e) {
+                LOG.warn("POST ROLLBACK FAILED: ", e);
+                XAException xae = new XAException("POST ROLLBACK FAILED");
+                xae.errorCode = XAException.XAER_RMERR;
+                xae.initCause(e);
+                throw xae;
+            }
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Tue Mar 16 11:46:50 2010
@@ -135,6 +135,7 @@ public class NegativeQueueTest extends T
     
     public void testWithNoPrefetch() throws Exception{
         PREFETCH_SIZE = 1;
+        NUM_CONSUMERS = 20;
         blastAndConsume();
     }
     
@@ -192,7 +193,7 @@ public class NegativeQueueTest extends T
             consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix],
consumerSession, QUEUE_2_NAME, latch1, consumerList1));
         }
         
-        latch1.await(300000, TimeUnit.MILLISECONDS);
+        latch1.await(200000, TimeUnit.MILLISECONDS);
         if(DEBUG){
             System.out.println("");
             System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
@@ -295,6 +296,11 @@ public class NegativeQueueTest extends T
         PolicyEntry policy = new PolicyEntry();
         policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); 
         policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
+        
+        // disable the cache to be sure setBatch is the problem
+        // will get lots of duplicates
+        // policy.setUseCache(false);
+        
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         answer.setDestinationPolicy(pMap);

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
Tue Mar 16 11:46:50 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.Atomi
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
@@ -33,6 +34,7 @@ import org.apache.activemq.command.Messa
 abstract public class PersistenceAdapterTestSupport extends TestCase {
 
     protected PersistenceAdapter pa;
+    protected BrokerService brokerService = new BrokerService();
 
     abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws
Exception;
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
Tue Mar 16 11:46:50 2010
@@ -28,6 +28,8 @@ public class JDBCPersistenceAdapterTest 
     
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException
{
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        brokerService.setPersistenceAdapter(jdbc);
+        jdbc.setBrokerService(brokerService);
         EmbeddedDataSource dataSource = new EmbeddedDataSource();
         dataSource.setDatabaseName("derbyDb");
         dataSource.setCreateDatabase("create");

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/resources/log4j.properties?rev=923699&r1=923698&r2=923699&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/resources/log4j.properties Tue Mar
16 11:46:50 2010
@@ -21,6 +21,9 @@
 log4j.rootLogger=INFO, out, stdout
 
 #log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
+log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
+log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender



Mime
View raw message