activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1044368 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/command/ ma...
Date Fri, 10 Dec 2010 14:18:55 GMT
Author: gtully
Date: Fri Dec 10 14:18:54 2010
New Revision: 1044368

URL: http://svn.apache.org/viewvc?rev=1044368&view=rev
Log:
better support for immediate dispatch of higher priority messages for durable subs, tidy up audit
and page size config for durables, use destination batch size to page in, remove extraneous
calls to store getSize(), test for duplicate delivery, audit needs to be large when priority
is used and needs to be statically configured, prefetch is a bad model with a priority spread,
restrict priority values to 0-9 as impl is restricted to 0-9 jms range

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Fri Dec 10 14:18:54 2010
@@ -26,6 +26,7 @@ import javax.jms.JMSException;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
@@ -84,6 +85,11 @@ public class DurableTopicSubscription ex
         node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
     }
 
+    @Override
+    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
{
+        // statically configured via maxPageSize
+    }
+
     public void add(ConnectionContext context, Destination destination) throws Exception
{
         super.add(context, destination);
         // do it just once per destination
@@ -117,12 +123,6 @@ public class DurableTopicSubscription ex
             this.context = context;
             this.info = info;
             LOG.debug("Activating " + this);
-            int prefetch = info.getPrefetchSize();
-            if (prefetch>0) {
-                prefetch += prefetch/2;
-            }
-            int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
-            this.pending.setMaxAuditDepth(depth);
             if (!keepDurableSubsActive) {
                 for (Iterator<Destination> iter = destinations.values()
                         .iterator(); iter.hasNext();) {
@@ -134,6 +134,8 @@ public class DurableTopicSubscription ex
             synchronized (pending) {
                 pending.setSystemUsage(memoryManager);
                 pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
+                pending.setMaxAuditDepth(getMaxAuditDepth());
+                pending.setMaxProducersToAudit(getMaxProducersToAudit());
                 pending.start();
                 // If nothing was in the persistent store, then try to use the
                 // recovery policy.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Dec 10 14:18:54 2010
@@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -67,7 +66,6 @@ public abstract class PrefetchSubscripti
     protected final SystemUsage usageManager;
     private final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
-    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
     private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
     
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext
context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
@@ -569,7 +567,7 @@ public abstract class PrefetchSubscripti
                     int numberToDispatch = countBeforeFull();
                     if (numberToDispatch > 0) {
                         setSlowConsumer(false);
-                        pending.setMaxBatchSize(numberToDispatch);
+                        setPendingBatchSize(pending, numberToDispatch);
                         int count = 0;
                         pending.reset();
                         while (pending.hasNext() && !isFull()
@@ -614,6 +612,10 @@ public abstract class PrefetchSubscripti
         }
     }
 
+    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch)
{
+        pending.setMaxBatchSize(numberToDispatch);
+    }
+
     protected boolean dispatch(final MessageReference node) throws IOException {
         final Message message = node.getMessage();
         if (message == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Fri Dec 10 14:18:54 2010
@@ -95,8 +95,9 @@ public abstract class AbstractStoreCurso
              * the cache. If subsequently, we pull out that message from the store (before
its deleted)
              * it will be a duplicate - but should be ignored
              */
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() +
" cursor got duplicate: " + message);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+                        + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
             }
             storeHasMessages = true;
         }
@@ -125,7 +126,6 @@ public abstract class AbstractStoreCurso
     private synchronized void clearIterator(boolean ensureIterator) {
         boolean haveIterator = this.iterator != null;
         this.iterator=null;
-        last = null;
         if(haveIterator&&ensureIterator) {
             ensureIterator();
         }
@@ -176,8 +176,8 @@ public abstract class AbstractStoreCurso
         } else {
             if (cacheEnabled) {
                 cacheEnabled=false;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
+ " disabling cache on size:" + size
                             + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
                             + " current node seqId: " + node.getMessageId().getBrokerSequenceId());
                 }
@@ -239,11 +239,6 @@ public abstract class AbstractStoreCurso
         clearIterator(false);
         batchResetNeeded = true;
         this.cacheEnabled=false;
-        if (isStarted()) { 
-            size = getStoreSize();
-        } else {
-            size = 0;
-        }
     }
     
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Fri Dec 10 14:18:54 2010
@@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFac
 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
 
     private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
+    private static final int UNKNOWN = -1;
     private final String clientId;
     private final String subscriberName;
     private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination,
TopicStorePrefetch>();
@@ -49,7 +50,7 @@ public class StoreDurableSubscriberCurso
     private final PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
     private final Subscription subscription;
-    private int lastAddPriority = 0;
+    private int cacheCurrentPriority = UNKNOWN;
     private boolean immediatePriorityDispatch = true;
     /**
      * @param broker Broker for this cursor
@@ -72,12 +73,15 @@ public class StoreDurableSubscriberCurso
         this.nonPersistent.setMaxBatchSize(maxBatchSize);
         this.nonPersistent.setSystemUsage(systemUsage);
         this.storePrefetches.add(this.nonPersistent);
+
+        if (prioritizedMessages) {
+            setMaxAuditDepth(10*getMaxAuditDepth());
+        }
     }
 
     @Override
     public synchronized void start() throws Exception {
         if (!isStarted()) {
-            lastAddPriority = 0;
             super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
             	tsp.setMessageAudit(getMessageAudit());
@@ -107,11 +111,10 @@ public class StoreDurableSubscriberCurso
     public synchronized void add(ConnectionContext context, Destination destination) throws
Exception {
         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()))
{
             TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination,
clientId, subscriberName);
-            tsp.setMaxBatchSize(getMaxBatchSize());
+            tsp.setMaxBatchSize(destination.getMaxPageSize());
             tsp.setSystemUsage(systemUsage);
+            tsp.setMessageAudit(getMessageAudit());
             tsp.setEnableAudit(isEnableAudit());
-            tsp.setMaxAuditDepth(getMaxAuditDepth());
-            tsp.setMaxProducersToAudit(getMaxProducersToAudit());
             tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
@@ -184,16 +187,29 @@ public class StoreDurableSubscriberCurso
                 Destination dest = msg.getRegionDestination();
                 TopicStorePrefetch tsp = topics.get(dest);
                 if (tsp != null) {
-                    tsp.addMessageLast(node);
+
+                    // tsp becomes a highest priority only cache when we have a new higher
priority
+                    // message and we are not currently caching
+                    final int priority = msg.getPriority();
                     if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch
&& !tsp.cacheEnabled) {
-                        final int priority = msg.getPriority();
-                        if (priority > lastAddPriority) {
+                        if (priority > tsp.getLastDispatchPriority()) {
                             // go get the latest priority message
-                            LOG.debug("Clearing cursor on high priority message " + priority);
-                            tsp.clear();
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("enabling cache for cursor on high priority message
" + priority);
+                            }
+                            tsp.cacheEnabled = true;
+                            cacheCurrentPriority = priority;
+                        }
+                    } else if (cacheCurrentPriority > 0 && priority < cacheCurrentPriority)
{
+                        // go to the store to get next priority message as lower priority
messages may be recovered
+                        // already
+                        tsp.clear();
+                        cacheCurrentPriority = UNKNOWN;
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("disabling/clearing cache for cursor on lower priority
message " + priority);
                         }
-                        lastAddPriority = priority;
                     }
+                    tsp.addMessageLast(node);
                 }
             }
 
@@ -272,12 +288,10 @@ public class StoreDurableSubscriberCurso
 
     @Override
     public void setMaxBatchSize(int newMaxBatchSize) {
-        if (newMaxBatchSize > getMaxBatchSize()) {
-            for (PendingMessageCursor storePrefetch : storePrefetches) {
-                storePrefetch.setMaxBatchSize(newMaxBatchSize);
-            }
-            super.setMaxBatchSize(newMaxBatchSize);
+        for (PendingMessageCursor storePrefetch : storePrefetches) {
+            storePrefetch.setMaxBatchSize(newMaxBatchSize);
         }
+        super.setMaxBatchSize(newMaxBatchSize);
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Fri Dec 10 14:18:54 2010
@@ -104,6 +104,10 @@ class TopicStorePrefetch extends Abstrac
                 maxBatchSize, this);
     }
 
+    public int getLastDispatchPriority() {
+        return last != null? last.getMessage().getPriority() : 9;
+    }
+
     @Override
     public String toString() {
         return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + ","
+ subscriberName + ")";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Fri Dec 10 14:18:54 2010
@@ -213,7 +213,7 @@ public class PolicyEntry extends Destina
             sub.setPrefetchSize(getDurableTopicPrefetch());
         }
         if (pendingDurableSubscriberPolicy != null) {
-            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId,
subName,prefetch,sub);
+            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId,
subName,sub.getPrefetchSize(),sub);
             cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri
Dec 10 14:18:54 2010
@@ -357,7 +357,13 @@ public abstract class Message extends Ba
     }
 
     public void setPriority(byte priority) {
-        this.priority = priority;
+        if (priority < 0) {
+            this.priority = 0;
+        } else if (priority > 9) {
+            this.priority = 9;
+        } else {
+            this.priority = priority;
+        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Fri Dec 10 14:18:54 2010
@@ -66,7 +66,7 @@ public class JDBCTopicMessageStore exten
                 adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0],
res[1]);
             }
             if (LOG.isTraceEnabled()) {
-                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ",
priority: " + res[1]);
+                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ",
priority: " + res[1] + " mid:" + messageId);
             }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -151,6 +151,9 @@ public class JDBCTopicMessageStore exten
                 adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
                         lastRecovered.sequence, 0, maxReturned, jdbcListener);
             }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(key + " last recovered: " + lastRecovered);
+            }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
Fri Dec 10 14:18:54 2010
@@ -264,6 +264,12 @@ public class ActiveMQMessageTest extends
         ActiveMQMessage msg = new ActiveMQMessage();
         msg.setJMSPriority(this.jmsPriority);
         assertTrue(msg.getJMSPriority() == this.jmsPriority);
+
+        msg.setJMSPriority(-90);
+        assertEquals(0, msg.getJMSPriority());
+
+        msg.setJMSPriority(90);
+        assertEquals(9, msg.getJMSPriority());                
     }
 
     public void testClearProperties() throws JMSException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1044368&r1=1044367&r2=1044368&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Fri Dec 10 14:18:54 2010
@@ -293,28 +293,63 @@ abstract public class MessagePriorityTes
         TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
         sub.close();
 
-        ProducerThread producerThread = new ProducerThread(topic, 5000, LOW_PRI);
+        final int numToProduce = 2000;
+        final int[] dups = new int[numToProduce*2];
+        ProducerThread producerThread = new ProducerThread(topic, numToProduce, LOW_PRI+1);
         producerThread.run();
         LOG.info("Low priority messages sent");
 
         sub = sess.createDurableSubscriber(topic, subName);
-        for (int i=0; i<200;i++) {
+        final int batchSize = 250;
+        int lowLowCount = 0;
+        for (int i=0; i<numToProduce; i++) {
+            Message msg = sub.receive(15000);
+            LOG.info("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() + "-" +
msg.getJMSPriority() : null));
+            assertNotNull("Message " + i + " was null", msg);
+            assertEquals("Message " + i + " has wrong priority", LOW_PRI+1, msg.getJMSPriority());
+            assertTrue("not duplicate ", dups[i] == 0);
+            dups[i] = 1;
+
+            if (i % batchSize == 0) {
+                producerThread.setMessagePriority(HIGH_PRI);
+                producerThread.setMessageCount(1);
+                producerThread.run();
+                LOG.info("High priority message sent, should be able to receive immediately");
+
+                if (i % batchSize*2 == 0) {
+                    producerThread.setMessagePriority(HIGH_PRI -1);
+                    producerThread.setMessageCount(1);
+                    producerThread.run();
+                    LOG.info("High -1 priority message sent, should be able to receive immediately");
+                }
+
+                if (i % batchSize*4 == 0) {
+                    producerThread.setMessagePriority(LOW_PRI);
+                    producerThread.setMessageCount(1);
+                    producerThread.run();
+                    lowLowCount++;
+                    LOG.info("Low low priority message sent, should not be able to receive
immediately");
+                }
+
+                msg = sub.receive(15000);
+                assertNotNull("Message was null", msg);
+                LOG.info("received hi? : " + msg);
+                assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
+                            
+                if (i % batchSize*2 == 0) {
+                    msg = sub.receive(15000);
+                    assertNotNull("Message was null", msg);
+                    LOG.info("received hi -1 ? i=" + i + ", " + msg);
+                    assertEquals("high priority", HIGH_PRI -1, msg.getJMSPriority());
+                }
+            }
+        }
+        for (int i=0; i<lowLowCount; i++) {
             Message msg = sub.receive(15000);
             LOG.debug("received i=" + i + ", " + (msg!=null? msg.getJMSMessageID() : null));
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", LOW_PRI, msg.getJMSPriority());
         }
-
-        producerThread.setMessagePriority(HIGH_PRI);
-        producerThread.setMessageCount(1);
-        producerThread.run();
-        LOG.info("High priority message sent");
-
-        // try and get the high priority message
-        Message msg = sub.receive(15000);
-        assertNotNull("Message was null", msg);
-        LOG.info("received: " + msg);
-        assertEquals("high priority", HIGH_PRI, msg.getJMSPriority());
     }
     
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java?rev=1044368&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
Fri Dec 10 14:18:54 2010
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertTrue;
+
+public class JdbcDurableSubDupTest {
+
+    private static final Log LOG = LogFactory.getLog(JdbcDurableSubDupTest.class);
+    final int prefetchVal = 150;
+    String url = "tcp://localhost:61616?jms.watchTopicAdvisories=false";
+    String queueName = "topicTest?consumer.prefetchSize=" + prefetchVal;
+    String xmlMessage = "<Example 01234567890123456789012345678901234567890123456789 MessageText>";
+
+    String selector = "";
+    String clntVersion = "87";
+    String clntId = "timsClntId345" + clntVersion;
+    String subscriptionName = "subscriptionName-y" + clntVersion;
+    SimpleDateFormat dtf = new SimpleDateFormat("HH:mm:ss");
+
+    final int TO_RECEIVE = 5000;
+    BrokerService broker = null;
+    Vector<Throwable> exceptions = new Vector();
+    final int MAX_MESSAGES = 100000;
+    int[] dupChecker = new int[MAX_MESSAGES];
+
+    @Before
+    public void startBroker() throws Exception {
+        exceptions.clear();
+        for (int i = 0; i < MAX_MESSAGES; i++) {
+            dupChecker[i] = 0;
+        }
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setMaxAuditDepth(2000);
+        policyEntry.setMaxPageSize(150);
+        policyEntry.setPrioritizedMessages(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.addConnector("tcp://localhost:61616");
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    @Test
+    public void testNoDupsOnSlowConsumerReconnect() throws Exception {
+        JmsConsumerDup consumer = new JmsConsumerDup();
+        consumer.done.set(true);
+        consumer.run();
+
+        consumer.done.set(false);
+
+        LOG.info("serial production then consumption");
+        JmsProvider provider = new JmsProvider();
+        provider.run();
+
+        consumer.run();
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+
+        for (int i = 0; i < TO_RECEIVE; i++) {
+            assertTrue("got message " + i, dupChecker[i] == 1);
+        }
+    }
+
+    @Test
+    public void testNoDupsOnSlowConsumerLargePriorityGapReconnect() throws Exception {
+        JmsConsumerDup consumer = new JmsConsumerDup();
+        consumer.done.set(true);
+        consumer.run();
+
+        consumer.done.set(false);
+        JmsProvider provider = new JmsProvider();
+        provider.priorityModulator = 2500;
+        provider.run();
+
+        consumer.run();
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        for (int i = 0; i < TO_RECEIVE; i++) {
+            assertTrue("got message " + i, dupChecker[i] == 1);
+        }
+        
+    }
+
+    /**
+     * Durable topic subscriber that marks every received "SeqNo" in dupChecker
+     * and records an exception when the same sequence number arrives twice.
+     */
+    class JmsConsumerDup implements MessageListener {
+        // Incremented only from onMessage() and polled from run(), which execute
+        // on different threads; volatile makes the updates visible to the poller.
+        volatile long count = 0;
+
+        AtomicBoolean done = new AtomicBoolean(false);
+
+        /**
+         * Connects, creates the durable subscriber with this object as listener
+         * and then polls every five seconds until done is set, count equals
+         * TO_RECEIVE or an exception has been recorded. The connection is
+         * always closed before returning.
+         *
+         * @throws RuntimeException wrapping any exception raised while
+         *         connecting or waiting; the cause is also added to exceptions
+         */
+        public void run() {
+            Connection connection = null;
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+
+            try {
+                connection = factory.createConnection("MyUsername", "MyPassword");
+                connection.setClientID(clntId);
+                connection.start();
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Topic topic = session.createTopic(queueName);
+                MessageConsumer consumer = session.createDurableSubscriber(topic, subscriptionName, selector, false);
+                consumer.setMessageListener(this);
+                LOG.info("Waiting for messages...");
+
+                while (!done.get()) {
+                    TimeUnit.SECONDS.sleep(5);
+                    if (count == TO_RECEIVE || !exceptions.isEmpty()) {
+                        done.set(true);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("caught", e);
+                exceptions.add(e);
+                throw new RuntimeException(e);
+            } finally {
+                if (connection != null) {
+                    try {
+                        LOG.info("consumer done (" + exceptions.isEmpty() + "), closing connection");
+                        connection.close();
+                    } catch (JMSException e) {
+                        // A failed close is logged but deliberately not recorded
+                        // as a test failure.
+                        LOG.error("failed to close consumer connection", e);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Counts the message, logs every 100th one and checks its "SeqNo"
+         * property against dupChecker. A JMSException raised while reading the
+         * message is logged and added to exceptions.
+         *
+         * @param message the delivered message; cast to TextMessage
+         */
+        public void onMessage(Message message) {
+            ++count;
+
+            try {
+                Thread.sleep(0L);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt for the delivering thread rather than
+                // silently discarding it.
+                Thread.currentThread().interrupt();
+            }
+
+            try {
+                TextMessage m = (TextMessage) message;
+
+                if (count % 100 == 0) {
+                    LOG.info("Rcvd Msg #-" + count + " " + m.getText()
+                            + " Sent->" + dtf.format(new Date(m.getJMSTimestamp()))
+                            + " Recv->" + dtf.format(new Date())
+                            + " Expr->" + dtf.format(new Date(m.getJMSExpiration()))
+                            + ", mid: " + m.getJMSMessageID()
+                    );
+                }
+                int i = m.getIntProperty("SeqNo");
+
+                // check for duplicate messages; sequence numbers at or above
+                // MAX_MESSAGES are not tracked
+                if (i < MAX_MESSAGES) {
+                    if (dupChecker[i] == 1) {
+                        LOG.error("Duplicate message received at count: " + count + ", id: " + m.getJMSMessageID());
+                        exceptions.add(new RuntimeException("Got Duplicate at: " + m.getJMSMessageID()));
+                    } else {
+                        dupChecker[i] = 1;
+                    }
+                }
+            } catch (JMSException e) {
+                LOG.error("caught ", e);
+                exceptions.add(e);
+            }
+        }
+    }
+
+
+    /**
+     * Publishes persistent text messages to the test topic in groups of 1000,
+     * sleeping one second after each group. The number of groups is
+     * TO_RECEIVE / 1000 (integer division). Each message carries its sequence
+     * number in the "SeqNo" int property.
+     */
+    class JmsProvider implements Runnable {
+
+        // Up to 10: priority is msgSeqNo % priorityModulator.
+        // Above 10: priority is 0 below this sequence number and 9 from it on.
+        int priorityModulator = 10;
+
+        /**
+         * Sends all groups over a fresh connection and closes that connection
+         * before returning. A JMSException aborts sending; it is logged and
+         * added to exceptions. An interrupted pause is logged, added to
+         * exceptions, and sending continues.
+         */
+        public void run() {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+            Connection connection = null;
+            long timeToLive = 0L;
+
+            try {
+                connection = factory.createConnection("MyUserName", "MyPassword");
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Topic topic = session.createTopic(queueName);
+                MessageProducer messageProducer = session.createProducer(topic);
+                messageProducer.setPriority(3);
+                messageProducer.setTimeToLive(timeToLive);
+                messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                int msgSeqNo = 0;
+                int NUM_MSGS = 1000;
+                int NUM_GROUPS = TO_RECEIVE / NUM_MSGS;
+                for (int n = 0; n < NUM_GROUPS; n++) {
+
+                    // One message object is reused for every send in the group.
+                    TextMessage message = session.createTextMessage();
+
+                    for (int i = 0; i < NUM_MSGS; i++) {
+                        int priority;
+                        if (priorityModulator <= 10) {
+                            priority = msgSeqNo % priorityModulator;
+                        } else {
+                            priority = (msgSeqNo >= priorityModulator) ? 9 : 0;
+                        }
+                        message.setText(xmlMessage + msgSeqNo + "-" + priority);
+                        message.setJMSPriority(priority);
+                        message.setIntProperty("SeqNo", msgSeqNo);
+                        if (i > 0 && i % 100 == 0) {
+                            LOG.info("Sending message: " + message.getText());
+                        }
+                        messageProducer.send(message, DeliveryMode.PERSISTENT, message.getJMSPriority(), timeToLive);
+                        msgSeqNo++;
+                    }
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (InterruptedException e) {
+                        LOG.error("caught ", e);
+                        exceptions.add(e);
+                    }
+                }
+
+            } catch (JMSException e) {
+                LOG.error("caught ", e);
+                exceptions.add(e);
+            } finally {
+                // The producer connection was previously never closed.
+                if (connection != null) {
+                    try {
+                        connection.close();
+                    } catch (JMSException e) {
+                        // A failed close is logged but not recorded as a test failure.
+                        LOG.error("failed to close producer connection", e);
+                    }
+                }
+            }
+        }
+
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message