Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 24549 invoked from network); 21 Oct 2010 10:59:57 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 21 Oct 2010 10:59:57 -0000 Received: (qmail 26309 invoked by uid 500); 21 Oct 2010 10:59:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 26241 invoked by uid 500); 21 Oct 2010 10:59:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 26234 invoked by uid 99); 21 Oct 2010 10:59:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Oct 2010 10:59:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Oct 2010 10:59:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 475F223889E1; Thu, 21 Oct 2010 10:58:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1025939 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/store/ main/java/org/apa... Date: Thu, 21 Oct 2010 10:58:52 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101021105853.475F223889E1@eris.apache.org> Author: gtully Date: Thu Oct 21 10:58:51 2010 New Revision: 1025939 URL: http://svn.apache.org/viewvc?rev=1025939&view=rev Log: jdbc variant of https://issues.apache.org/activemq/browse/AMQ-2985 - jdbc store cannot ack out of order, the cleanup task query needed to be priority aware. fix for https://issues.apache.org/activemq/browse/AMQ-2980 JDBC - priority needed to be considered in count and recovery, additional tests included Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java activemq/trunk/activemq-core/src/test/resources/log4j.properties Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Oct 21 10:58:51 2010 @@ -78,6 +78,7 @@ public class DurableTopicSubscription ex */ public void unmatched(MessageReference node) throws IOException { MessageAck ack = new MessageAck(); + ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE); ack.setMessageID(node.getMessageId()); node.getRegionDestination().acknowledge(this.getContext(), this, ack, node); } @@ -111,14 +112,14 @@ public class DurableTopicSubscription ex public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception { - LOG.debug("Activating " + this); if (!active) { this.active = true; this.context = context; this.info = info; + LOG.debug("Activating " + this); int prefetch = info.getPrefetchSize(); if (prefetch>0) { - prefetch += prefetch/2; + prefetch += prefetch/2; } int depth = Math.max(prefetch, this.pending.getMaxAuditDepth()); this.pending.setMaxAuditDepth(depth); @@ -150,7 +151,7 @@ public class DurableTopicSubscription ex } public void deactivate(boolean keepDurableSubsActive) throws Exception { - LOG.debug("Dectivating " + this); + LOG.debug("Deactivating " + this); active = false; this.usageManager.getMemoryUsage().removeUsageListener(this); synchronized (pending) { @@ -198,7 +199,7 @@ public class DurableTopicSubscription ex } prefetchExtension = 0; } - + protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { MessageDispatch md = super.createMessageDispatch(node, message); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Oct 21 10:58:51 2010 @@ -502,7 +502,7 @@ public class Topic extends BaseDestinati if (topicStore != null && node.isPersistent()) { DurableTopicSubscription dsub = (DurableTopicSubscription) sub; SubscriptionKey key = dsub.getSubscriptionKey(); - topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId()); + topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), ack); } messageConsumed(context, node); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Oct 21 10:58:51 2010 @@ -248,6 +248,10 @@ public abstract class AbstractStoreCurso protected final synchronized void fillBatch() { + if (LOG.isTraceEnabled()) { + LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded + + ", hasMessages=" + this.storeHasMessages + ", size=" + this.size); + } if (batchResetNeeded) { resetBatch(); this.batchResetNeeded = false; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java Thu Oct 21 10:58:51 2010 @@ -58,7 +58,12 @@ public class MessageAck extends BaseComm * The ack case where a client wants only an individual message to be discarded. */ public static final byte INDIVIDUAL_ACK_TYPE = 4; - + +/** + * The ack case where a durable topic subscription does not match a selector. + */ + public static final byte UNMATCHED_ACK_TYPE = 5; + protected byte ackType; protected ConsumerId consumerId; protected MessageId firstMessageId; @@ -118,6 +123,10 @@ public class MessageAck extends BaseComm return ackType == INDIVIDUAL_ACK_TYPE; } + public boolean isUnmatchedAck() { + return ackType == UNMATCHED_ACK_TYPE; + } + /** * @openwire:property version=1 cache=true */ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -74,8 +74,8 @@ public class ProxyTopicMessageStore impl } public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - MessageId messageId) throws IOException { - delegate.acknowledge(context, clientId, subscriptionName, messageId); + MessageId messageId, MessageAck ack) throws IOException { + delegate.acknowledge(context, clientId, subscriptionName, messageId, ack); } public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -19,6 +19,7 @@ package org.apache.activemq.store; import java.io.IOException; import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; @@ -39,7 +40,7 @@ public interface TopicMessageStore exten * @param subscriptionPersistentId * @throws IOException */ - void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException; + void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException; /** * @param clientId Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -79,7 +79,8 @@ public class AMQTopicMessageStore extend /** */ - public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException { + public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, + final MessageId messageId, final MessageAck originalAck) throws IOException { final boolean debug = LOG.isDebugEnabled(); JournalTopicAck ack = new JournalTopicAck(); ack.setDestination(destination); @@ -140,7 +141,7 @@ public class AMQTopicMessageStore extend try { SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName); if (sub != null) { - topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId); + topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null); return true; } } catch (Throwable e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Oct 21 10:58:51 2010 @@ -83,9 +83,9 @@ public interface JDBCAdapter { int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; - void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; + void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception; - long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; + long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Oct 21 10:58:51 2010 @@ -44,7 +44,8 @@ public class JDBCMessageStore extends Ab protected final WireFormat wireFormat; protected final JDBCAdapter adapter; protected final JDBCPersistenceAdapter persistenceAdapter; - protected AtomicLong lastStoreSequenceId = new AtomicLong(-1); + protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); + protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); protected ActiveMQMessageAudit audit; @@ -144,7 +145,7 @@ public class JDBCMessageStore extends Ab public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId()); + long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0]; // Get a connection and remove the message from the DB TransactionContext c = persistenceAdapter.getTransactionContext(context); @@ -225,14 +226,15 @@ public class JDBCMessageStore extends Ab public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { TransactionContext c = persistenceAdapter.getTransactionContext(); try { - adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() { + adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { if (listener.hasSpace()) { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); listener.recoverMessage(msg); - lastStoreSequenceId.set(sequenceId); + lastRecoveredSequenceId.set(sequenceId); + lastRecoveredPriority.set(msg.getPriority()); return true; } return false; @@ -259,32 +261,35 @@ public class JDBCMessageStore extends Ab * @see org.apache.activemq.store.MessageStore#resetBatching() */ public void resetBatching() { - if (LOG.isDebugEnabled()) { - LOG.debug(destination.getPhysicalName() + " resetBatch, existing last seqId: " + lastStoreSequenceId.get()); + if (LOG.isTraceEnabled()) { + LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); } - lastStoreSequenceId.set(-1); + lastRecoveredSequenceId.set(-1); + lastRecoveredPriority.set(Byte.MAX_VALUE - 1); } @Override public void setBatch(MessageId messageId) { - long storeSequenceId = -1; try { - storeSequenceId = getStoreSequenceIdForMessageId(messageId); + long[] storedValues = getStoreSequenceIdForMessageId(messageId); + lastRecoveredSequenceId.set(storedValues[0]); + lastRecoveredPriority.set(storedValues[1]); } catch (IOException ignoredAsAlreadyLogged) { - // reset batch in effect with default -1 value + lastRecoveredSequenceId.set(-1); + lastRecoveredPriority.set(Byte.MAX_VALUE -1); } - if (LOG.isDebugEnabled()) { - LOG.debug(destination.getPhysicalName() + " setBatch: new sequenceId: " + storeSequenceId + ",existing last seqId: " + lastStoreSequenceId.get()); + if (LOG.isTraceEnabled()) { + LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() + + ", priority: " + lastRecoveredPriority.get()); } - lastStoreSequenceId.set(storeSequenceId); } - private long getStoreSequenceIdForMessageId(MessageId messageId) throws IOException { - long result = -1; + private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException { + long[] result = new long[]{-1, Byte.MAX_VALUE -1}; TransactionContext c = persistenceAdapter.getTransactionContext(); try { - result = adapter.getStoreSequenceId(c, destination, messageId)[0]; + result = adapter.getStoreSequenceId(c, destination, messageId); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Oct 21 10:58:51 2010 @@ -527,6 +527,7 @@ public class JDBCPersistenceAdapter exte getAdapter().doDropTables(c); getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); getAdapter().doCreateTables(c); + LOG.info("Persistence store purged."); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create(e); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -26,6 +26,7 @@ import org.apache.activemq.ActiveMQMessa import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; @@ -33,12 +34,15 @@ import org.apache.activemq.store.TopicMe import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.6 $ */ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore { + private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class); private Map subscriberLastMessageMap = new ConcurrentHashMap(); private Map subscriberLastPriorityMap = new ConcurrentHashMap(); @@ -46,12 +50,21 @@ public class JDBCTopicMessageStore exten super(persistenceAdapter, adapter, wireFormat, topic, audit); } - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { + if (ack != null && ack.isUnmatchedAck()) { + if (LOG.isTraceEnabled()) { + LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks."); + } + return; + } // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); try { long[] res = adapter.getStoreSequenceId(c, destination, messageId); adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]); + if (LOG.isTraceEnabled()) { + LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]); + } } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e); @@ -93,12 +106,15 @@ public class JDBCTopicMessageStore exten AtomicLong last = subscriberLastMessageMap.get(subcriberId); AtomicLong priority = subscriberLastPriorityMap.get(subcriberId); if (last == null) { - long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName); - last = new AtomicLong(lastAcked); + long[] lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName); + last = new AtomicLong(lastAcked[0]); subscriberLastMessageMap.put(subcriberId, last); - priority = new AtomicLong(Byte.MAX_VALUE - 1); + priority = new AtomicLong(lastAcked[1]); subscriberLastMessageMap.put(subcriberId, priority); } + if (LOG.isTraceEnabled()) { + LOG.trace("recoverNextMessage - last: " + last.get() + ", priority: " + priority); + } final AtomicLong finalLast = last; final AtomicLong finalPriority = priority; try { @@ -137,10 +153,6 @@ public class JDBCTopicMessageStore exten subscriberLastPriorityMap.remove(subcriberId); } - /** - * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo, - * boolean) - */ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { TransactionContext c = persistenceAdapter.getTransactionContext(); try { @@ -207,6 +219,9 @@ public class JDBCTopicMessageStore exten } finally { c.close(); } + if (LOG.isTraceEnabled()) { + LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result); + } return result; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Oct 21 10:58:51 2010 @@ -307,7 +307,7 @@ public class Statements { + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" - + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"; + + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <= D.PRIORITY"; } return durableSubscriberMessageCountStatement; } @@ -336,11 +336,17 @@ public class Statements { public String getDeleteOldMessagesStatement() { if (deleteOldMessagesStatement == null) { deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName() - + " WHERE ( EXPIRATION<>0 AND EXPIRATION0 AND EXPIRATION= " + + " ( SELECT min(" + getFullAckTableName() + ".PRIORITY) " + + " FROM " + getFullAckTableName() + " WHERE " + + getFullAckTableName() + ".CONTAINER=" + + getFullMessageTableName() + ".CONTAINER ))"; } return deleteOldMessagesStatement; } @@ -391,7 +397,9 @@ public class Statements { public String getFindNextMessagesByPriorityStatement() { if (findNextMessagesByPriorityStatement == null) { findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() - + " WHERE CONTAINER=? ORDER BY PRIORITY DESC, ID"; + + " WHERE CONTAINER=?" + + " AND ((ID > ? AND PRIORITY = ?) OR PRIORITY < ?)" + + " ORDER BY PRIORITY DESC, ID"; } return findNextMessagesByPriorityStatement; } @@ -401,9 +409,11 @@ public class Statements { */ public String getLastAckedDurableSubscriberMessageStatement() { if (lastAckedDurableSubscriberMessageStatement == null) { - lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID), PRIORITY FROM " + getFullAckTableName() - + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; + + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" + + " GROUP BY PRIORITY" + + " ORDER BY PRIORITY ASC"; } return lastAckedDurableSubscriberMessageStatement; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Oct 21 10:58:51 2010 @@ -508,8 +508,6 @@ public class DefaultJDBCAdapter implemen * @param retroactive * @throws SQLException * @throws IOException - * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, - * org.apache.activemq.service.SubscriptionInfo) */ public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive) throws SQLException, IOException { @@ -644,11 +642,11 @@ public class DefaultJDBCAdapter implemen } } - public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, + public long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; - long result = -1; + long[] result = new long[]{-1, Byte.MAX_VALUE - 1}; try { s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); s.setString(1, destination.getQualifiedName()); @@ -656,7 +654,8 @@ public class DefaultJDBCAdapter implemen s.setString(3, subscriberName); rs = s.executeQuery(); if (rs.next()) { - result = rs.getLong(1); + result[0] = rs.getLong(1); + result[1] = rs.getLong(2); } rs.close(); s.close(); @@ -784,7 +783,7 @@ public class DefaultJDBCAdapter implemen } public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, - int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { + long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; try { @@ -795,8 +794,10 @@ public class DefaultJDBCAdapter implemen } s.setMaxRows(maxReturned * 2); s.setString(1, destination.getQualifiedName()); - if (!isPrioritizedMessages()) { - s.setLong(2, nextSeq); + s.setLong(2, nextSeq); + if (isPrioritizedMessages()) { + s.setLong(3, priority); + s.setLong(4, priority); } rs = s.executeQuery(); int count = 0; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -25,6 +25,7 @@ import org.apache.activemq.broker.Connec import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; @@ -82,7 +83,7 @@ public class JournalTopicMessageStore ex /** */ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - final MessageId messageId) throws IOException { + final MessageId messageId, MessageAck originalAck) throws IOException { final boolean debug = LOG.isDebugEnabled(); JournalTopicAck ack = new JournalTopicAck(); @@ -138,7 +139,7 @@ public class JournalTopicMessageStore ex try { SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName); if (sub != null) { - longTermStore.acknowledge(context, clientId, subscritionName, messageId); + longTermStore.acknowledge(context, clientId, subscritionName, messageId, null); } } catch (Throwable e) { LOG.debug("Could not replay acknowledge for message '" + messageId @@ -177,7 +178,7 @@ public class JournalTopicMessageStore ex SubscriptionKey subscriptionKey = iterator.next(); MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, - subscriptionKey.subscriptionName, identity); + subscriptionKey.subscriptionName, identity, null); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHa import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.ListContainer; @@ -79,7 +80,7 @@ public class KahaTopicMessageStore exten } public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - MessageId messageId) throws IOException { + MessageId messageId, MessageAck ack) throws IOException { String subcriberId = getSubscriptionKey(clientId, subscriptionName); TopicSubContainer container = subscriberMessages.get(subcriberId); if (container != null) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Oct 21 10:58:51 2010 @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHa import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.ListContainer; @@ -224,7 +225,7 @@ public class KahaTopicReferenceStore ext } public void acknowledge(ConnectionContext context, - String clientId, String subscriptionName, MessageId messageId) throws IOException { + String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { acknowledgeReference(context, clientId, subscriptionName, messageId); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Oct 21 10:58:51 2010 @@ -611,7 +611,8 @@ public class KahaDBStore extends Message } } - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, + MessageId messageId, MessageAck ack) throws IOException { String subscriptionKey = subscriptionKey(clientId, subscriptionName); if (isConcurrentStoreAndDispatchTopics()) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Thu Oct 21 10:58:51 2010 @@ -277,7 +277,8 @@ public class TempKahaDBStore extends Tem super(destination); } - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { + public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, + MessageId messageId, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Thu Oct 21 10:58:51 2010 @@ -25,6 +25,7 @@ import java.util.Map.Entry; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; @@ -66,7 +67,8 @@ public class MemoryTopicMessageStore ext } } - public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException { + public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, + MessageId messageId, MessageAck ack) throws IOException { SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); MemoryTopicSub sub = topicSubMap.get(key); if (sub != null) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java Thu Oct 21 10:58:51 2010 @@ -74,7 +74,7 @@ public abstract class TestSupport extend * Returns the name of the destination used in this test case */ protected String getDestinationString() { - return getClass().getName() + "." + getName(); + return getClass().getName() + "." + getName(true); } /** Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Thu Oct 21 10:58:51 2010 @@ -47,14 +47,15 @@ abstract public class MessagePriorityTes ActiveMQConnectionFactory factory; Connection conn; - Session sess; + protected Session sess; public boolean useCache; + public boolean dispatchAsync = false; public int prefetchVal = 500; - int MSG_NUM = 1000; - int HIGH_PRI = 7; - int LOW_PRI = 3; + public int MSG_NUM = 600; + public int HIGH_PRI = 7; + public int LOW_PRI = 3; abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception; @@ -78,6 +79,7 @@ abstract public class MessagePriorityTes prefetch.setAll(prefetchVal); factory.setPrefetchPolicy(prefetch); factory.setWatchTopicAdvisories(false); + factory.setDispatchAsync(dispatchAsync); conn = factory.createConnection(); conn.setClientID("priority"); conn.start(); @@ -110,7 +112,7 @@ abstract public class MessagePriorityTes } - class ProducerThread extends Thread { + protected class ProducerThread extends Thread { int priority; int messageCount; @@ -154,7 +156,8 @@ abstract public class MessagePriorityTes MessageConsumer queueConsumer = sess.createConsumer(queue); for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = queueConsumer.receive(1000); + Message msg = queueConsumer.receive(5000); + LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null)); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); } @@ -196,6 +199,7 @@ abstract public class MessagePriorityTes public void initCombosForTestDurableSubsReconnect() { addCombinationValues("prefetchVal", new Object[] {new Integer(1000), new Integer(MSG_NUM/2)}); + addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE, Boolean.FALSE}); } public void testDurableSubsReconnect() throws Exception { @@ -217,7 +221,8 @@ abstract public class MessagePriorityTes final int closeFrequency = MSG_NUM/4; sub = sess.createDurableSubscriber(topic, subName); for (int i = 0; i < MSG_NUM * 2; i++) { - Message msg = sub.receive(5000); + Message msg = sub.receive(30000); + LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null)); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority()); if (i>0 && i%closeFrequency==0) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Thu Oct 21 10:58:51 2010 @@ -17,14 +17,20 @@ package org.apache.activemq.store.jdbc; +import javax.jms.Message; +import javax.jms.TopicSubscriber; import junit.framework.Test; - +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.MessagePriorityTest; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.derby.jdbc.EmbeddedDataSource; public class JDBCMessagePriorityTest extends MessagePriorityTest { + private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class); + @Override protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); @@ -33,16 +39,56 @@ public class JDBCMessagePriorityTest ext dataSource.setCreateDatabase("create"); jdbc.setDataSource(dataSource); jdbc.deleteAllMessages(); + jdbc.setCleanupPeriod(1000); return jdbc; } - + + // this cannot be a general test as kahaDB just has support for 3 priority levels + public void testDurableSubsReconnectWithFourLevels() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); + final String subName = "priorityDisconnect"; + TopicSubscriber sub = sess.createDurableSubscriber(topic, subName); + sub.close(); + + final int MED_PRI = LOW_PRI + 1; + final int MED_HIGH_PRI = HIGH_PRI - 1; + + ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI); + ProducerThread medPri = new ProducerThread(topic, MSG_NUM, MED_PRI); + ProducerThread medHighPri = new ProducerThread(topic, MSG_NUM, MED_HIGH_PRI); + ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI); + + lowPri.start(); + highPri.start(); + medPri.start(); + medHighPri.start(); + + lowPri.join(); + highPri.join(); + medPri.join(); + medHighPri.join(); + + + final int closeFrequency = MSG_NUM; + final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI}; + sub = sess.createDurableSubscriber(topic, subName); + for (int i = 0; i < MSG_NUM * 4; i++) { + Message msg = sub.receive(30000); + LOG.debug("received i=" + i + ", m=" + (msg!=null? msg.getJMSMessageID() : null)); + assertNotNull("Message " + i + " was null", msg); + assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority()); + if (i > 0 && i % closeFrequency == 0) { + LOG.info("Closing durable sub.. on: " + i); + sub.close(); + sub = sess.createDurableSubscriber(topic, subName); + } + } + LOG.info("closing on done!"); + sub.close(); + } + public static Test suite() { return suite(JDBCMessagePriorityTest.class); } - // pending fix... - @Override - public void testDurableSubsReconnect() throws Exception { - // TODO: fix jdbc durable sub recovery - } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Thu Oct 21 10:58:51 2010 @@ -16,9 +16,12 @@ */ package org.apache.activemq.usecases; +import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -27,11 +30,12 @@ import java.io.File; public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { + public Boolean usePrioritySupport = Boolean.TRUE; private BrokerService broker; private ActiveMQTopic topic; protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://" + getName()); + return new ActiveMQConnectionFactory("vm://" + getName(true)); } @Override @@ -42,6 +46,10 @@ public class DurableSubscriptionOfflineT return con; } + public static Test suite() { + return suite(DurableSubscriptionOfflineTest.class); + } + protected void setUp() throws Exception { topic = (ActiveMQTopic) createDestination(); createBroker(); @@ -54,15 +62,19 @@ public class DurableSubscriptionOfflineT } private void createBroker() throws Exception { - broker = BrokerFactory.createBroker("broker:(vm://localhost)"); - broker.setBrokerName(getName()); + broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); + broker.setBrokerName(getName(true)); broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); - persistenceAdapter.setDirectory(new File("activemq-data-kaha/" + getName())); - broker.setPersistenceAdapter(persistenceAdapter); - + if (usePrioritySupport) { + PolicyEntry policy = new PolicyEntry(); + policy.setPrioritizedMessages(true); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + } + + setDefaultPersistenceAdapter(broker); broker.start(); } @@ -71,6 +83,13 @@ public class DurableSubscriptionOfflineT broker.stop(); } + public void initCombosForTestOfflineSubscription() throws Exception { + this.addCombinationValues("defaultPersistenceAdapter", + new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); + this.addCombinationValues("usePrioritySupport", + new Object[]{ Boolean.TRUE, Boolean.FALSE}); + } + public void testOfflineSubscription() throws Exception { // create durable subscription Connection con = createConnection(); Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1025939&r1=1025938&r2=1025939&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original) +++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Oct 21 10:58:51 2010 @@ -21,7 +21,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.broker.scheduler=DEBUG -#log4j.logger.org.apache.activemq=DEBUG +#log4j.logger.org.apache.activemq=TRACE #log4j.logger.org.apache.activemq.store.jdbc=DEBUG #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG #log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG