activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1045219 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/jdbc/...
Date Mon, 13 Dec 2010 16:20:41 GMT
Author: gtully
Date: Mon Dec 13 16:20:40 2010
New Revision: 1045219

URL: http://svn.apache.org/viewvc?rev=1045219&view=rev
Log:
respect storeHasMessages such that the store batch size, from max page size, is respected;
this improves performance by negating the replay of high priority inflight messages as the tail
of a backlog is recovered. Fix order issue with high priority cache such that sequence order
of lower priority messages is respected. Additional test for same, and the default audit depth is
multiplied by 10 when priority is enabled

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Mon Dec 13 16:20:40 2010
@@ -49,6 +49,9 @@ public abstract class BaseDestination im
     public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
     public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
     public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
+    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
+    public static final int MAX_AUDIT_DEPTH = 2048;
+
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Dec 13 16:20:40 2010
@@ -63,7 +63,7 @@ public class DurableTopicSubscription ex
         
     }
 
-    public boolean isActive() {
+    public final boolean isActive() {
         return active.get();
     }
 
@@ -220,6 +220,12 @@ public class DurableTopicSubscription ex
         super.add(node);
     }
 
+    protected void dispatchPending() throws IOException {
+        if (isActive()) {
+            super.dispatchPending();
+        }
+    }
+    
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         synchronized(pending) {
             pending.addRecoveredMessage(message);
@@ -239,7 +245,7 @@ public class DurableTopicSubscription ex
     }
 
     protected boolean canDispatch(MessageReference node) {
-        return active.get();
+        return isActive();
     }
 
     protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference
node) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Mon Dec 13 16:20:40 2010
@@ -40,8 +40,8 @@ public abstract class AbstractPendingMes
     protected int memoryUsageHighWaterMark = 70;
     protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
     protected SystemUsage systemUsage;
-    protected int maxProducersToAudit=1024;
-    protected int maxAuditDepth=1000;
+    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
+    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
     protected boolean enableAudit=true;
     protected ActiveMQMessageAudit audit;
     protected boolean useCache=true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Dec 13 16:20:40 2010
@@ -36,10 +36,11 @@ public abstract class AbstractStoreCurso
     private Iterator<MessageReference> iterator = null;
     protected boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
-    protected boolean storeHasMessages = false;
+    private boolean storeHasMessages = false;
     protected int size;
     private MessageId lastCachedId;
-    
+    private boolean hadSpace = false;
+
     protected AbstractStoreCursor(Destination destination) {
         super((destination != null ? destination.isPrioritizedMessages():false));
         this.regionDestination=destination;
@@ -89,6 +90,7 @@ public abstract class AbstractStoreCurso
             batchList.addMessageLast(message);
             clearIterator(true);
             recovered = true;
+            storeHasMessages = true;
         } else {
             /*
              * we should expect to get these - as the message is recorded as it before it
goes into
@@ -99,7 +101,6 @@ public abstract class AbstractStoreCurso
                 LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
                         + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
             }
-            storeHasMessages = true;
         }
         return recovered;
     }
@@ -187,6 +188,7 @@ public abstract class AbstractStoreCurso
                 }
             }
         }
+        this.storeHasMessages = true;
         size++;
     }
 
@@ -229,7 +231,7 @@ public abstract class AbstractStoreCurso
     }
     
     
-    public final synchronized void gc() {
+    public synchronized void gc() {
         for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
             MessageReference msg = i.next();
             rollback(msg.getMessageId());
@@ -240,8 +242,13 @@ public abstract class AbstractStoreCurso
         batchResetNeeded = true;
         this.cacheEnabled=false;
     }
-    
-    
+
+    @Override
+    public boolean hasSpace() {
+        hadSpace = super.hasSpace();
+        return hadSpace;
+    }
+
     protected final synchronized void fillBatch() {
         if (LOG.isTraceEnabled()) {
             LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
@@ -251,7 +258,7 @@ public abstract class AbstractStoreCurso
             resetBatch();
             this.batchResetNeeded = false;
         }
-        if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0))
{
+        if (this.batchList.isEmpty() && this.storeHasMessages && this.size
>0) {
             this.storeHasMessages = false;
             try {
                 doFillBatch();
@@ -259,7 +266,7 @@ public abstract class AbstractStoreCurso
                 LOG.error("Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
-            if (!this.batchList.isEmpty()) {
+            if (!this.batchList.isEmpty() || !hadSpace) {
                 this.storeHasMessages=true;
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Mon Dec 13 16:20:40 2010
@@ -50,7 +50,7 @@ public class StoreDurableSubscriberCurso
     private final PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
     private final Subscription subscription;
-    private int cacheCurrentPriority = UNKNOWN;
+    private int cacheCurrentLowestPriority = UNKNOWN;
     private boolean immediatePriorityDispatch = true;
     /**
      * @param broker Broker for this cursor
@@ -187,27 +187,27 @@ public class StoreDurableSubscriberCurso
                 Destination dest = msg.getRegionDestination();
                 TopicStorePrefetch tsp = topics.get(dest);
                 if (tsp != null) {
-
-                    // tps becomes a highest priority only cache when we have a new higher
priority
-                    // message and we are not currently caching
+                    // cache can be come high priority cache for immediate dispatch
                     final int priority = msg.getPriority();
                     if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch
&& !tsp.cacheEnabled) {
-                        if (priority > tsp.getLastDispatchPriority()) {
-                            // go get the latest priority message
+                        if (priority > tsp.getCurrentLowestPriority()) {
                             if (LOG.isTraceEnabled()) {
-                                LOG.trace("enabling cache for cursor on high priority message
" + priority);
+                                LOG.trace("enabling cache for cursor on high priority message
" + priority
+                                        + ", current lowest: " + tsp.getCurrentLowestPriority());
                             }
                             tsp.cacheEnabled = true;
-                            cacheCurrentPriority = priority;
+                            cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
                         }
-                    } else if (cacheCurrentPriority > 0 && priority < cacheCurrentPriority)
{
+                    } else if (cacheCurrentLowestPriority != UNKNOWN && priority
<= cacheCurrentLowestPriority) {
                         // go to the store to get next priority message as lower priority
messages may be recovered
-                        // already
-                        tsp.clear();
-                        cacheCurrentPriority = UNKNOWN;
+                        // already and need to acked sequence order
                         if (LOG.isTraceEnabled()) {
-                            LOG.trace("disabling/clearing cache for cursor on lower priority
message " + priority);
+                            LOG.trace("disabling/clearing cache for cursor on lower priority
message "
+                                    + priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
+                                    + " cache lowest: " + cacheCurrentLowestPriority);
                         }
+                        tsp.cacheEnabled = false;
+                        cacheCurrentLowestPriority = UNKNOWN;
                     }
                     tsp.addMessageLast(node);
                 }
@@ -299,6 +299,7 @@ public class StoreDurableSubscriberCurso
         for (PendingMessageCursor tsp : storePrefetches) {
             tsp.gc();
         }
+        cacheCurrentLowestPriority = UNKNOWN;
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Mon Dec 13 16:20:40 2010
@@ -38,6 +38,7 @@ class TopicStorePrefetch extends Abstrac
     private final String clientId;
     private final String subscriberName;
     private final Subscription subscription;
+    private int currentLowestPriority;
     
     /**
      * @param topic
@@ -52,6 +53,15 @@ class TopicStorePrefetch extends Abstrac
         this.subscriberName = subscriberName;
         this.maxProducersToAudit=32;
         this.maxAuditDepth=10000;
+        resetCurrentLowestPriority();
+    }
+
+    private void resetCurrentLowestPriority() {
+        currentLowestPriority = 9;
+    }
+
+    public synchronized int getCurrentLowestPriority() {
+        return currentLowestPriority;
     }
 
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@@ -62,13 +72,19 @@ class TopicStorePrefetch extends Abstrac
         
     @Override
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception
{
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("recover: " + message.getMessageId() + ", priority: " + message.getPriority());
+        }
+        boolean recovered = false;
         MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
         if (this.subscription.matches(message, messageEvaluationContext)) {
-            return super.recoverMessage(message, cached);
+            recovered = super.recoverMessage(message, cached);
+            if (recovered) {
+                currentLowestPriority = Math.min(currentLowestPriority, message.getPriority());
               
+            }
         }
-        return false;
-        
+        return recovered;      
     }
     
     @Override
@@ -84,7 +100,11 @@ class TopicStorePrefetch extends Abstrac
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
-            return this.store.isEmpty();
+            boolean empty = this.store.isEmpty();
+            if (empty) {
+                resetCurrentLowestPriority();
+            }
+            return empty;
             
         } catch (Exception e) {
             LOG.error("Failed to get message count", e);
@@ -97,6 +117,12 @@ class TopicStorePrefetch extends Abstrac
     protected void resetBatch() {
         this.store.resetBatching(clientId, subscriberName);
     }
+
+    @Override
+    public synchronized void gc() {
+        super.gc();
+        resetCurrentLowestPriority();
+    }
     
     @Override
     protected void doFillBatch() throws Exception {
@@ -104,10 +130,6 @@ class TopicStorePrefetch extends Abstrac
                 maxBatchSize, this);
     }
 
-    public int getLastDispatchPriority() {
-        return last != null? last.getMessage().getPriority() : 9;
-    }
-
     @Override
     public String toString() {
         return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + ","
+ subscriberName + ")";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Dec 13 16:20:40 2010
@@ -55,9 +55,9 @@ public class PolicyEntry extends Destina
     private PendingQueueMessageStoragePolicy pendingQueuePolicy;
     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
     private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
-    private int maxProducersToAudit=32;
-    private int maxAuditDepth=2048;
-    private int maxQueueAuditDepth=2048;
+    private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT;
+    private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
+    private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
     private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
@@ -217,7 +217,12 @@ public class PolicyEntry extends Destina
             cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }
-        sub.setMaxAuditDepth(getMaxAuditDepth());
+        int auditDepth = getMaxAuditDepth();
+        if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages())
{
+            sub.setMaxAuditDepth(auditDepth * 10);
+        } else {
+            sub.setMaxAuditDepth(auditDepth);
+        }
         sub.setMaxProducersToAudit(getMaxProducersToAudit());
         sub.setUsePrefetchExtension(isUsePrefetchExtension());        
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Mon Dec 13 16:20:40 2010
@@ -144,6 +144,9 @@ public class JDBCTopicMessageStore exten
                 }
 
             };
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(key + " existing last recovered: " + lastRecovered);
+            }
             if (isPrioritizedMessages()) {
                 adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
                         lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
@@ -223,7 +226,7 @@ public class JDBCTopicMessageStore exten
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-                result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId,
subscriberName, isPrioritizedMessages());
+            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId,
subscriberName, isPrioritizedMessages());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get Message Count: " + clientId +
". Reason: " + e, e);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Mon Dec 13 16:20:40 2010
@@ -98,7 +98,7 @@ public class NegativeQueueTest extends A
     protected static int PREFETCH_SIZE = 1000;  
     
     protected BrokerService broker;
-    protected String bindAddress = "tcp://localhost:60706";
+    protected String bindAddress = "tcp://localhost:0";
     
     public void testWithDefaultPrefetch() throws Exception{
         PREFETCH_SIZE = 1000;
@@ -311,6 +311,7 @@ public class NegativeQueueTest extends A
         configureBroker(answer);
         answer.start();
         answer.waitUntilStarted();
+        bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString();
         return answer;
     }
     
@@ -329,7 +330,7 @@ public class NegativeQueueTest extends A
         pMap.setDefaultEntry(policy);
         answer.setDestinationPolicy(pMap);
         answer.setDeleteAllMessagesOnStartup(true);
-        answer.addConnector(bindAddress);
+        answer.addConnector("tcp://localhost:0");
 
         MemoryUsage memoryUsage = new MemoryUsage();
         memoryUsage.setLimit(MEMORY_USAGE); 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Mon Dec 13 16:20:40 2010
@@ -54,6 +54,7 @@ abstract public class MessagePriorityTes
     public boolean useCache = true;
     public boolean dispatchAsync = true;
     public boolean prioritizeMessages = true;
+    public boolean immediatePriorityDispatch = true;
     public int prefetchVal = 500;
 
     public int MSG_NUM = 600;
@@ -73,7 +74,7 @@ abstract public class MessagePriorityTes
         policy.setUseCache(useCache);
         StorePendingDurableSubscriberMessageStoragePolicy durableSubPending =
                 new StorePendingDurableSubscriberMessageStoragePolicy();
-        durableSubPending.setImmediatePriorityDispatch(true);
+        durableSubPending.setImmediatePriorityDispatch(immediatePriorityDispatch);
         policy.setPendingDurableSubscriberPolicy(durableSubPending);
         PolicyMap policyMap = new PolicyMap();
         policyMap.put(new ActiveMQQueue("TEST"), policy);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Mon Dec 13 16:20:40 2010
@@ -22,13 +22,18 @@ import java.util.HashMap;
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.TopicSubscriber;
 import junit.framework.Test;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.MessagePriorityTest;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -36,7 +41,7 @@ import org.apache.derby.jdbc.EmbeddedDat
 public class JDBCMessagePriorityTest extends MessagePriorityTest {
 
     private static final Log LOG = LogFactory.getLog(JDBCMessagePriorityTest.class);
-    
+
     @Override
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception
{
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
@@ -81,9 +86,9 @@ public class JDBCMessagePriorityTest ext
         sub = sess.createDurableSubscriber(topic, subName);
         for (int i = 0; i < MSG_NUM * 4; i++) {
             Message msg = sub.receive(10000);
-            LOG.debug("received i=" + i + ", m=" + (msg!=null?
+            LOG.debug("received i=" + i + ", m=" + (msg != null ?
                     msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
-                    : null) );
+                    : null));
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM],
msg.getJMSPriority());
             if (i > 0 && i % closeFrequency == 0) {
@@ -97,7 +102,7 @@ public class JDBCMessagePriorityTest ext
     }
 
     public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
-        addCombinationValues("prioritizeMessages", new Object[] {Boolean.TRUE, Boolean.FALSE});
+        addCombinationValues("prioritizeMessages", new Object[]{Boolean.TRUE, Boolean.FALSE});
     }
 
     public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
@@ -115,7 +120,7 @@ public class JDBCMessagePriorityTest ext
 
         final AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
         Vector<ProducerThread> producers = new Vector<ProducerThread>();
-        for (int priority=0; priority <maxPriority; priority++) {
+        for (int priority = 0; priority < maxPriority; priority++) {
             producers.add(new ProducerThread(topic, MSG_NUM, priority));
             messageCounts[priority] = new AtomicInteger(0);
         }
@@ -124,15 +129,15 @@ public class JDBCMessagePriorityTest ext
             producer.start();
         }
 
-        final int closeFrequency = MSG_NUM/2;
+        final int closeFrequency = MSG_NUM / 2;
         HashMap dups = new HashMap();
         sub = consumerSession.createDurableSubscriber(topic, subName);
-        for (int i=0; i < MSG_NUM * maxPriority; i++) {
+        for (int i = 0; i < MSG_NUM * maxPriority; i++) {
             Message msg = sub.receive(10000);
-            LOG.debug("received i=" + i + ", m=" + (msg!=null?
+            LOG.debug("received i=" + i + ", m=" + (msg != null ?
                     msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
-                    : null) );
-            assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(),
subName));            
+                    : null));
+            assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(),
subName));
             assertNotNull("Message " + i + " was null", msg);
             messageCounts[msg.getJMSPriority()].incrementAndGet();
             if (i > 0 && i % closeFrequency == 0) {
@@ -151,6 +156,105 @@ public class JDBCMessagePriorityTest ext
         }
     }
 
+    public void initCombosForTestConcurrentRate() {
+        addCombinationValues("prefetchVal", new Object[]{new Integer(1), new Integer(500)});
+    }
+
+    public void testConcurrentRate() throws Exception {
+        ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
+        final String subName = "priorityConcurrent";
+        Connection consumerConn = factory.createConnection();
+        consumerConn.setClientID("subName");
+        consumerConn.start();
+        Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
+        sub.close();
+
+        final int TO_SEND = 2000;
+        final Vector<Message> duplicates = new Vector<Message>();
+        final int[] dups = new int[TO_SEND * 4];
+        long start;
+        double max = 0, sum = 0;
+        MessageProducer messageProducer = sess.createProducer(topic);
+        TextMessage message = sess.createTextMessage();
+        for (int i = 0; i < TO_SEND; i++) {
+            int priority = i % 10;
+            message.setText(i + "-" + priority);
+            message.setIntProperty("seq", i);
+            message.setJMSPriority(priority);
+            if (i > 0 && i % 1000 == 0) {
+                LOG.info("Max send time: " + max + ". Sending message: " + message.getText());
+            }
+            start = System.currentTimeMillis();
+            messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(),
0);
+            long duration = System.currentTimeMillis() - start;
+            max = Math.max(max, duration);
+            if (duration == max) {
+                LOG.info("new max: " + max + " on i=" + i + ", " + message.getText());
+            }
+            sum += duration;
+        }
+
+        LOG.info("Sent: " + TO_SEND + ", max send time: " + max);
+
+        double noConsumerAve = (sum * 100 / TO_SEND);
+        sub = consumerSession.createDurableSubscriber(topic, subName);
+        final AtomicInteger count = new AtomicInteger();
+        sub.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                try {
+                    count.incrementAndGet();
+                    if (count.get() % 100 == 0) {
+                        LOG.info("onMessage: count: " + count.get() + ", " + ((TextMessage)
message).getText() + ", seqNo " + message.getIntProperty("seq") + ", " + message.getJMSMessageID());
+                    }
+                    int seqNo = message.getIntProperty("seq");
+                    if (dups[seqNo] == 0) {
+                        dups[seqNo] = 1;
+                    } else {
+                        LOG.error("Duplicate: " + ((TextMessage) message).getText() + ",
" + message.getJMSMessageID());
+                        duplicates.add(message);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        LOG.info("Activated consumer");
+        sum = max = 0;
+        for (int i = TO_SEND; i < (TO_SEND * 2); i++) {
+            int priority = i % 10;
+            message.setText(i + "-" + priority);
+            message.setIntProperty("seq", i);
+            message.setJMSPriority(priority);
+            if (i > 0 && i % 1000 == 0) {
+                LOG.info("Max send time: " + max + ". Sending message: " + message.getText());
+            }
+            start = System.currentTimeMillis();
+            messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(),
0);
+            long duration = System.currentTimeMillis() - start;
+            max = Math.max(max, duration);
+            if (duration == max) {
+                LOG.info("new max: " + max + " on i=" + i + ", " + message.getText());
+            }
+            sum += duration;
+        }
+        LOG.info("Sent another: " + TO_SEND + ", max send time: " + max);
+
+        double withConsumerAve = (sum * 100 / TO_SEND);
+        assertTrue("max three times as slow with consumer:" + withConsumerAve + " , noConsumerMax:"
+ noConsumerAve,
+                withConsumerAve < noConsumerAve * 3);
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                LOG.info("count: " + count.get());
+                return TO_SEND * 2 == count.get();
+            }
+        }, 60 * 1000);
+
+        assertTrue("No duplicates : " + duplicates, duplicates.isEmpty());
+        assertEquals("got all messages", TO_SEND * 2, count.get());
+    }
+
     public static Test suite() {
         return suite(JDBCMessagePriorityTest.class);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java?rev=1045219&r1=1045218&r2=1045219&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
Mon Dec 13 16:20:40 2010
@@ -50,7 +50,8 @@ public class JdbcDurableSubDupTest {
 
     private static final Log LOG = LogFactory.getLog(JdbcDurableSubDupTest.class);
     final int prefetchVal = 150;
-    String url = "tcp://localhost:61616?jms.watchTopicAdvisories=false";
+    String urlOptions = "jms.watchTopicAdvisories=false";
+    String url = null;
     String queueName = "topicTest?consumer.prefetchSize=" + prefetchVal;
     String xmlMessage = "<Example 01234567890123456789012345678901234567890123456789 MessageText>";
 
@@ -83,10 +84,11 @@ public class JdbcDurableSubDupTest {
         policyMap.setDefaultEntry(policyEntry);
         broker.setDestinationPolicy(policyMap);
 
-        broker.addConnector("tcp://localhost:61616");
+        broker.addConnector("tcp://localhost:0");
         broker.setDeleteAllMessagesOnStartup(true);
         broker.start();
         broker.waitUntilStarted();
+        url = broker.getTransportConnectors().get(0).getConnectUri().toString() + "?" + urlOptions;
     }
 
     @After



Mime
View raw message