activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r923300 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/broker/region/cur...
Date Mon, 15 Mar 2010 15:37:59 GMT
Author: dejanb
Date: Mon Mar 15 15:37:58 2010
New Revision: 923300

URL: http://svn.apache.org/viewvc?rev=923300&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2594 - jdbc and broker seq id - initial commit

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Mar 15 15:37:58 2010
@@ -171,7 +171,9 @@ public abstract class AbstractStoreCurso
             if (cacheEnabled) {
                 cacheEnabled=false;
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size);
+                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size
+                            + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
+                            + " current node seqId: " + node.getMessageId().getBrokerSequenceId());
                 }
                 // sync with store on disabling the cache
                 if (lastCachedId != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Mon Mar 15 15:37:58 2010
@@ -34,11 +34,11 @@ public interface JDBCAdapter {
 
     void doDropTables(TransactionContext c) throws SQLException, IOException;
 
-    void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
byte[] data, long expiration) throws SQLException, IOException;
+    void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data, long expiration) throws SQLException, IOException;
 
-    void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination
destination, long expirationTime, String messageRef) throws SQLException, IOException;
+    void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId,
ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException,
IOException;
 
-    byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
+    byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
 
     String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
 
@@ -58,7 +58,7 @@ public interface JDBCAdapter {
 
     SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName) throws SQLException, IOException;
 
-    long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException;
+    long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException;
 
     void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws
SQLException, IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Mon Mar 15 15:37:58 2010
@@ -44,9 +44,10 @@ public class JDBCMessageStore extends Ab
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
-    protected AtomicLong lastMessageId = new AtomicLong(-1);
-    protected ActiveMQMessageAudit audit;
+    protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
 
+    protected ActiveMQMessageAudit audit;
+    
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
         super(destination);
         this.persistenceAdapter = persistenceAdapter;
@@ -67,6 +68,8 @@ public class JDBCMessageStore extends Ab
             return;
         }
         
+        long sequenceId = persistenceAdapter.getNextSequenceId();
+        
         // Serialize the Message..
         byte data[];
         try {
@@ -78,8 +81,8 @@ public class JDBCMessageStore extends Ab
 
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
-        try {
-            adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
+        try {      
+            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + "
in container: " + e, e);
@@ -92,7 +95,7 @@ public class JDBCMessageStore extends Ab
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-            adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef);
+            adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId,
destination, expirationTime, messageRef);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + "
in container: " + e, e);
@@ -102,13 +105,10 @@ public class JDBCMessageStore extends Ab
     }
 
     public Message getMessage(MessageId messageId) throws IOException {
-
-        long id = messageId.getBrokerSequenceId();
-
         // Get a connection and pull the message out of the DB
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            byte data[] = adapter.doGetMessage(c, id);
+            byte data[] = adapter.doGetMessage(c, messageId);
             if (data == null) {
                 return null;
             }
@@ -143,7 +143,8 @@ public class JDBCMessageStore extends Ab
     }
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException
{
-        long seq = ack.getLastMessageId().getBrokerSequenceId();
+    	
+    	long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
 
         // Get a connection and remove the message from the DB
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -225,14 +226,14 @@ public class JDBCMessageStore extends Ab
         TransactionContext c = persistenceAdapter.getTransactionContext();
 
         try {
-            adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned,
new JDBCMessageRecoveryListener() {
+            adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned,
new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception
{
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         listener.recoverMessage(msg);
-                        lastMessageId.set(sequenceId);
+                        lastStoreSequenceId.set(sequenceId);
                         return true;
                     }
                     return false;
@@ -259,13 +260,38 @@ public class JDBCMessageStore extends Ab
      * @see org.apache.activemq.store.MessageStore#resetBatching()
      */
     public void resetBatching() {
-        lastMessageId.set(-1);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId:
" + lastStoreSequenceId.get());
+        }
+        lastStoreSequenceId.set(-1);
 
     }
 
     @Override
     public void setBatch(MessageId messageId) {
-        lastMessageId.set(messageId.getBrokerSequenceId());
+        long storeSequenceId = -1;
+        try {
+            storeSequenceId = getStoreSequenceIdForMessageId(messageId);
+        } catch (IOException ignoredAsAlreadyLogged) {
+            // reset batch in effect with default -1 value
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId
+ ",existing last seqId: " + lastStoreSequenceId.get());
+        }
+        lastStoreSequenceId.set(storeSequenceId);
     }
 
+    private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
+        long result = -1;
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            result = adapter.getStoreSequenceId(c, messageId);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId:
" + messageId +", on: " + destination + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return result;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Mon Mar 15 15:37:58 2010
@@ -46,6 +46,7 @@ import org.apache.activemq.store.memory.
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -93,6 +94,8 @@ public class JDBCPersistenceAdapter exte
     protected boolean enableAudit=true;
     protected int auditRecoveryDepth = 1024;
     protected ActiveMQMessageAudit audit;
+    
+    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
 
     public JDBCPersistenceAdapter() {
     }
@@ -152,6 +155,28 @@ public class JDBCPersistenceAdapter exte
             }
     	}
     }
+    
+    public void initSequenceIdGenerator() {
+        TransactionContext c = null;
+        try {
+            c = getTransactionContext();
+            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener()
{
+                public void messageId(MessageId id) {
+                    audit.isDuplicate(id);
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Failed to reload store message audit for JDBC persistence adapter",
e);
+        } finally {
+            if (c != null) {
+                try {
+                    c.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+        
+    }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
{
         MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination,
audit);
@@ -655,5 +680,11 @@ public class JDBCPersistenceAdapter exte
     public void setAuditRecoveryDepth(int auditRecoveryDepth) {
         this.auditRecoveryDepth = auditRecoveryDepth;
     }
+
+    public long getNextSequenceId() {
+        synchronized(sequenceGenerator) {
+            return sequenceGenerator.getNextSequenceId();
+        }
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Mon Mar 15 15:37:58 2010
@@ -46,10 +46,10 @@ public class JDBCTopicMessageStore exten
     }
 
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId) throws IOException {
-        long seq = messageId.getBrokerSequenceId();
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
+        	long seq = adapter.getStoreSequenceId(c, messageId);
             adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Mon Mar 15 15:37:58 2010
@@ -134,7 +134,7 @@ public class Statements {
 
     public String getFindMessageStatement() {
         if (findMessageStatement == null) {
-            findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE
ID=?";
+            findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE
MSGID_PROD=? AND MSGID_SEQ=?";
         }
         return findMessageStatement;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Mon Mar 15 15:37:58 2010
@@ -22,13 +22,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
@@ -59,7 +55,6 @@ public class DefaultJDBCAdapter implemen
     private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
     protected Statements statements;
     protected boolean batchStatments = true;
-    private Set<Long> lastRecoveredMessagesIds = Collections.synchronizedSet(new TreeSet<Long>());
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException
{
         s.setBytes(index, data);
@@ -169,7 +164,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination
destination, byte[] data,
+    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data,
             long expiration) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
@@ -179,7 +174,7 @@ public class DefaultJDBCAdapter implemen
                     c.setAddMessageStatement(s);
                 }
             }
-            s.setLong(1, messageID.getBrokerSequenceId());
+            s.setLong(1, sequence);
             s.setString(2, messageID.getProducerId().toString());
             s.setLong(3, messageID.getProducerSequenceId());
             s.setString(4, destination.getQualifiedName());
@@ -199,7 +194,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination
destination,
+    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID,
ActiveMQDestination destination,
             long expirationTime, String messageRef) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
@@ -227,7 +222,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException {
+    public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
@@ -245,12 +240,13 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException
{
+    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException
{
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
-            s.setLong(1, seq);
+            s.setString(1, id.getProducerId().toString());
+            s.setLong(2, id.getProducerSequenceId());
             rs = s.executeQuery();
             if (!rs.next()) {
                 return null;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Mon Mar 15 15:37:58 2010
@@ -135,6 +135,7 @@ public class NegativeQueueTest extends T
     
     public void testWithNoPrefetch() throws Exception{
         PREFETCH_SIZE = 1;
+        NUM_CONSUMERS = 20;
         blastAndConsume();
     }
     
@@ -192,7 +193,7 @@ public class NegativeQueueTest extends T
             consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix],
consumerSession, QUEUE_2_NAME, latch1, consumerList1));
         }
         
-        latch1.await(300000, TimeUnit.MILLISECONDS);
+        latch1.await(200000, TimeUnit.MILLISECONDS);
         if(DEBUG){
             System.out.println("");
             System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize());
@@ -295,6 +296,11 @@ public class NegativeQueueTest extends T
         PolicyEntry policy = new PolicyEntry();
         policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); 
         policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
+        
+        // disable the cache to be sure setBatch is the problem
+        // will get lots of duplicates
+        // policy.setUseCache(false);
+        
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         answer.setDestinationPolicy(pMap);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
Mon Mar 15 15:37:58 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.Atomi
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
@@ -33,6 +34,7 @@ import org.apache.activemq.command.Messa
 abstract public class PersistenceAdapterTestSupport extends TestCase {
 
     protected PersistenceAdapter pa;
+    protected BrokerService brokerService = new BrokerService();
 
     abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws
Exception;
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java?rev=923300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
Mon Mar 15 15:37:58 2010
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+//  https://issues.apache.org/activemq/browse/AMQ-2594
+public abstract class StoreOrderTest {
+
+    private static final Log LOG = LogFactory.getLog(StoreOrderTest.class);
+    
+    protected BrokerService broker;
+    private ActiveMQConnection connection;
+    public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");
+    
+    protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;
+    protected void dumpMessages() throws Exception {}
+
+    public class TransactedSend implements Runnable {
+
+        private CountDownLatch readyForCommit;
+        private CountDownLatch firstDone;
+        private boolean first;
+        private Session session;
+        private MessageProducer producer;
+
+        public TransactedSend(CountDownLatch readyForCommit,
+                CountDownLatch firstDone, boolean b) throws Exception {
+            this.readyForCommit = readyForCommit;
+            this.firstDone = firstDone;
+            this.first = b;
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            producer = session.createProducer(destination);
+        }
+
+        public void run() {
+            try {
+                if (!first) {              
+                    firstDone.await(30, TimeUnit.SECONDS);
+                }
+                producer.send(session.createTextMessage(first ? "first" : "second"));
+                if (first) {
+                    firstDone.countDown();
+                }
+                readyForCommit.countDown();
+            
+            } catch (Exception e) {
+                e.printStackTrace();
+                fail("unexpected ex on run " + e);
+            }
+        }
+        
+        public void commit() throws Exception {
+            session.commit();
+            session.close();
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        broker = createBroker();
+        initConnection();
+    }
+    
+    public void initConnection() throws Exception {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.setWatchTopicAdvisories(false);
+        connection.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+    
+    @Test
+    public void validateUnorderedTxCommit() throws Exception {
+        
+        Executor executor = Executors.newCachedThreadPool();
+        CountDownLatch readyForCommit = new CountDownLatch(2);
+        CountDownLatch firstDone = new CountDownLatch(1);
+        
+        TransactedSend first = new TransactedSend(readyForCommit, firstDone, true);
+        TransactedSend second = new TransactedSend(readyForCommit, firstDone, false);
+        executor.execute(first);
+        executor.execute(second);
+        
+        assertTrue("both started", readyForCommit.await(20, TimeUnit.SECONDS));
+        
+        LOG.info("commit out of order");        
+        // send interleaved so sequence id at time of commit could be reversed
+        second.commit();
+        
+        // force usage over the limit before second commit to flush cache
+        enqueueOneMessage();
+        
+        // can get lost in the cursor as it is behind the last sequenceId that was cached
+        first.commit();
+        
+        LOG.info("send/commit done..");
+        
+        dumpMessages();
+        
+        String received1, received2, received3 = null;
+        if (true) {
+            LOG.info("receive and rollback...");
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            received1 = receive(session);
+            received2 = receive(session);
+            received3 = receive(session);
+            
+            assertEquals("second", received1);
+            assertEquals("middle", received2);
+            assertEquals("first", received3);
+            
+            session.rollback();
+            session.close();
+        }
+        
+        
+        LOG.info("restart broker");
+        stopBroker();
+        broker = createRestartedBroker();
+        initConnection();
+        
+        if (true) {
+            LOG.info("receive and rollback after restart...");
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            received1 = receive(session);
+            received2 = receive(session);
+            received3 = receive(session);
+            assertEquals("second", received1);
+            assertEquals("middle", received2);
+            assertEquals("first", received3);
+            session.rollback();
+            session.close();
+        }
+        
+        LOG.info("receive and ack each message");
+        received1 = receiveOne();
+        received2 = receiveOne();
+        received3 = receiveOne();
+        
+        assertEquals("second", received1);
+        assertEquals("middle", received2);
+        assertEquals("first", received3);
+    }
+    
+    private void enqueueOneMessage() throws Exception {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(session.createTextMessage("middle"));
+        session.commit();
+        session.close();
+    }
+
+
+    private String receiveOne() throws Exception {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        String received = receive(session);
+        session.commit();
+        session.close();
+        return received;
+    }
+    
+    private String receive(Session session) throws Exception {
+        MessageConsumer consumer = session.createConsumer(destination);
+        String result = null;
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        if (message != null) {
+            LOG.info("got message: " + message);
+            result  = message.getText();
+        }
+        consumer.close();
+        return result;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        boolean deleteMessagesOnStartup = true;
+        return startBroker(deleteMessagesOnStartup);
+    }
+    
+    protected BrokerService createRestartedBroker() throws Exception {
+        boolean deleteMessagesOnStartup = false;
+        return startBroker(deleteMessagesOnStartup);
+    }   
+
+    protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception
{
+        BrokerService newBroker = new BrokerService();   
+        configureBroker(newBroker);
+        newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
+        newBroker.start();
+        return newBroker;
+    }
+    
+    protected void configureBroker(BrokerService brokerService) throws Exception {
+        setPersistentAdapter(brokerService);
+        brokerService.setAdvisorySupport(false);
+        
+        PolicyMap map = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1024*3);
+        defaultEntry.setCursorMemoryHighWaterMark(68);
+        map.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(map);
+    }
+
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
Mon Mar 15 15:37:58 2010
@@ -28,6 +28,9 @@ public class JDBCPersistenceAdapterTest 
     
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException
{
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        brokerService.setSchedulerSupport(false);
+        brokerService.setPersistenceAdapter(jdbc);
+        jdbc.setBrokerService(brokerService);
         EmbeddedDataSource dataSource = new EmbeddedDataSource();
         dataSource.setDatabaseName("derbyDb");
         dataSource.setCreateDatabase("create");

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java?rev=923300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
Mon Mar 15 15:37:58 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.StoreOrderTest;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+//  https://issues.apache.org/activemq/browse/AMQ-2594
+public class JDBCStoreOrderTest extends StoreOrderTest {
+
+    private static final Log LOG = LogFactory.getLog(JDBCStoreOrderTest.class);
+    
+    @Override
+     protected void dumpMessages() throws Exception {
+        WireFormat wireFormat = new OpenWireFormat();
+        java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
+        PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
   
+        ResultSet result = statement.executeQuery();
+        while(result.next()) {
+            long id = result.getLong(1);
+            Message message = (Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
+            LOG.error("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId()
+ ", MSG: " + message);
+        }
+        statement.close();
+        conn.close();
+    }
+    
+     @Override
+     protected void setPersistentAdapter(BrokerService brokerService)
+             throws Exception {
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        brokerService.setPersistenceAdapter(jdbc);
+    }
+
+}
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java?rev=923300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
Mon Mar 15 15:37:58 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.StoreOrderTest;
+
+//  https://issues.apache.org/activemq/browse/AMQ-2594
+public class KahaDBStoreOrderTest extends StoreOrderTest {
+
+    /**
+     * Installs a {@link KahaDBStore} on the broker, with its directory set to
+     * {@code target/activemq-data/kahadb/storeOrder}.
+     *
+     * @param brokerService the broker that receives the store
+     * @throws Exception declared by the overridden method
+     */
+    @Override
+    protected void setPersistentAdapter(BrokerService brokerService)
+             throws Exception {
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("target/activemq-data/kahadb/storeOrder"));
+        brokerService.setPersistenceAdapter(store);
+    }
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=923300&r1=923299&r2=923300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Mon Mar 15 15:37:58 2010
@@ -21,6 +21,9 @@
 log4j.rootLogger=INFO, out, stdout
 
 #log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
+log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
+log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender



Mime
View raw message