From commits-return-15624-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Feb 22 18:52:24 2011 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 13558 invoked from network); 22 Feb 2011 18:52:24 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Feb 2011 18:52:24 -0000 Received: (qmail 295 invoked by uid 500); 22 Feb 2011 18:52:24 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 230 invoked by uid 500); 22 Feb 2011 18:52:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 223 invoked by uid 99); 22 Feb 2011 18:52:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Feb 2011 18:52:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Feb 2011 18:52:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 12D81238896F; Tue, 22 Feb 2011 18:51:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1073453 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/usecases/ Date: Tue, 22 Feb 2011 18:51:54 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110222185155.12D81238896F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Tue Feb 22 18:51:54 2011 New Revision: 1073453 URL: http://svn.apache.org/viewvc?rev=1073453&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3188 - Full table scan for durable subs in jdbc store when priority enabled; very slow with large message backlog added more state to the topic message store such that it can ask the db for a single priority at a time which is indexed. This avoids a full table scan. send rate with active durable subs vs inactive durable subs is now in the region of 6x from 40x. validation test included. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Feb 22 18:51:54 2011 @@ -131,7 +131,7 @@ public class DurableTopicSubscription ex topic.activate(context, this); } } - synchronized (pending) { + synchronized (pendingLock) { pending.setSystemUsage(memoryManager); pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); pending.setMaxAuditDepth(getMaxAuditDepth()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Feb 22 18:51:54 2011 @@ -64,7 +64,7 @@ public abstract class PrefetchSubscripti private int maxProducersToAudit=32; private int maxAuditDepth=2048; protected final SystemUsage usageManager; - private final Object pendingLock = new Object(); + protected final Object pendingLock = new Object(); private final Object dispatchLock = new Object(); private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Tue Feb 22 18:51:54 2011 @@ -45,7 +45,7 @@ public abstract class AbstractPendingMes protected boolean enableAudit=true; protected ActiveMQMessageAudit audit; protected boolean useCache=true; - protected boolean cacheEnabled=true; + private boolean cacheEnabled=true; private boolean started=false; protected MessageReference last = null; protected final boolean prioritizedMessages; @@ -329,7 +329,11 @@ public abstract class AbstractPendingMes } - public boolean isCacheEnabled() { + public synchronized boolean isCacheEnabled() { return cacheEnabled; } + + public synchronized void setCacheEnabled(boolean val) { + cacheEnabled = val; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Feb 22 18:51:54 2011 @@ -45,7 +45,7 @@ public abstract class AbstractStoreCurso this.regionDestination=destination; if (this.prioritizedMessages) { this.batchList= new PrioritizedPendingList(); - }else { + } else { this.batchList = new OrderedPendingList(); } } @@ -58,7 +58,7 @@ public abstract class AbstractStoreCurso resetBatch(); this.size = getStoreSize(); this.storeHasMessages=this.size > 0; - cacheEnabled = !this.storeHasMessages&&useCache; + setCacheEnabled(!this.storeHasMessages&&useCache); } } @@ -95,8 +95,7 @@ public abstract class AbstractStoreCurso * it will be a duplicate - but should be ignored */ if (LOG.isTraceEnabled()) { - LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() - + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority()); + LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority()); } } return recovered; @@ -108,7 +107,7 @@ public abstract class AbstractStoreCurso try { fillBatch(); } catch (Exception e) { - LOG.error("Failed to fill batch", e); + LOG.error(this + " - Failed to fill batch", e); throw new RuntimeException(e); } } @@ -145,7 +144,7 @@ public abstract class AbstractStoreCurso try { fillBatch(); } catch (Exception e) { - LOG.error("Failed to fill batch", e); + LOG.error(this + " - Failed to fill batch", e); throw new RuntimeException(e); } } @@ -169,24 +168,22 @@ public abstract class AbstractStoreCurso public final synchronized void addMessageLast(MessageReference node) throws Exception { if (hasSpace()) { - if (!cacheEnabled && size==0 && isStarted() && useCache) { + if (!isCacheEnabled() && size==0 && isStarted() && useCache) { if (LOG.isTraceEnabled()) { - LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() - + " enabling cache for empty store " + node.getMessageId()); + LOG.trace(this + " - enabling cache for empty store " + node.getMessageId()); } - cacheEnabled=true; + setCacheEnabled(true); } - if (cacheEnabled) { + if (isCacheEnabled()) { recoverMessage(node.getMessage(),true); lastCachedId = node.getMessageId(); } - } else if (cacheEnabled) { - cacheEnabled=false; + } else if (isCacheEnabled()) { + setCacheEnabled(false); // sync with store on disabling the cache if (lastCachedId != null) { if (LOG.isTraceEnabled()) { - LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() - + " disabling cache on size:" + size + LOG.trace(this + " - disabling cache" + ", lastCachedId: " + lastCachedId + " current node Id: " + node.getMessageId()); } @@ -203,7 +200,7 @@ public abstract class AbstractStoreCurso public final synchronized void addMessageFirst(MessageReference node) throws Exception { - cacheEnabled=false; + setCacheEnabled(false); size++; } @@ -221,7 +218,7 @@ public abstract class AbstractStoreCurso public final synchronized void remove(MessageReference node) { size--; - cacheEnabled=false; + setCacheEnabled(false); batchList.remove(node); } @@ -240,7 +237,7 @@ public abstract class AbstractStoreCurso batchList.clear(); clearIterator(false); batchResetNeeded = true; - this.cacheEnabled=false; + setCacheEnabled(false); } @Override @@ -251,8 +248,7 @@ public abstract class AbstractStoreCurso protected final synchronized void fillBatch() { if (LOG.isTraceEnabled()) { - LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded - + ", hasMessages=" + this.storeHasMessages + ", size=" + this.size + ", cacheEnabled=" + this.cacheEnabled); + LOG.trace(this + " - fillBatch"); } if (batchResetNeeded) { resetBatch(); @@ -263,7 +259,7 @@ public abstract class AbstractStoreCurso try { doFillBatch(); } catch (Exception e) { - LOG.error("Failed to fill batch", e); + LOG.error(this + " - Failed to fill batch", e); throw new RuntimeException(e); } if (!this.batchList.isEmpty() || !hadSpace) { @@ -290,7 +286,11 @@ public abstract class AbstractStoreCurso } return size; } - + + public String toString() { + return regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded + + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled(); + } protected abstract void doFillBatch() throws Exception; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Feb 22 18:51:54 2011 @@ -199,7 +199,7 @@ public class FilePendingMessageCursor ex if (hasSpace() || this.store == null) { memoryList.add(node); node.incrementReferenceCount(); - cacheEnabled = true; + setCacheEnabled(true); return true; } } @@ -247,7 +247,7 @@ public class FilePendingMessageCursor ex if (hasSpace()) { memoryList.addFirst(node); node.incrementReferenceCount(); - cacheEnabled = true; + setCacheEnabled(true); return; } } @@ -428,7 +428,7 @@ public class FilePendingMessageCursor ex } memoryList.clear(); - cacheEnabled = false; + setCacheEnabled(false); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Feb 22 18:51:54 2011 @@ -187,15 +187,15 @@ public class StoreDurableSubscriberCurso Destination dest = msg.getRegionDestination(); TopicStorePrefetch tsp = topics.get(dest); if (tsp != null) { - // cache can be come high priority cache for immediate dispatch + // cache can become high priority cache for immediate dispatch final int priority = msg.getPriority(); - if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) { + if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) { if (priority > tsp.getCurrentLowestPriority()) { if (LOG.isTraceEnabled()) { LOG.trace("enabling cache for cursor on high priority message " + priority + ", current lowest: " + tsp.getCurrentLowestPriority()); } - tsp.cacheEnabled = true; + tsp.setCacheEnabled(true); cacheCurrentLowestPriority = tsp.getCurrentLowestPriority(); } } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) { @@ -206,7 +206,7 @@ public class StoreDurableSubscriberCurso + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority() + " cache lowest: " + cacheCurrentLowestPriority); } - tsp.cacheEnabled = false; + tsp.setCacheEnabled(false); cacheCurrentLowestPriority = UNKNOWN; } tsp.addMessageLast(node); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue Feb 22 18:51:54 2011 @@ -297,7 +297,7 @@ public class StoreQueueCursor extends Ab @Override public boolean isCacheEnabled() { - cacheEnabled = isUseCache(); + boolean cacheEnabled = isUseCache(); if (cacheEnabled) { if (persistent != null) { cacheEnabled &= persistent.isCacheEnabled(); @@ -305,6 +305,7 @@ public class StoreQueueCursor extends Ab if (nonPersistent != null) { cacheEnabled &= nonPersistent.isCacheEnabled(); } + setCacheEnabled(cacheEnabled); } return cacheEnabled; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Feb 22 18:51:54 2011 @@ -132,6 +132,6 @@ class TopicStorePrefetch extends Abstrac @Override public String toString() { - return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")"; + return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Tue Feb 22 18:51:54 2011 @@ -40,6 +40,25 @@ import org.slf4j.LoggerFactory; */ public class JDBCMessageStore extends AbstractMessageStore { + class Duration { + static final int LIMIT = 100; + final long start = System.currentTimeMillis(); + final String name; + + Duration(String name) { + this.name = name; + } + void end() { + end(null); + } + void end(Object o) { + long duration = System.currentTimeMillis() - start; + + if (duration > LIMIT) { + System.err.println(name + " took a long time: " + duration + "ms " + o); + } + } + } private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class); protected final WireFormat wireFormat; protected final JDBCAdapter adapter; @@ -58,7 +77,6 @@ public class JDBCMessageStore extends Ab } public void addMessage(ConnectionContext context, Message message) throws IOException { - MessageId messageId = message.getMessageId(); if (audit != null && audit.isDuplicate(message)) { if (LOG.isDebugEnabled()) { @@ -90,6 +108,10 @@ public class JDBCMessageStore extends Ab } finally { c.close(); } + onAdd(sequenceId, message.getPriority()); + } + + protected void onAdd(long sequenceId, byte priority) { } public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Tue Feb 22 18:51:54 2011 @@ -18,10 +18,10 @@ package org.apache.activemq.store.jdbc; import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; @@ -102,22 +102,120 @@ public class JDBCTopicMessageStore exten } } - private class LastRecovered { - long sequence = 0; - byte priority = 9; - - public void update(long sequence, Message msg) { - this.sequence = sequence; - this.priority = msg.getPriority(); + private class LastRecovered implements Iterable { + LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10]; + LastRecovered() { + for (int i=0; i iterator() { + return new PriorityIterator(); + } + + class PriorityIterator implements Iterator { + int current = 9; + public boolean hasNext() { + for (int i=current; i>=0; i--) { + if (perPriority[i].hasMessages()) { + current = i; + return true; + } + } + return false; + } + + public LastRecoveredEntry next() { + return perPriority[current]; + } + + public void remove() { + throw new RuntimeException("not implemented"); + } + } + } + + private class LastRecoveredEntry { + final int priority; + long recovered = 0; + long stored = Integer.MAX_VALUE; + + public LastRecoveredEntry(int priority) { + this.priority = priority; } public String toString() { - return "" + sequence + ":" + priority; + return priority + "-" + stored + ":" + recovered; + } + + public void exhausted() { + stored = recovered; + } + + public boolean hasMessages() { + return stored > recovered; + } + } + + class LastRecoveredAwareListener implements JDBCMessageRecoveryListener { + final MessageRecoveryListener delegate; + final int maxMessages; + LastRecoveredEntry lastRecovered; + int recoveredCount; + int recoveredMarker; + + public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) { + this.delegate = delegate; + this.maxMessages = maxMessages; + } + + public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { + if (delegate.hasSpace()) { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + if (delegate.recoverMessage(msg)) { + lastRecovered.recovered = sequenceId; + recoveredCount++; + return true; + } + } + return false; + } + + public boolean recoverMessageReference(String reference) throws Exception { + return delegate.recoverMessageReference(new MessageId(reference)); + } + + public void setLastRecovered(LastRecoveredEntry lastRecovered) { + this.lastRecovered = lastRecovered; + recoveredMarker = recoveredCount; + } + + public boolean complete() { + return !delegate.hasSpace() || recoveredCount == maxMessages; + } + + public boolean stalled() { + return recoveredMarker == recoveredCount; } } public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + //Duration duration = new Duration("recoverNextMessages"); TransactionContext c = persistenceAdapter.getTransactionContext(); String key = getSubscriptionKey(clientId, subscriptionName); @@ -125,38 +223,38 @@ public class JDBCTopicMessageStore exten subscriberLastRecoveredMap.put(key, new LastRecovered()); } final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key); + LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned); try { - JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() { - public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { - if (listener.hasSpace()) { - Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); - msg.getMessageId().setBrokerSequenceId(sequenceId); - if (listener.recoverMessage(msg)) { - lastRecovered.update(sequenceId, msg); - return true; - } - } - return false; - } - - public boolean recoverMessageReference(String reference) throws Exception { - return listener.recoverMessageReference(new MessageId(reference)); - } - - }; if (LOG.isTraceEnabled()) { LOG.trace(key + " existing last recovered: " + lastRecovered); } if (isPrioritizedMessages()) { - adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, - lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener); + Iterator it = lastRecovered.iterator(); + for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) { + LastRecoveredEntry entry = it.next(); + recoveredAwareListener.setLastRecovered(entry); + //Duration microDuration = new Duration("recoverNextMessages:loop"); + adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, + entry.recovered, entry.priority, maxReturned, recoveredAwareListener); + //microDuration.end(entry); + if (recoveredAwareListener.stalled()) { + if (recoveredAwareListener.complete()) { + break; + } else { + entry.exhausted(); + } + } + } } else { + LastRecoveredEntry last = lastRecovered.defaultPriority(); + recoveredAwareListener.setLastRecovered(last); adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, - lastRecovered.sequence, 0, maxReturned, jdbcListener); + last.recovered, 0, maxReturned, recoveredAwareListener); } if (LOG.isTraceEnabled()) { LOG.trace(key + " last recovered: " + lastRecovered); } + //duration.end(); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); } finally { @@ -168,6 +266,14 @@ public class JDBCTopicMessageStore exten subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName)); } + protected void onAdd(long sequenceId, byte priority) { + // update last recovered state + for (LastRecovered last : subscriberLastRecoveredMap.values()) { + last.updateStored(sequenceId, priority); + } + } + + public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { TransactionContext c = persistenceAdapter.getTransactionContext(); try { @@ -223,6 +329,7 @@ public class JDBCTopicMessageStore exten } public int getMessageCount(String clientId, String subscriberName) throws IOException { + //Duration duration = new Duration("getMessageCount"); int result = 0; TransactionContext c = persistenceAdapter.getTransactionContext(); try { @@ -236,6 +343,7 @@ public class JDBCTopicMessageStore exten if (LOG.isTraceEnabled()) { LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result); } + //duration.end(); return result; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Tue Feb 22 18:51:54 2011 @@ -281,13 +281,13 @@ public class Statements { public String getFindDurableSubMessagesByPriorityStatement() { if (findDurableSubMessagesByPriorityStatement == null) { - findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " - + getFullAckTableName() + " D " + findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M," + + " " + getFullAckTableName() + " D" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER" + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID" - + " AND ( (M.ID > ?) OR (M.PRIORITY < ?) )" - + " ORDER BY M.PRIORITY DESC, M.ID"; + + " AND M.ID > ? AND M.PRIORITY = ?" + + " ORDER BY M.ID"; } return findDurableSubMessagesByPriorityStatement; } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java?rev=1073453&r1=1073452&r2=1073453&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java Tue Feb 22 18:51:54 2011 @@ -23,15 +23,19 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; @@ -43,32 +47,28 @@ import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; +//import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.Wait; +//import org.apache.commons.dbcp.BasicDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ConcurrentProducerDurableConsumerTest extends TestSupport { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class); - private int consumerCount = 1; + private int consumerCount = 5; BrokerService broker; protected List connections = Collections.synchronizedList(new ArrayList()); - protected Map consumers = new HashMap(); + protected Map consumers = new HashMap(); protected MessageIdList allMessagesList = new MessageIdList(); private int messageSize = 1024; - public void testPlaceHolder() throws Exception { - } - - public void x_initCombosForTestSendRateWithActivatingConsumers() throws Exception { + public void initCombosForTestSendRateWithActivatingConsumers() throws Exception { addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM}); } - public void x_testSendRateWithActivatingConsumers() throws Exception { + public void testSendRateWithActivatingConsumers() throws Exception { final Destination destination = createDestination(); final ConnectionFactory factory = createConnectionFactory(); startInactiveConsumers(factory, destination); @@ -78,12 +78,12 @@ public class ConcurrentProducerDurableCo MessageProducer producer = createMessageProducer(session, destination); // preload the durable consumers - double[] inactiveConsumerStats = produceMessages(destination, 200, 100, session, producer, null); + double[] inactiveConsumerStats = produceMessages(destination, 500, 10, session, producer, null); LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1] + ", max: " + inactiveConsumerStats[0] + ", multiplier: " + (inactiveConsumerStats[0]/inactiveConsumerStats[1])); - // periodically start a durable sub that is has a backlog - final int consumersToActivate = 1; + // periodically start a durable sub that has a backlog + final int consumersToActivate = 5; final Object addConsumerSignal = new Object(); Executors.newCachedThreadPool(new ThreadFactory() { @Override @@ -96,16 +96,15 @@ public class ConcurrentProducerDurableCo try { MessageConsumer consumer = null; for (int i = 0; i < consumersToActivate; i++) { - LOG.info("Waiting for add signal"); + LOG.info("Waiting for add signal from producer..."); synchronized (addConsumerSignal) { addConsumerSignal.wait(30 * 60 * 1000); } + TimedMessageListener listener = new TimedMessageListener(); consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1)); LOG.info("Created consumer " + consumer); - MessageIdList list = new MessageIdList(); - list.setParent(allMessagesList); - consumer.setMessageListener(list); - consumers.put(consumer, list); + consumer.setMessageListener(listener); + consumers.put(consumer, listener); } } catch (Exception e) { LOG.error("failed to start consumer", e); @@ -114,18 +113,44 @@ public class ConcurrentProducerDurableCo }); - double[] stats = produceMessages(destination, 20, 100, session, producer, addConsumerSignal); + double[] statsWithActive = produceMessages(destination, 300, 10, session, producer, addConsumerSignal); - LOG.info(" with concurrent activate, ave: " + stats[1] + ", max: " + stats[0] + ", multiplier: " + (stats[0]/stats[1])); - assertTrue("max (" + stats[0] + ") within reasonable multiplier of ave (" + stats[1] + ")", - stats[0] < 5 * stats[1]); + LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1])); + + while(consumers.size() < consumersToActivate) { + TimeUnit.SECONDS.sleep(2); + } + long timeToFirstAccumulator = 0; + for (TimedMessageListener listener : consumers.values()) { + long time = listener.getFirstReceipt(); + timeToFirstAccumulator += time; + LOG.info("Time to first " + time); + } + LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size()); + + for (TimedMessageListener listener : consumers.values()) { + LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(5000) + " max receipt: " + listener.maxReceiptTime); + } + + //assertTrue("max (" + statsWithActive[0] + ") within reasonable multiplier of ave (" + statsWithActive[1] + ")", + // statsWithActive[0] < 5 * statsWithActive[1]); + + // compare no active to active + LOG.info("Ave send time with active: " + statsWithActive[1] + + " as multiplier of ave with none active: " + inactiveConsumerStats[1] + + ", multiplier=" + (statsWithActive[1]/inactiveConsumerStats[1])); + + assertTrue("Ave send time with active: " + statsWithActive[1] + + " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1] + + ", multiplier " + (statsWithActive[1]/inactiveConsumerStats[1]), + statsWithActive[1] < 15 * inactiveConsumerStats[1]); } public void x_initCombosForTestSendWithInactiveAndActiveConsumers() throws Exception { addCombinationValues("defaultPersistenceAdapter", - new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM}); + new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); } public void x_testSendWithInactiveAndActiveConsumers() throws Exception { @@ -150,7 +175,7 @@ public class ConcurrentProducerDurableCo LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1])); - final int reasonableMultiplier = 4; // not so reasonable, but on slow disks it can be + final int reasonableMultiplier = 15; // not so reasonable but improving assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: " + noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]), withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier); @@ -188,9 +213,8 @@ public class ConcurrentProducerDurableCo protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception { MessageConsumer consumer; for (int i = 0; i < consumerCount; i++) { + TimedMessageListener list = new TimedMessageListener(); consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1)); - MessageIdList list = new MessageIdList(); - list.setParent(allMessagesList); consumer.setMessageListener(list); consumers.put(consumer, list); } @@ -212,39 +236,44 @@ public class ConcurrentProducerDurableCo * @throws Exception */ private double[] produceMessages(Destination destination, - int toSend, - int numIterations, + final int toSend, + final int numIterations, Session session, MessageProducer producer, Object addConsumerSignal) throws Exception { long start; long count = 0; - double max = 0, sum = 0; + double batchMax = 0, max = 0, sum = 0; for (int i=0; i + mysql + mysql-connector-java + 5.1.10 + test + + + commons-dbcp + commons-dbcp + 1.2.2 + test + + */ + } else { + setDefaultPersistenceAdapter(brokerService); + } return brokerService; } @@ -311,6 +372,8 @@ public class ConcurrentProducerDurableCo ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); prefetchPolicy.setAll(1); factory.setPrefetchPolicy(prefetchPolicy); + + factory.setDispatchAsync(true); return factory; } @@ -318,4 +381,55 @@ public class ConcurrentProducerDurableCo return suite(ConcurrentProducerDurableConsumerTest.class); } + class TimedMessageListener implements MessageListener { + final int batchSize = 1000; + CountDownLatch firstReceiptLatch = new CountDownLatch(1); + long mark = System.currentTimeMillis(); + long firstReceipt = 0l; + long receiptAccumulator = 0; + long batchReceiptAccumulator = 0; + long maxReceiptTime = 0; + AtomicLong count = new AtomicLong(0); + + @Override + public void onMessage(Message message) { + final long current = System.currentTimeMillis(); + final long duration = current - mark; + receiptAccumulator += duration; + allMessagesList.onMessage(message); + if (count.incrementAndGet() == 1) { + firstReceipt = duration; + firstReceiptLatch.countDown(); + LOG.info("First receipt in " + firstReceipt + "ms"); + } else if (count.get() % batchSize == 0) { + LOG.info("Consumed " + batchSize + " in " + batchReceiptAccumulator + "ms"); + batchReceiptAccumulator=0; + } + maxReceiptTime = Math.max(maxReceiptTime, duration); + receiptAccumulator += duration; + batchReceiptAccumulator += duration; + mark = current; + } + + long getMessageCount() { + return count.get(); + } + + long getFirstReceipt() throws Exception { + firstReceiptLatch.await(30, TimeUnit.SECONDS); + return firstReceipt; + } + + public long waitForReceivedLimit(long limit) throws Exception { + final long expiry = System.currentTimeMillis() + 30*60*1000; + while (count.get() < limit) { + if (System.currentTimeMillis() > expiry) { + throw new RuntimeException("Expired waiting for X messages, " + limit); + } + TimeUnit.SECONDS.sleep(2); + } + return receiptAccumulator/(limit/batchSize); + } + } + }