From commits-return-15166-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Dec 13 16:21:05 2010 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 69163 invoked from network); 13 Dec 2010 16:21:05 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 13 Dec 2010 16:21:05 -0000 Received: (qmail 3384 invoked by uid 500); 13 Dec 2010 16:21:05 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 3311 invoked by uid 500); 13 Dec 2010 16:21:04 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 3303 invoked by uid 99); 13 Dec 2010 16:21:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Dec 2010 16:21:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Dec 2010 16:21:01 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9F637238897D; Mon, 13 Dec 2010 16:20:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1045219 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/jdbc/... 
Date: Mon, 13 Dec 2010 16:20:41 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101213162041.9F637238897D@eris.apache.org> Author: gtully Date: Mon Dec 13 16:20:40 2010 New Revision: 1045219 URL: http://svn.apache.org/viewvc?rev=1045219&view=rev Log: respect storeHasMessages such that the store batch size, from max page size, is respected; improves performance by negating the replay of high priority inflight messages as the tail of a backlog is recovered. Fix order issue with high priority cache such that sequence order of lower priority messages is respected. Additional test for same, and multiply the default audit depth by 10 when priority is enabled Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Dec 13 16:20:40 2010 @@ -49,6 +49,9 @@ public abstract class BaseDestination im public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; + public static final int MAX_PRODUCERS_TO_AUDIT = 64; + public static final int MAX_AUDIT_DEPTH = 2048; + protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Dec 13 16:20:40 2010 @@ -63,7 +63,7 @@ public class DurableTopicSubscription ex } - public boolean isActive() { + public final boolean isActive() { return active.get(); } @@ -220,6 +220,12 @@ public class DurableTopicSubscription ex super.add(node); } + protected void dispatchPending() throws IOException { 
+ if (isActive()) { + super.dispatchPending(); + } + } + protected void doAddRecoveredMessage(MessageReference message) throws Exception { synchronized(pending) { pending.addRecoveredMessage(message); @@ -239,7 +245,7 @@ public class DurableTopicSubscription ex } protected boolean canDispatch(MessageReference node) { - return active.get(); + return isActive(); } protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Dec 13 16:20:40 2010 @@ -40,8 +40,8 @@ public abstract class AbstractPendingMes protected int memoryUsageHighWaterMark = 70; protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE; protected SystemUsage systemUsage; - protected int maxProducersToAudit=1024; - protected int maxAuditDepth=1000; + protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT; + protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH; protected boolean enableAudit=true; protected ActiveMQMessageAudit audit; protected boolean useCache=true; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Dec 13 16:20:40 2010 @@ -36,10 +36,11 @@ public abstract class AbstractStoreCurso private Iterator iterator = null; protected boolean cacheEnabled=false; protected boolean batchResetNeeded = true; - protected boolean storeHasMessages = false; + private boolean storeHasMessages = false; protected int size; private MessageId lastCachedId; - + private boolean hadSpace = false; + protected AbstractStoreCursor(Destination destination) { super((destination != null ? destination.isPrioritizedMessages():false)); this.regionDestination=destination; @@ -89,6 +90,7 @@ public abstract class AbstractStoreCurso batchList.addMessageLast(message); clearIterator(true); recovered = true; + storeHasMessages = true; } else { /* * we should expect to get these - as the message is recorded as it before it goes into @@ -99,7 +101,6 @@ public abstract class AbstractStoreCurso LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority()); } - storeHasMessages = true; } return recovered; } @@ -187,6 +188,7 @@ public abstract class AbstractStoreCurso } } } + this.storeHasMessages = true; size++; } @@ -229,7 +231,7 @@ public abstract class AbstractStoreCurso } - public final synchronized void gc() { + public synchronized void gc() { for (Iteratori = batchList.iterator();i.hasNext();) { MessageReference msg = i.next(); rollback(msg.getMessageId()); @@ -240,8 +242,13 @@ public abstract class 
AbstractStoreCurso batchResetNeeded = true; this.cacheEnabled=false; } - - + + @Override + public boolean hasSpace() { + hadSpace = super.hasSpace(); + return hadSpace; + } + protected final synchronized void fillBatch() { if (LOG.isTraceEnabled()) { LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded @@ -251,7 +258,7 @@ public abstract class AbstractStoreCurso resetBatch(); this.batchResetNeeded = false; } - if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) { + if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) { this.storeHasMessages = false; try { doFillBatch(); @@ -259,7 +266,7 @@ public abstract class AbstractStoreCurso LOG.error("Failed to fill batch", e); throw new RuntimeException(e); } - if (!this.batchList.isEmpty()) { + if (!this.batchList.isEmpty() || !hadSpace) { this.storeHasMessages=true; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Dec 13 16:20:40 2010 @@ -50,7 +50,7 @@ public class StoreDurableSubscriberCurso private final PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; private final Subscription subscription; - private int cacheCurrentPriority = UNKNOWN; + private int cacheCurrentLowestPriority = UNKNOWN; private boolean immediatePriorityDispatch = true; /** * @param broker Broker for this cursor @@ -187,27 +187,27 @@ public class 
StoreDurableSubscriberCurso Destination dest = msg.getRegionDestination(); TopicStorePrefetch tsp = topics.get(dest); if (tsp != null) { - - // tps becomes a highest priority only cache when we have a new higher priority - // message and we are not currently caching + // cache can be come high priority cache for immediate dispatch final int priority = msg.getPriority(); if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) { - if (priority > tsp.getLastDispatchPriority()) { - // go get the latest priority message + if (priority > tsp.getCurrentLowestPriority()) { if (LOG.isTraceEnabled()) { - LOG.trace("enabling cache for cursor on high priority message " + priority); + LOG.trace("enabling cache for cursor on high priority message " + priority + + ", current lowest: " + tsp.getCurrentLowestPriority()); } tsp.cacheEnabled = true; - cacheCurrentPriority = priority; + cacheCurrentLowestPriority = tsp.getCurrentLowestPriority(); } - } else if (cacheCurrentPriority > 0 && priority < cacheCurrentPriority) { + } else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) { // go to the store to get next priority message as lower priority messages may be recovered - // already - tsp.clear(); - cacheCurrentPriority = UNKNOWN; + // already and need to acked sequence order if (LOG.isTraceEnabled()) { - LOG.trace("disabling/clearing cache for cursor on lower priority message " + priority); + LOG.trace("disabling/clearing cache for cursor on lower priority message " + + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority() + + " cache lowest: " + cacheCurrentLowestPriority); } + tsp.cacheEnabled = false; + cacheCurrentLowestPriority = UNKNOWN; } tsp.addMessageLast(node); } @@ -299,6 +299,7 @@ public class StoreDurableSubscriberCurso for (PendingMessageCursor tsp : storePrefetches) { tsp.gc(); } + cacheCurrentLowestPriority = UNKNOWN; } @Override Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Dec 13 16:20:40 2010 @@ -38,6 +38,7 @@ class TopicStorePrefetch extends Abstrac private final String clientId; private final String subscriberName; private final Subscription subscription; + private int currentLowestPriority; /** * @param topic @@ -52,6 +53,15 @@ class TopicStorePrefetch extends Abstrac this.subscriberName = subscriberName; this.maxProducersToAudit=32; this.maxAuditDepth=10000; + resetCurrentLowestPriority(); + } + + private void resetCurrentLowestPriority() { + currentLowestPriority = 9; + } + + public synchronized int getCurrentLowestPriority() { + return currentLowestPriority; } public boolean recoverMessageReference(MessageId messageReference) throws Exception { @@ -62,13 +72,19 @@ class TopicStorePrefetch extends Abstrac @Override public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("recover: " + message.getMessageId() + ", priority: " + message.getPriority()); + } + boolean recovered = false; MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); messageEvaluationContext.setMessageReference(message); if (this.subscription.matches(message, messageEvaluationContext)) { - return super.recoverMessage(message, cached); + recovered = super.recoverMessage(message, cached); + if (recovered) { + currentLowestPriority = 
Math.min(currentLowestPriority, message.getPriority()); + } } - return false; - + return recovered; } @Override @@ -84,7 +100,11 @@ class TopicStorePrefetch extends Abstrac @Override protected synchronized boolean isStoreEmpty() { try { - return this.store.isEmpty(); + boolean empty = this.store.isEmpty(); + if (empty) { + resetCurrentLowestPriority(); + } + return empty; } catch (Exception e) { LOG.error("Failed to get message count", e); @@ -97,6 +117,12 @@ class TopicStorePrefetch extends Abstrac protected void resetBatch() { this.store.resetBatching(clientId, subscriberName); } + + @Override + public synchronized void gc() { + super.gc(); + resetCurrentLowestPriority(); + } @Override protected void doFillBatch() throws Exception { @@ -104,10 +130,6 @@ class TopicStorePrefetch extends Abstrac maxBatchSize, this); } - public int getLastDispatchPriority() { - return last != null? last.getMessage().getPriority() : 9; - } - @Override public String toString() { return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")"; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Dec 13 16:20:40 2010 @@ -55,9 +55,9 @@ public class PolicyEntry extends Destina private PendingQueueMessageStoragePolicy pendingQueuePolicy; private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; - private int 
maxProducersToAudit=32; - private int maxAuditDepth=2048; - private int maxQueueAuditDepth=2048; + private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT; + private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; + private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; private boolean enableAudit=true; private boolean producerFlowControl = true; private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; @@ -217,7 +217,12 @@ public class PolicyEntry extends Destina cursor.setSystemUsage(memoryManager); sub.setPending(cursor); } - sub.setMaxAuditDepth(getMaxAuditDepth()); + int auditDepth = getMaxAuditDepth(); + if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) { + sub.setMaxAuditDepth(auditDepth * 10); + } else { + sub.setMaxAuditDepth(auditDepth); + } sub.setMaxProducersToAudit(getMaxProducersToAudit()); sub.setUsePrefetchExtension(isUsePrefetchExtension()); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Mon Dec 13 16:20:40 2010 @@ -144,6 +144,9 @@ public class JDBCTopicMessageStore exten } }; + if (LOG.isTraceEnabled()) { + LOG.trace(key + " existing last recovered: " + lastRecovered); + } if (isPrioritizedMessages()) { adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName, lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener); @@ -223,7 +226,7 @@ public class 
JDBCTopicMessageStore exten int result = 0; TransactionContext c = persistenceAdapter.getTransactionContext(); try { - result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); + result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages()); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java Mon Dec 13 16:20:40 2010 @@ -98,7 +98,7 @@ public class NegativeQueueTest extends A protected static int PREFETCH_SIZE = 1000; protected BrokerService broker; - protected String bindAddress = "tcp://localhost:60706"; + protected String bindAddress = "tcp://localhost:0"; public void testWithDefaultPrefetch() throws Exception{ PREFETCH_SIZE = 1000; @@ -311,6 +311,7 @@ public class NegativeQueueTest extends A configureBroker(answer); answer.start(); answer.waitUntilStarted(); + bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString(); return answer; } @@ -329,7 +330,7 @@ public class NegativeQueueTest extends A pMap.setDefaultEntry(policy); answer.setDestinationPolicy(pMap); answer.setDeleteAllMessagesOnStartup(true); - answer.addConnector(bindAddress); + answer.addConnector("tcp://localhost:0"); MemoryUsage 
memoryUsage = new MemoryUsage(); memoryUsage.setLimit(MEMORY_USAGE); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Mon Dec 13 16:20:40 2010 @@ -54,6 +54,7 @@ abstract public class MessagePriorityTes public boolean useCache = true; public boolean dispatchAsync = true; public boolean prioritizeMessages = true; + public boolean immediatePriorityDispatch = true; public int prefetchVal = 500; public int MSG_NUM = 600; @@ -73,7 +74,7 @@ abstract public class MessagePriorityTes policy.setUseCache(useCache); StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = new StorePendingDurableSubscriberMessageStoragePolicy(); - durableSubPending.setImmediatePriorityDispatch(true); + durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch); policy.setPendingDurableSubscriberPolicy(durableSubPending); PolicyMap policyMap = new PolicyMap(); policyMap.put(new ActiveMQQueue("TEST"), policy); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original) +++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Mon Dec 13 16:20:40 2010 @@ -22,13 +22,18 @@ import java.util.HashMap; import java.util.Vector; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.TopicSubscriber; import junit.framework.Test; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.MessagePriorityTest; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.derby.jdbc.EmbeddedDataSource; @@ -36,7 +41,7 @@ import org.apache.derby.jdbc.EmbeddedDat public class JDBCMessagePriorityTest extends MessagePriorityTest { private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class); - + @Override protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception { JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); @@ -81,9 +86,9 @@ public class JDBCMessagePriorityTest ext sub = sess.createDurableSubscriber(topic, subName); for (int i = 0; i < MSG_NUM * 4; i++) { Message msg = sub.receive(10000); - LOG.debug("received i=" + i + ", m=" + (msg!=null? + LOG.debug("received i=" + i + ", m=" + (msg != null ? 
msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() - : null) ); + : null)); assertNotNull("Message " + i + " was null", msg); assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority()); if (i > 0 && i % closeFrequency == 0) { @@ -97,7 +102,7 @@ public class JDBCMessagePriorityTest ext } public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() { - addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("prioritizeMessages", new Object[]{Boolean.TRUE, Boolean.FALSE}); } public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception { @@ -115,7 +120,7 @@ public class JDBCMessagePriorityTest ext final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority]; Vector producers = new Vector(); - for (int priority=0; priority 0 && i % closeFrequency == 0) { @@ -151,6 +156,105 @@ public class JDBCMessagePriorityTest ext } } + public void initCombosForTestConcurrentRate() { + addCombinationValues("prefetchVal", new Object[]{new Integer(1), new Integer(500)}); + } + + public void testConcurrentRate() throws Exception { + ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST"); + final String subName = "priorityConcurrent"; + Connection consumerConn = factory.createConnection(); + consumerConn.setClientID("subName"); + consumerConn.start(); + Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName); + sub.close(); + + final int TO_SEND = 2000; + final Vector duplicates = new Vector(); + final int[] dups = new int[TO_SEND * 4]; + long start; + double max = 0, sum = 0; + MessageProducer messageProducer = sess.createProducer(topic); + TextMessage message = sess.createTextMessage(); + for (int i = 0; i < TO_SEND; i++) { + int priority = i % 10; + message.setText(i + "-" + priority); + message.setIntProperty("seq", i); + 
message.setJMSPriority(priority); + if (i > 0 && i % 1000 == 0) { + LOG.info("Max send time: " + max + ". Sending message: " + message.getText()); + } + start = System.currentTimeMillis(); + messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0); + long duration = System.currentTimeMillis() - start; + max = Math.max(max, duration); + if (duration == max) { + LOG.info("new max: " + max + " on i=" + i + ", " + message.getText()); + } + sum += duration; + } + + LOG.info("Sent: " + TO_SEND + ", max send time: " + max); + + double noConsumerAve = (sum * 100 / TO_SEND); + sub = consumerSession.createDurableSubscriber(topic, subName); + final AtomicInteger count = new AtomicInteger(); + sub.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + count.incrementAndGet(); + if (count.get() % 100 == 0) { + LOG.info("onMessage: count: " + count.get() + ", " + ((TextMessage) message).getText() + ", seqNo " + message.getIntProperty("seq") + ", " + message.getJMSMessageID()); + } + int seqNo = message.getIntProperty("seq"); + if (dups[seqNo] == 0) { + dups[seqNo] = 1; + } else { + LOG.error("Duplicate: " + ((TextMessage) message).getText() + ", " + message.getJMSMessageID()); + duplicates.add(message); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + LOG.info("Activated consumer"); + sum = max = 0; + for (int i = TO_SEND; i < (TO_SEND * 2); i++) { + int priority = i % 10; + message.setText(i + "-" + priority); + message.setIntProperty("seq", i); + message.setJMSPriority(priority); + if (i > 0 && i % 1000 == 0) { + LOG.info("Max send time: " + max + ". 
Sending message: " + message.getText()); + } + start = System.currentTimeMillis(); + messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), 0); + long duration = System.currentTimeMillis() - start; + max = Math.max(max, duration); + if (duration == max) { + LOG.info("new max: " + max + " on i=" + i + ", " + message.getText()); + } + sum += duration; + } + LOG.info("Sent another: " + TO_SEND + ", max send time: " + max); + + double withConsumerAve = (sum * 100 / TO_SEND); + assertTrue("max three times as slow with consumer:" + withConsumerAve + " , noConsumerMax:" + noConsumerAve, + withConsumerAve < noConsumerAve * 3); + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + LOG.info("count: " + count.get()); + return TO_SEND * 2 == count.get(); + } + }, 60 * 1000); + + assertTrue("No duplicates : " + duplicates, duplicates.isEmpty()); + assertEquals("got all messages", TO_SEND * 2, count.get()); + } + public static Test suite() { return suite(JDBCMessagePriorityTest.class); } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java Mon Dec 13 16:20:40 2010 @@ -50,7 +50,8 @@ public class JdbcDurableSubDupTest { private static final Log LOG = LogFactory.getLog(JdbcDurableSubDupTest.class); final int prefetchVal = 150; - String url = "tcp://localhost:61616?jms.watchTopicAdvisories=false"; + String urlOptions = "jms.watchTopicAdvisories=false"; + String url = null; String queueName = 
"topicTest?consumer.prefetchSize=" + prefetchVal; String xmlMessage = ""; @@ -83,10 +84,11 @@ public class JdbcDurableSubDupTest { policyMap.setDefaultEntry(policyEntry); broker.setDestinationPolicy(policyMap); - broker.addConnector("tcp://localhost:61616"); + broker.addConnector("tcp://localhost:0"); broker.setDeleteAllMessagesOnStartup(true); broker.start(); broker.waitUntilStarted(); + url = broker.getTransportConnectors().get(0).getConnectUri().toString() + "?" + urlOptions; } @After