From commits-return-7118-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Oct 01 20:03:35 2007 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 47309 invoked from network); 1 Oct 2007 20:03:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 1 Oct 2007 20:03:34 -0000 Received: (qmail 15257 invoked by uid 500); 1 Oct 2007 20:03:24 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 15230 invoked by uid 500); 1 Oct 2007 20:03:24 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 15216 invoked by uid 99); 1 Oct 2007 20:03:23 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2007 13:03:23 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2007 20:03:24 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 805F71A9832; Mon, 1 Oct 2007 13:02:33 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r581053 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/kahada... 
Date: Mon, 01 Oct 2007 20:02:24 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071001200233.805F71A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Oct 1 13:02:18 2007 New Revision: 581053 URL: http://svn.apache.org/viewvc?rev=581053&view=rev Log: Fix for AMQ-1095: - Added contributed test cases - We now filter out non-matching messages as they are loaded into the TopicStorePrefetch - Changed the TopicStorePrefetch and StoreDurableSubscriberCursor so that they don't depend on the pending message counter since some stores cannot give an accurate count for it. Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java (with props) activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Oct 1 13:02:18 2007 @@ -48,7 +48,8 @@ public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { - super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize())); + super(broker, context, info); + this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this); this.usageManager = 
usageManager; this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Oct 1 13:02:18 2007 @@ -410,6 +410,7 @@ if (message == null) { return false; } + // Make sure we can dispatch a message. if (canDispatch(node) && !isSlave()) { MessageDispatch md = createMessageDispatch(node, message); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct 1 13:02:18 2007 @@ -26,6 +26,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import 
org.apache.activemq.kaha.Store; @@ -42,7 +43,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class); - private int pendingCount; private String clientId; private String subscriberName; private Map topics = new HashMap(); @@ -50,6 +50,7 @@ private boolean started; private PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; + private final Subscription subscription; /** * @param topic @@ -57,9 +58,10 @@ * @param subscriberName * @throws IOException */ - public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize) { + public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) { this.clientId = clientId; this.subscriberName = subscriberName; + this.subscription = subscription; this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store); storePrefetches.add(nonPersistent); } @@ -69,7 +71,6 @@ started = true; for (PendingMessageCursor tsp : storePrefetches) { tsp.start(); - pendingCount += tsp.size(); } } } @@ -80,8 +81,6 @@ for (PendingMessageCursor tsp : storePrefetches) { tsp.stop(); } - - pendingCount = 0; } } @@ -94,14 +93,13 @@ */ public synchronized void add(ConnectionContext context, Destination destination) throws Exception { if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { - TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName); + TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription); tsp.setMaxBatchSize(getMaxBatchSize()); tsp.setSystemUsage(systemUsage); topics.put(destination, tsp); storePrefetches.add(tsp); if (started) { tsp.start(); - pendingCount += tsp.size(); } } } @@ -124,14 +122,18 @@ * @return true if there are no pending 
messages */ public synchronized boolean isEmpty() { - return pendingCount <= 0; + for (PendingMessageCursor tsp : storePrefetches) { + if( !tsp.isEmpty() ) + return false; + } + return true; } public boolean isEmpty(Destination destination) { boolean result = true; TopicStorePrefetch tsp = topics.get(destination); if (tsp != null) { - result = tsp.size() <= 0; + result = tsp.isEmpty(); } return result; } @@ -151,7 +153,6 @@ if (node != null) { Message msg = node.getMessage(); if (started) { - pendingCount++; if (!msg.isPersistent()) { nonPersistent.addMessageLast(node); } @@ -171,7 +172,6 @@ } public synchronized void clear() { - pendingCount = 0; nonPersistent.clear(); for (PendingMessageCursor tsp : storePrefetches) { tsp.clear(); @@ -179,7 +179,7 @@ } public synchronized boolean hasNext() { - boolean result = pendingCount > 0; + boolean result = true; if (result) { try { currentCursor = getNextCursor(); @@ -201,14 +201,12 @@ if (currentCursor != null) { currentCursor.remove(); } - pendingCount--; } public synchronized void remove(MessageReference node) { if (currentCursor != null) { currentCursor.remove(node); } - pendingCount--; } public synchronized void reset() { @@ -226,6 +224,10 @@ } public int size() { + int pendingCount=0; + for (PendingMessageCursor tsp : storePrefetches) { + pendingCount += tsp.size(); + } return pendingCount; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct 1 
13:02:18 2007 @@ -17,12 +17,15 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.Iterator; import java.util.LinkedList; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; import org.apache.commons.logging.Log; @@ -44,16 +47,19 @@ private Destination regionDestination; private MessageId firstMessageId; private MessageId lastMessageId; - private int pendingCount; + private boolean batchResetNeeded = true; + private boolean storeMayHaveMoreMessages = true; private boolean started; + private final Subscription subscription; /** * @param topic * @param clientId * @param subscriberName */ - public TopicStorePrefetch(Topic topic, String clientId, String subscriberName) { + public TopicStorePrefetch(Topic topic, String clientId, String subscriberName, Subscription subscription) { this.regionDestination = topic; + this.subscription = subscription; this.store = (TopicMessageStore)topic.getMessageStore(); this.clientId = clientId; this.subscriberName = subscriberName; @@ -62,13 +68,7 @@ public synchronized void start() { if (!started) { started = true; - pendingCount = getStoreSize(); - try { - fillBatch(); - } catch (Exception e) { - LOG.error("Failed to fill batch", e); - throw new RuntimeException(e); - } + safeFillBatch(); } } @@ -84,11 +84,13 @@ * @return true if there are no pendingCount messages */ public synchronized boolean isEmpty() { - return pendingCount <= 0; + safeFillBatch(); + return batchList.isEmpty(); } public synchronized int size() { - return getPendingCount(); + safeFillBatch(); + 
return batchList.size(); } public synchronized void addMessageLast(MessageReference node) throws Exception { @@ -98,7 +100,7 @@ } lastMessageId = node.getMessageId(); node.decrementReferenceCount(); - pendingCount++; + storeMayHaveMoreMessages=true; } } @@ -108,20 +110,18 @@ firstMessageId = node.getMessageId(); } node.decrementReferenceCount(); - pendingCount++; + storeMayHaveMoreMessages=true; } } public synchronized void remove() { - pendingCount--; } public synchronized void remove(MessageReference node) { - pendingCount--; } public synchronized void clear() { - pendingCount = 0; + gc(); } public synchronized boolean hasNext() { @@ -130,27 +130,17 @@ public synchronized MessageReference next() { Message result = null; - if (!isEmpty()) { - if (batchList.isEmpty()) { - try { - fillBatch(); - } catch (final Exception e) { - LOG.error("Failed to fill batch", e); - throw new RuntimeException(e); - } - if (batchList.isEmpty()) { - return null; - } - } - if (!batchList.isEmpty()) { - result = batchList.removeFirst(); - if (lastMessageId != null) { - if (result.getMessageId().equals(lastMessageId)) { - // pendingCount=0; - } + safeFillBatch(); + if (batchList.isEmpty()) { + return null; + } else { + result = batchList.removeFirst(); + if (lastMessageId != null) { + if (result.getMessageId().equals(lastMessageId)) { + // pendingCount=0; } - result.setRegionDestination(regionDestination); } + result.setRegionDestination(regionDestination); } return result; } @@ -163,12 +153,16 @@ } public synchronized boolean recoverMessage(Message message) throws Exception { - message.setRegionDestination(regionDestination); - // only increment if count is zero (could have been cached) - if (message.getReferenceCount() == 0) { - message.incrementReferenceCount(); + MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); + messageEvaluationContext.setMessageReference(message); + if( subscription.matches(message, messageEvaluationContext) ) { + 
message.setRegionDestination(regionDestination); + // only increment if count is zero (could have been cached) + if (message.getReferenceCount() == 0) { + message.incrementReferenceCount(); + } + batchList.addLast(message); } - batchList.addLast(message); return true; } @@ -178,38 +172,43 @@ } // implementation + protected void safeFillBatch() { + try { + fillBatch(); + } catch (Exception e) { + LOG.error("Failed to fill batch", e); + throw new RuntimeException(e); + } + } + protected synchronized void fillBatch() throws Exception { - if (!isEmpty()) { + if( batchResetNeeded ) { + store.resetBatching(clientId, subscriberName); + batchResetNeeded=false; + storeMayHaveMoreMessages=true; + } + + while( batchList.isEmpty() && storeMayHaveMoreMessages ) { store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); - if (firstMessageId != null) { - int pos = 0; - for (Message msg : batchList) { - if (msg.getMessageId().equals(firstMessageId)) { - firstMessageId = null; - break; - } - pos++; - } - if (pos > 0) { - for (int i = 0; i < pos && !batchList.isEmpty(); i++) { - batchList.removeFirst(); - } - if (batchList.isEmpty()) { - LOG.debug("Refilling batch - haven't got past first message = " + firstMessageId); - fillBatch(); + if( batchList.isEmpty() ) { + storeMayHaveMoreMessages = false; + } else { + if (firstMessageId != null) { + int pos = 0; + for (Iterator iter = batchList.iterator(); iter.hasNext();) { + Message msg = iter.next(); + if (msg.getMessageId().equals(firstMessageId)) { + firstMessageId = null; + break; + } else { + iter.remove(); + } } } } } } - protected synchronized int getPendingCount() { - if (pendingCount <= 0) { - pendingCount = getStoreSize(); - } - return pendingCount; - } - protected synchronized int getStoreSize() { try { return store.getMessageCount(clientId, subscriberName); @@ -224,6 +223,7 @@ msg.decrementReferenceCount(); } batchList.clear(); + batchResetNeeded = true; } public String toString() { Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Mon Oct 1 13:02:18 2007 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.kaha.Store; @@ -39,7 +40,7 @@ * @param maxBatchSize * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { + public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { return new FilePendingMessageCursor(name, tmpStorage); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java Mon Oct 1 13:02:18 2007 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.kaha.Store; @@ -36,5 +37,5 @@ * @param maxBatchSize * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize); + PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Oct 1 13:02:18 2007 @@ -116,7 +116,7 @@ String subName = sub.getSubscriptionName(); int prefetch = sub.getPrefetchSize(); if (pendingDurableSubscriberPolicy != null) { - PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch); + PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub); 
cursor.setSystemUsage(memoryManager); sub.setPending(cursor); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java Mon Oct 1 13:02:18 2007 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.kaha.Store; @@ -40,7 +41,7 @@ * @param maxBatchSize * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { - return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize); + public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { + return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Mon Oct 1 13:02:18 2007 @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.kaha.Store; @@ -38,7 +39,7 @@ * @param maxBatchSize * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) { + public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) { return new VMPendingMessageCursor(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Oct 1 13:02:18 2007 @@ -73,11 +73,8 @@ } protected boolean 
recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception { - if (listener.hasSpace()) { - listener.recoverMessage(msg); - return true; - } - return false; + listener.recoverMessage(msg); + return listener.hasSpace(); } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Oct 1 13:02:18 2007 @@ -64,11 +64,8 @@ protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record) throws Exception { - if (listener.hasSpace()) { - listener.recoverMessageReference(new MessageId(record.getMessageId())); - return true; - } - return false; + listener.recoverMessageReference(new MessageId(record.getMessageId())); + return listener.hasSpace(); } public synchronized void recover(MessageRecoveryListener listener) throws Exception { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java Mon Oct 1 13:02:18 
2007 @@ -284,7 +284,7 @@ // The we should get the messages. for (int i = 0; i < 4; i++) { Message m2 = receiveMessage(connection2); - assertNotNull(m2); + assertNotNull("Did not get message "+i, m2); } assertNoMessagesLeft(connection2); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?rev=581053&r1=581052&r2=581053&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Mon Oct 1 13:02:18 2007 @@ -77,7 +77,8 @@ consumer = getConsumer(consumerConnection); List consumerList = new ArrayList(); for (int i = 0; i < MESSAGE_COUNT; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(1000*5); + assertNotNull("Message "+i+" was missing.", msg); consumerList.add(msg); } assertEquals(senderList, consumerList); Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java?rev=581053&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java Mon Oct 1 13:02:18 2007 @@ -0,0 +1,163 @@ +/* ==================================================================== + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +==================================================================== */ + +package org.apache.activemq.bugs.amq1095; + +import java.io.File; +import java.net.URI; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; + +/** + *

<p> + * Common functionality for ActiveMQ test cases. + * </p>

+ * + * @author Rainer Klute <rainer.klute@dp-itsolutions.de> + * @since 2007-08-10 + * @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $ + */ +public class ActiveMQTestCase extends TestCase +{ + private Context context; + private BrokerService broker; + protected Connection connection; + protected Destination destination; + private List consumersToEmpty = new LinkedList(); + protected final long RECEIVE_TIMEOUT = 500; + + + /**

<p>Constructor</p>

*/ + public ActiveMQTestCase() + {} + + /**

<p>Constructor</p>

+ * @param name the test case's name + */ + public ActiveMQTestCase(final String name) + { + super(name); + } + + /** + *

Sets up the JUnit testing environment. + */ + protected void setUp() + { + URI uri; + try + { + /* Copy all system properties starting with "java.naming." to the initial context. */ + final Properties systemProperties = System.getProperties(); + final Properties jndiProperties = new Properties(); + for (final Iterator i = systemProperties.keySet().iterator(); i.hasNext();) + { + final String key = (String) i.next(); + if (key.startsWith("java.naming.") || key.startsWith("topic.") || + key.startsWith("queue.")) + { + final String value = (String) systemProperties.get(key); + jndiProperties.put(key, value); + } + } + context = new InitialContext(jndiProperties); + uri = new URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml"); + broker = BrokerFactory.createBroker(uri); + broker.start(); + } + catch (Exception ex) + { + throw new RuntimeException(ex); + } + + final ConnectionFactory connectionFactory; + try + { + /* Lookup the connection factory. */ + connectionFactory = (ConnectionFactory) context.lookup("TopicConnectionFactory"); + + destination = new ActiveMQTopic("TestTopic"); + + /* Create a connection: */ + connection = connectionFactory.createConnection(); + connection.setClientID("sampleClientID"); + } + catch (JMSException ex1) + { + ex1.printStackTrace(); + Assert.fail(ex1.toString()); + } + catch (NamingException ex2) { + ex2.printStackTrace(); + Assert.fail(ex2.toString()); + } + catch (Throwable ex3) { + ex3.printStackTrace(); + Assert.fail(ex3.toString()); + } + } + + + /** + *

<p> + * Tear down the testing environment by receiving any messages that might be + * left in the topic after a failure and shutting down the broker properly. + * This is quite important for subsequent test cases that assume the topic + * to be empty. + * </p>

+ */ + protected void tearDown() throws Exception { + TextMessage msg; + for (final Iterator i = consumersToEmpty.iterator(); i.hasNext();) + { + final MessageConsumer consumer = (MessageConsumer) i.next(); + if (consumer != null) + do + msg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT); + while (msg != null); + } + if (connection != null) { + connection.stop(); + } + broker.stop(); + } + + protected void registerToBeEmptiedOnShutdown(final MessageConsumer consumer) + { + consumersToEmpty.add(consumer); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java?rev=581053&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java Mon Oct 1 13:02:18 2007 @@ -0,0 +1,230 @@ +/* ==================================================================== + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +==================================================================== */ + +package org.apache.activemq.bugs.amq1095; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.Assert; + + +/** + *

<p> + * Test cases for various ActiveMQ functionalities. + * </p> + * + * <ul> + * <li> + * <p> + * Durable subscriptions are used. + * </p> + * </li> + * <li> + * <p> + * The Kaha persistence manager is used. + * </p> + * </li> + * <li> + * <p> + * An already existing Kaha directory is used. Everything runs fine if the + * ActiveMQ broker creates a new Kaha directory. + * </p> + * </li> + * </ul>
+ * + * @author Rainer Klute <rainer.klute@dp-itsolutions.de> + * @since 2007-08-09 + * @version $Id: MessageSelectorTest.java 12 2007-08-14 12:02:02Z rke $ + */ +public class MessageSelectorTest extends ActiveMQTestCase { + + private MessageConsumer consumer1; + private MessageConsumer consumer2; + + /**

Constructor

*/ + public MessageSelectorTest() + {} + + /**

Constructor

+ * @param name the test case's name + */ + public MessageSelectorTest(final String name) + { + super(name); + } + + /** + *

<p> + * Tests whether message selectors work for durable subscribers. + * </p>

+ */ + public void testMessageSelectorForDurableSubscribersRunA() + { + runMessageSelectorTest(true); + } + + /** + *

<p> + * Tests whether message selectors work for durable subscribers. + * </p>

+ */ + public void testMessageSelectorForDurableSubscribersRunB() + { + runMessageSelectorTest(true); + } + + /** + *

<p> + * Tests whether message selectors work for non-durable subscribers. + * </p>

+ */ + public void testMessageSelectorForNonDurableSubscribers() + { + runMessageSelectorTest(false); + } + + /** + *

<p> + * Tests whether message selectors work. This is done by sending two + * messages to a topic. Both have an int property with different values. Two + * subscribers use message selectors to receive the messages. Each one + * should receive exactly one of the messages. + * </p>

+ */ + private void runMessageSelectorTest(final boolean isDurableSubscriber) + { + try + { + final String PROPERTY_CONSUMER = "consumer"; + final String CONSUMER_1 = "Consumer 1"; + final String CONSUMER_2 = "Consumer 2"; + final String MESSAGE_1 = "Message to " + CONSUMER_1; + final String MESSAGE_2 = "Message to " + CONSUMER_2; + + assertNotNull(connection); + assertNotNull(destination); + + final Session producingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = producingSession.createProducer(destination); + + final Session consumingSession1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session consumingSession2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (isDurableSubscriber) + { + consumer1 = consumingSession1.createDurableSubscriber + ((Topic) destination, CONSUMER_1, PROPERTY_CONSUMER + " = 1", false); + consumer2 = consumingSession2.createDurableSubscriber + ((Topic) destination, CONSUMER_2, PROPERTY_CONSUMER + " = 2", false); + } + else + { + consumer1 = consumingSession1.createConsumer(destination, PROPERTY_CONSUMER + " = 1"); + consumer2 = consumingSession2.createConsumer(destination, PROPERTY_CONSUMER + " = 2"); + } + registerToBeEmptiedOnShutdown(consumer1); + registerToBeEmptiedOnShutdown(consumer2); + + connection.start(); + + TextMessage msg1; + TextMessage msg2; + int propertyValue; + String contents; + + /* Try to receive any messages from the consumers. There shouldn't be any yet. */ + msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT); + if (msg1 != null) + { + final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run."); + propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER); + contents = msg1.getText(); + if (propertyValue != 1) // Is the property value as expected? 
+ { + msg.append(" That message does not match the consumer's message selector."); + fail(msg.toString()); + } + assertEquals(1, propertyValue); + assertEquals(MESSAGE_1, contents); + } + msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT); + if (msg2 != null) + { + final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run."); + propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER); + contents = msg2.getText(); + if (propertyValue != 2) // Is the property value as expected? + { + msg.append(" That message does not match the consumer's message selector."); + fail(msg.toString()); + } + assertEquals(2, propertyValue); + assertEquals(MESSAGE_2, contents); + } + + /* Send two messages. Each is targeted at one of the consumers. */ + TextMessage msg; + msg = producingSession.createTextMessage(); + msg.setText(MESSAGE_1); + msg.setIntProperty(PROPERTY_CONSUMER, 1); + producer.send(msg); + + msg = producingSession.createTextMessage(); + msg.setText(MESSAGE_2); + msg.setIntProperty(PROPERTY_CONSUMER, 2); + producer.send(msg); + + /* Receive the messages that have just been sent. */ + + /* Use consumer 1 to receive one of the messages. The receive() + * method is called twice to make sure there is nothing else in + * stock for this consumer. */ + msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT); + assertNotNull(msg1); + propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER); + contents = msg1.getText(); + assertEquals(1, propertyValue); + assertEquals(MESSAGE_1, contents); + msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT); + assertNull(msg1); + + /* Use consumer 2 to receive the other message. The receive() + * method is called twice to make sure there is nothing else in + * stock for this consumer. 
*/ + msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT); + assertNotNull(msg2); + propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER); + contents = msg2.getText(); + assertEquals(2, propertyValue); + assertEquals(MESSAGE_2, contents); + msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT); + assertNull(msg2); + } + catch (JMSException ex) + { + ex.printStackTrace(); + Assert.fail(); + } + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml?rev=581053&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml (added) +++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml Mon Oct 1 13:02:18 2007 @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml ------------------------------------------------------------------------------ svn:keywords = Rev Date Propchange: 
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml ------------------------------------------------------------------------------ svn:mime-type = text/xml