activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1025939 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/store/ main/java/org/apa...
Date Thu, 21 Oct 2010 10:58:52 GMT
Author: gtully
Date: Thu Oct 21 10:58:51 2010
New Revision: 1025939

URL: http://svn.apache.org/viewvc?rev=1025939&view=rev
Log:
JDBC variant of https://issues.apache.org/activemq/browse/AMQ-2985 - the JDBC store cannot ack out of order, so the cleanup task query needed to be priority aware. Fix for https://issues.apache.org/activemq/browse/AMQ-2980 (JDBC) - priority needed to be considered in count and recovery; additional tests included.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
    activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Oct 21 10:58:51 2010
@@ -78,6 +78,7 @@ public class DurableTopicSubscription ex
      */
     public void unmatched(MessageReference node) throws IOException {
         MessageAck ack = new MessageAck();
+        ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
         ack.setMessageID(node.getMessageId());
         node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
     }
@@ -111,14 +112,14 @@ public class DurableTopicSubscription ex
 
     public void activate(SystemUsage memoryManager, ConnectionContext context,
             ConsumerInfo info) throws Exception {
-        LOG.debug("Activating " + this);
         if (!active) {
             this.active = true;
             this.context = context;
             this.info = info;
+            LOG.debug("Activating " + this);
             int prefetch = info.getPrefetchSize();
             if (prefetch>0) {
-            prefetch += prefetch/2;
+                prefetch += prefetch/2;
             }
             int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
             this.pending.setMaxAuditDepth(depth);
@@ -150,7 +151,7 @@ public class DurableTopicSubscription ex
     }
 
     public void deactivate(boolean keepDurableSubsActive) throws Exception {
-        LOG.debug("Dectivating " + this);
+        LOG.debug("Deactivating " + this);
         active = false;
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (pending) {
@@ -198,7 +199,7 @@ public class DurableTopicSubscription ex
         }
         prefetchExtension = 0;
     }
-    
+
     
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
         MessageDispatch md = super.createMessageDispatch(node, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Oct 21 10:58:51 2010
@@ -502,7 +502,7 @@ public class Topic extends BaseDestinati
         if (topicStore != null && node.isPersistent()) {
             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
             SubscriptionKey key = dsub.getSubscriptionKey();
-            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
+            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack);
         }
         messageConsumed(context, node);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Oct 21 10:58:51 2010
@@ -248,6 +248,10 @@ public abstract class AbstractStoreCurso
     
     
     protected final synchronized void fillBatch() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
+                    + ", hasMessages=" + this.storeHasMessages + ", size=" + this.size);
+        }
         if (batchResetNeeded) {
             resetBatch();
             this.batchResetNeeded = false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java Thu Oct 21 10:58:51 2010
@@ -58,7 +58,12 @@ public class MessageAck extends BaseComm
      * The  ack case where a client wants only an individual message to be discarded.
      */
     public static final byte INDIVIDUAL_ACK_TYPE = 4;
-    
+
+/**
+     * The ack case where a durable topic subscription does not match a selector.
+     */
+    public static final byte UNMATCHED_ACK_TYPE = 5;
+
     protected byte ackType;
     protected ConsumerId consumerId;
     protected MessageId firstMessageId;
@@ -118,6 +123,10 @@ public class MessageAck extends BaseComm
         return ackType == INDIVIDUAL_ACK_TYPE;
     }
 
+    public boolean isUnmatchedAck() {
+        return ackType == UNMATCHED_ACK_TYPE;
+    }
+
     /**
      * @openwire:property version=1 cache=true
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -74,8 +74,8 @@ public class ProxyTopicMessageStore impl
     }
 
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                            MessageId messageId) throws IOException {
-        delegate.acknowledge(context, clientId, subscriptionName, messageId);
+                            MessageId messageId, MessageAck ack) throws IOException {
+        delegate.acknowledge(context, clientId, subscriptionName, messageId, ack);
     }
 
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.store;
 import java.io.IOException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 
@@ -39,7 +40,7 @@ public interface TopicMessageStore exten
      * @param subscriptionPersistentId
      * @throws IOException
      */
-    void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
+    void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException;
     
     /**
      * @param clientId

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -79,7 +79,8 @@ public class AMQTopicMessageStore extend
 
     /**
      */
-    public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
+    public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName,
+                            final MessageId messageId, final MessageAck originalAck) throws IOException {
         final boolean debug = LOG.isDebugEnabled();
         JournalTopicAck ack = new JournalTopicAck();
         ack.setDestination(destination);
@@ -140,7 +141,7 @@ public class AMQTopicMessageStore extend
         try {
             SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
             if (sub != null) {
-                topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
+                topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null);
                 return true;
             }
         } catch (Throwable e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Oct 21 10:58:51 2010
@@ -83,9 +83,9 @@ public interface JDBCAdapter {
 
     int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
 
-    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
+    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
 
-    long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
+    long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
 
     void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Oct 21 10:58:51 2010
@@ -44,7 +44,8 @@ public class JDBCMessageStore extends Ab
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
-    protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
+    protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
+    protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
 
     protected ActiveMQMessageAudit audit;
     
@@ -144,7 +145,7 @@ public class JDBCMessageStore extends Ab
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
     	
-    	long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
+    	long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
 
         // Get a connection and remove the message from the DB
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -225,14 +226,15 @@ public class JDBCMessageStore extends Ab
     public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
+            adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         listener.recoverMessage(msg);
-                        lastStoreSequenceId.set(sequenceId);
+                        lastRecoveredSequenceId.set(sequenceId);
+                        lastRecoveredPriority.set(msg.getPriority());
                         return true;
                     }
                     return false;
@@ -259,32 +261,35 @@ public class JDBCMessageStore extends Ab
      * @see org.apache.activemq.store.MessageStore#resetBatching()
      */
     public void resetBatching() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
         }
-        lastStoreSequenceId.set(-1);
+        lastRecoveredSequenceId.set(-1);
+        lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
 
     }
 
     @Override
     public void setBatch(MessageId messageId) {
-        long storeSequenceId = -1;
         try {
-            storeSequenceId = getStoreSequenceIdForMessageId(messageId);
+            long[] storedValues = getStoreSequenceIdForMessageId(messageId);
+            lastRecoveredSequenceId.set(storedValues[0]);
+            lastRecoveredPriority.set(storedValues[1]);
         } catch (IOException ignoredAsAlreadyLogged) {
-            // reset batch in effect with default -1 value
+            lastRecoveredSequenceId.set(-1);
+            lastRecoveredPriority.set(Byte.MAX_VALUE -1);
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
+                    + ", priority: " + lastRecoveredPriority.get());
         }
-        lastStoreSequenceId.set(storeSequenceId);
     }
 
-    private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
-        long result = -1;
+    private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
+        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            result = adapter.getStoreSequenceId(c, destination, messageId)[0];
+            result = adapter.getStoreSequenceId(c, destination, messageId);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Oct 21 10:58:51 2010
@@ -527,6 +527,7 @@ public class JDBCPersistenceAdapter exte
             getAdapter().doDropTables(c);
             getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
             getAdapter().doCreateTables(c);
+            LOG.info("Persistence store purged.");
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create(e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.ActiveMQMessa
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -33,12 +34,15 @@ import org.apache.activemq.store.TopicMe
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.6 $
  */
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
+    private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class);
     private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
     private Map<String, AtomicLong> subscriberLastPriorityMap = new ConcurrentHashMap<String, AtomicLong>();
 
@@ -46,12 +50,21 @@ public class JDBCTopicMessageStore exten
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
 
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+        if (ack != null && ack.isUnmatchedAck()) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
+            }
+            return;
+        }
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
         	long[] res = adapter.getStoreSequenceId(c, destination, messageId);
             adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]);
+            }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
@@ -93,12 +106,15 @@ public class JDBCTopicMessageStore exten
         AtomicLong last = subscriberLastMessageMap.get(subcriberId);
         AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
         if (last == null) {
-            long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
-            last = new AtomicLong(lastAcked);
+            long[] lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
+            last = new AtomicLong(lastAcked[0]);
             subscriberLastMessageMap.put(subcriberId, last);
-            priority = new AtomicLong(Byte.MAX_VALUE - 1);
+            priority = new AtomicLong(lastAcked[1]);
             subscriberLastMessageMap.put(subcriberId, priority);
         }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("recoverNextMessage - last: " + last.get() + ", priority: " + priority);
+        }
         final AtomicLong finalLast = last;
         final AtomicLong finalPriority = priority;
         try {
@@ -137,10 +153,6 @@ public class JDBCTopicMessageStore exten
         subscriberLastPriorityMap.remove(subcriberId);
     }
 
-    /**
-     * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
-     *      boolean)
-     */
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -207,6 +219,9 @@ public class JDBCTopicMessageStore exten
         } finally {
             c.close();
         }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
+        }
         return result;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Oct 21 10:58:51 2010
@@ -307,7 +307,7 @@ public class Statements {
                                                      + getFullAckTableName()
                                                      + " D "
                                                      + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-                                                     + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+                                                     + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY";
         }
         return durableSubscriberMessageCountStatement;
     }
@@ -336,11 +336,17 @@ public class Statements {
     public String getDeleteOldMessagesStatement() {
         if (deleteOldMessagesStatement == null) {
             deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
-                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID < "
-                                         + "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) "
-                                         + "FROM " + getFullAckTableName() + " WHERE "
-                                         + getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName()
-                                         + ".CONTAINER)";
+                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+                                         + " OR (ID < "
+                                         + "   ( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID)"
+                                         + "      FROM " + getFullAckTableName() + " WHERE "
+                                         +          getFullAckTableName() + ".CONTAINER="
+                                         +          getFullMessageTableName() + ".CONTAINER )"
+                                         + "   AND PRIORITY >= "
+                                         + "   ( SELECT min(" + getFullAckTableName() + ".PRIORITY) "
+                                         + "     FROM " + getFullAckTableName() + " WHERE "
+                                         +          getFullAckTableName() + ".CONTAINER="
+                                         + getFullMessageTableName() + ".CONTAINER ))";
         }
         return deleteOldMessagesStatement;
     }
@@ -391,7 +397,9 @@ public class Statements {
     public String getFindNextMessagesByPriorityStatement() {
         if (findNextMessagesByPriorityStatement == null) {
             findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
-                                        + " WHERE CONTAINER=? ORDER BY PRIORITY DESC, ID";
+                                        + " WHERE CONTAINER=?"
+                                        + " AND ((ID > ? AND PRIORITY = ?) OR PRIORITY < ?)"
+                                        + " ORDER BY PRIORITY DESC, ID";
         }
         return findNextMessagesByPriorityStatement;
     }    
@@ -401,9 +409,11 @@ public class Statements {
      */
     public String getLastAckedDurableSubscriberMessageStatement() {
         if (lastAckedDurableSubscriberMessageStatement == null) {
-            lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM "
+            lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID), PRIORITY FROM "
                                                          + getFullAckTableName()
-                                                         + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+                                                         + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
+                                                         + " GROUP BY PRIORITY"
+                                                         + " ORDER BY PRIORITY ASC";
         }
         return lastAckedDurableSubscriberMessageStatement;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Oct 21 10:58:51 2010
@@ -508,8 +508,6 @@ public class DefaultJDBCAdapter implemen
      * @param retroactive 
      * @throws SQLException 
      * @throws IOException 
-     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
-     *      org.apache.activemq.service.SubscriptionInfo)
      */
     public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
             throws SQLException, IOException {
@@ -644,11 +642,11 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
+    public long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
             String clientId, String subscriberName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        long result = -1;
+        long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
         try {
             s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
             s.setString(1, destination.getQualifiedName());
@@ -656,7 +654,8 @@ public class DefaultJDBCAdapter implemen
             s.setString(3, subscriberName);
             rs = s.executeQuery();
             if (rs.next()) {
-                result = rs.getLong(1);
+                result[0] = rs.getLong(1);
+                result[1] = rs.getLong(2);
             }
             rs.close();
             s.close();
@@ -784,7 +783,7 @@ public class DefaultJDBCAdapter implemen
     }
 
     public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
-            int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+            long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
@@ -795,8 +794,10 @@ public class DefaultJDBCAdapter implemen
             }
             s.setMaxRows(maxReturned * 2);
             s.setString(1, destination.getQualifiedName());
-            if (!isPrioritizedMessages()) {
-                s.setLong(2, nextSeq);
+            s.setLong(2, nextSeq);
+            if (isPrioritizedMessages()) {
+                s.setLong(3, priority);
+                s.setLong(4, priority);
             }
             rs = s.executeQuery();
             int count = 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -25,6 +25,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -82,7 +83,7 @@ public class JournalTopicMessageStore ex
     /**
      */
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                            final MessageId messageId) throws IOException {
+                            final MessageId messageId, MessageAck originalAck) throws IOException {
         final boolean debug = LOG.isDebugEnabled();
 
         JournalTopicAck ack = new JournalTopicAck();
@@ -138,7 +139,7 @@ public class JournalTopicMessageStore ex
         try {
             SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
             if (sub != null) {
-                longTermStore.acknowledge(context, clientId, subscritionName, messageId);
+                longTermStore.acknowledge(context, clientId, subscritionName, messageId, null);
             }
         } catch (Throwable e) {
             LOG.debug("Could not replay acknowledge for message '" + messageId
@@ -177,7 +178,7 @@ public class JournalTopicMessageStore ex
                     SubscriptionKey subscriptionKey = iterator.next();
                     MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
                     longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
-                                              subscriptionKey.subscriptionName, identity);
+                                              subscriptionKey.subscriptionName, identity, null);
                 }
 
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
@@ -79,7 +80,7 @@ public class KahaTopicMessageStore exten
     }
 
     public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                                         MessageId messageId) throws IOException {
+                                         MessageId messageId, MessageAck ack) throws IOException {
         String subcriberId = getSubscriptionKey(clientId, subscriptionName);
         TopicSubContainer container = subscriberMessages.get(subcriberId);
         if (container != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Oct 21 10:58:51 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
@@ -224,7 +225,7 @@ public class KahaTopicReferenceStore ext
     }
 
     public void acknowledge(ConnectionContext context,
-			String clientId, String subscriptionName, MessageId messageId) throws IOException {
+			String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
 	    acknowledgeReference(context, clientId, subscriptionName, messageId);
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Oct 21 10:58:51 2010
@@ -611,7 +611,8 @@ public class KahaDBStore extends Message
             }
         }
 
-        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
+        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                                MessageId messageId, MessageAck ack)
                 throws IOException {
             String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             if (isConcurrentStoreAndDispatchTopics()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Thu Oct 21 10:58:51 2010
@@ -277,7 +277,8 @@ public class TempKahaDBStore extends Tem
             super(destination);
         }
         
-        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                                MessageId messageId, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Thu Oct 21 10:58:51 2010
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -66,7 +67,8 @@ public class MemoryTopicMessageStore ext
         }
     }
 
-    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                                         MessageId messageId, MessageAck ack) throws IOException {
         SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
         MemoryTopicSub sub = topicSubMap.get(key);
         if (sub != null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java Thu Oct 21 10:58:51 2010
@@ -74,7 +74,7 @@ public abstract class TestSupport extend
      * Returns the name of the destination used in this test case
      */
     protected String getDestinationString() {
-        return getClass().getName() + "." + getName();
+        return getClass().getName() + "." + getName(true);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Thu Oct 21 10:58:51 2010
@@ -47,14 +47,15 @@ abstract public class MessagePriorityTes
     
     ActiveMQConnectionFactory factory;
     Connection conn;
-    Session sess;
+    protected Session sess;
     
     public boolean useCache;
+    public boolean dispatchAsync = false;
     public int prefetchVal = 500;
 
-    int MSG_NUM = 1000;
-    int HIGH_PRI = 7;
-    int LOW_PRI = 3;
+    public int MSG_NUM = 600;
+    public int HIGH_PRI = 7;
+    public int LOW_PRI = 3;
     
     abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
     
@@ -78,6 +79,7 @@ abstract public class MessagePriorityTes
         prefetch.setAll(prefetchVal);
         factory.setPrefetchPolicy(prefetch);
         factory.setWatchTopicAdvisories(false);
+        factory.setDispatchAsync(dispatchAsync);
         conn = factory.createConnection();
         conn.setClientID("priority");
         conn.start();
@@ -110,7 +112,7 @@ abstract public class MessagePriorityTes
         
     }
     
-    class ProducerThread extends Thread {
+    protected class ProducerThread extends Thread {
 
         int priority;
         int messageCount;
@@ -154,7 +156,8 @@ abstract public class MessagePriorityTes
         
         MessageConsumer queueConsumer = sess.createConsumer(queue);
         for (int i = 0; i < MSG_NUM * 2; i++) {
-            Message msg = queueConsumer.receive(1000);
+            Message msg = queueConsumer.receive(5000);
+            LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
         }
@@ -196,6 +199,7 @@ abstract public class MessagePriorityTes
 
     public void initCombosForTestDurableSubsReconnect() {
         addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)});
+        addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE});
     }
     
     public void testDurableSubsReconnect() throws Exception {
@@ -217,7 +221,8 @@ abstract public class MessagePriorityTes
         final int closeFrequency = MSG_NUM/4;
         sub = sess.createDurableSubscriber(topic, subName);
         for (int i = 0; i < MSG_NUM * 2; i++) {
-            Message msg = sub.receive(5000);
+            Message msg = sub.receive(30000);
+            LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
             if (i>0 && i%closeFrequency==0) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Thu Oct 21 10:58:51 2010
@@ -17,14 +17,20 @@
 
 package org.apache.activemq.store.jdbc;
 
+import javax.jms.Message;
+import javax.jms.TopicSubscriber;
 import junit.framework.Test;
-
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.MessagePriorityTest;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
 public class JDBCMessagePriorityTest extends MessagePriorityTest {
 
+    private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class);
+    
     @Override
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
@@ -33,16 +39,56 @@ public class JDBCMessagePriorityTest ext
         dataSource.setCreateDatabase("create");
         jdbc.setDataSource(dataSource);
         jdbc.deleteAllMessages();
+        jdbc.setCleanupPeriod(1000);
         return jdbc;
     }
-    
+
+    // this cannot be a general test as kahaDB just has support for 3 priority levels
+    public void testDurableSubsReconnectWithFourLevels() throws Exception {
+        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
+        final String subName = "priorityDisconnect";
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        final int MED_PRI = LOW_PRI + 1;
+        final int MED_HIGH_PRI = HIGH_PRI - 1;
+
+        ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+        ProducerThread medPri = new ProducerThread(topic, MSG_NUM, MED_PRI);
+        ProducerThread medHighPri = new ProducerThread(topic, MSG_NUM, MED_HIGH_PRI);
+        ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+
+        lowPri.start();
+        highPri.start();
+        medPri.start();
+        medHighPri.start();
+
+        lowPri.join();
+        highPri.join();
+        medPri.join();
+        medHighPri.join();
+
+
+        final int closeFrequency = MSG_NUM;
+        final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI};
+        sub = sess.createDurableSubscriber(topic, subName);
+        for (int i = 0; i < MSG_NUM * 4; i++) {
+            Message msg = sub.receive(30000);
+            LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null));
+            assertNotNull("Message " + i + " was null", msg);
+            assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
+            if (i > 0 && i % closeFrequency == 0) {
+                LOG.info("Closing durable sub.. on: " + i);
+                sub.close();
+                sub = sess.createDurableSubscriber(topic, subName);
+            }
+        }
+        LOG.info("closing on done!");
+        sub.close();
+    }
+
     public static Test suite() {
         return suite(JDBCMessagePriorityTest.class);
     }
 
-    // pending fix...
-    @Override
-    public void testDurableSubsReconnect() throws Exception {
-        // TODO: fix jdbc durable sub recovery 
-    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Thu Oct 21 10:58:51 2010
@@ -16,9 +16,12 @@
  */
 package org.apache.activemq.usecases;
 
+import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 
@@ -27,11 +30,12 @@ import java.io.File;
 
 public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
 
+    public Boolean usePrioritySupport = Boolean.TRUE;
     private BrokerService broker;
     private ActiveMQTopic topic;
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("vm://" + getName());
+        return new ActiveMQConnectionFactory("vm://" + getName(true));
     }
 
     @Override
@@ -42,6 +46,10 @@ public class DurableSubscriptionOfflineT
         return con;
     }
 
+    public static Test suite() {
+        return suite(DurableSubscriptionOfflineTest.class);
+    }
+    
     protected void setUp() throws Exception {
         topic = (ActiveMQTopic) createDestination();
         createBroker();
@@ -54,15 +62,19 @@ public class DurableSubscriptionOfflineT
     }
 
     private void createBroker() throws Exception {
-        broker = BrokerFactory.createBroker("broker:(vm://localhost)");
-        broker.setBrokerName(getName());
+        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
+        broker.setBrokerName(getName(true));
         broker.setDeleteAllMessagesOnStartup(true);
 
-        broker.setPersistent(true);
-        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
-        persistenceAdapter.setDirectory(new File("activemq-data-kaha/" + getName()));
-        broker.setPersistenceAdapter(persistenceAdapter);
-
+        if (usePrioritySupport) {
+            PolicyEntry policy = new PolicyEntry();
+            policy.setPrioritizedMessages(true);
+            PolicyMap policyMap = new PolicyMap();
+            policyMap.setDefaultEntry(policy);
+            broker.setDestinationPolicy(policyMap);
+        }
+        
+        setDefaultPersistenceAdapter(broker);
         broker.start();
     }
 
@@ -71,6 +83,13 @@ public class DurableSubscriptionOfflineT
             broker.stop();
     }
 
+    public void initCombosForTestOfflineSubscription() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+        this.addCombinationValues("usePrioritySupport",
+                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+    }
+
     public void testOfflineSubscription() throws Exception {
         // create durable subscription
         Connection con = createConnection();

Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Oct 21 10:58:51 2010
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
-#log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.activemq=TRACE
 #log4j.logger.org.apache.activemq.store.jdbc=DEBUG
 #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
 #log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG



Mime
View raw message