activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r581053 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/kahada...
Date Mon, 01 Oct 2007 20:02:24 GMT
Author: chirino
Date: Mon Oct  1 13:02:18 2007
New Revision: 581053

URL: http://svn.apache.org/viewvc?rev=581053&view=rev
Log:
Fix for AMQ-1095:
 - Added contributed test cases
 - We now filter out non-matching messages as they are loaded into the TopicStorePrefetch
 - Changed the TopicStorePrefetch and StoreDurableSubscriberCursor so that they don't depend
   on the pending message counter since some stores cannot give an accurate count for it.


Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java   (with props)
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Oct  1 13:02:18 2007
@@ -48,7 +48,8 @@
 
     public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws InvalidSelectorException {
-        super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize()));
+        super(broker, context, info);
+        this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
         this.usageManager = usageManager;
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Oct  1 13:02:18 2007
@@ -410,6 +410,7 @@
         if (message == null) {
             return false;
         }
+                
         // Make sure we can dispatch a message.
         if (canDispatch(node) && !isSlave()) {
             MessageDispatch md = createMessageDispatch(node, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct  1 13:02:18 2007
@@ -26,6 +26,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.Store;
@@ -42,7 +43,6 @@
 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
 
     private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
-    private int pendingCount;
     private String clientId;
     private String subscriberName;
     private Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
@@ -50,6 +50,7 @@
     private boolean started;
     private PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
+    private final Subscription subscription;
 
     /**
      * @param topic
@@ -57,9 +58,10 @@
      * @param subscriberName
      * @throws IOException
      */
-    public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize) {
+    public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) {
         this.clientId = clientId;
         this.subscriberName = subscriberName;
+        this.subscription = subscription;
         this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
         storePrefetches.add(nonPersistent);
     }
@@ -69,7 +71,6 @@
             started = true;
             for (PendingMessageCursor tsp : storePrefetches) {
                 tsp.start();
-                pendingCount += tsp.size();
             }
         }
     }
@@ -80,8 +81,6 @@
             for (PendingMessageCursor tsp : storePrefetches) {
                 tsp.stop();
             }
-
-            pendingCount = 0;
         }
     }
 
@@ -94,14 +93,13 @@
      */
     public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
-            TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName);
+            TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription);
             tsp.setMaxBatchSize(getMaxBatchSize());
             tsp.setSystemUsage(systemUsage);
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
             if (started) {
                 tsp.start();
-                pendingCount += tsp.size();
             }
         }
     }
@@ -124,14 +122,18 @@
      * @return true if there are no pending messages
      */
     public synchronized boolean isEmpty() {
-        return pendingCount <= 0;
+        for (PendingMessageCursor tsp : storePrefetches) {
+            if( !tsp.isEmpty() )
+                return false;
+        }
+        return true;
     }
 
     public boolean isEmpty(Destination destination) {
         boolean result = true;
         TopicStorePrefetch tsp = topics.get(destination);
         if (tsp != null) {
-            result = tsp.size() <= 0;
+            result = tsp.isEmpty();
         }
         return result;
     }
@@ -151,7 +153,6 @@
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
-                pendingCount++;
                 if (!msg.isPersistent()) {
                     nonPersistent.addMessageLast(node);
                 }
@@ -171,7 +172,6 @@
     }
 
     public synchronized void clear() {
-        pendingCount = 0;
         nonPersistent.clear();
         for (PendingMessageCursor tsp : storePrefetches) {
             tsp.clear();
@@ -179,7 +179,7 @@
     }
 
     public synchronized boolean hasNext() {
-        boolean result = pendingCount > 0;
+        boolean result = true;
         if (result) {
             try {
                 currentCursor = getNextCursor();
@@ -201,14 +201,12 @@
         if (currentCursor != null) {
             currentCursor.remove();
         }
-        pendingCount--;
     }
 
     public synchronized void remove(MessageReference node) {
         if (currentCursor != null) {
             currentCursor.remove(node);
         }
-        pendingCount--;
     }
 
     public synchronized void reset() {
@@ -226,6 +224,10 @@
     }
 
     public int size() {
+        int pendingCount=0;
+        for (PendingMessageCursor tsp : storePrefetches) {
+            pendingCount += tsp.size();
+        }
         return pendingCount;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct  1 13:02:18 2007
@@ -17,12 +17,15 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.LinkedList;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.commons.logging.Log;
@@ -44,16 +47,19 @@
     private Destination regionDestination;
     private MessageId firstMessageId;
     private MessageId lastMessageId;
-    private int pendingCount;
+    private boolean batchResetNeeded = true;
+    private boolean storeMayHaveMoreMessages = true;
     private boolean started;
+    private final Subscription subscription;
 
     /**
      * @param topic
      * @param clientId
      * @param subscriberName
      */
-    public TopicStorePrefetch(Topic topic, String clientId, String subscriberName) {
+    public TopicStorePrefetch(Topic topic, String clientId, String subscriberName, Subscription subscription) {
         this.regionDestination = topic;
+        this.subscription = subscription;
         this.store = (TopicMessageStore)topic.getMessageStore();
         this.clientId = clientId;
         this.subscriberName = subscriberName;
@@ -62,13 +68,7 @@
     public synchronized void start() {
         if (!started) {
             started = true;
-            pendingCount = getStoreSize();
-            try {
-                fillBatch();
-            } catch (Exception e) {
-                LOG.error("Failed to fill batch", e);
-                throw new RuntimeException(e);
-            }
+            safeFillBatch();
         }
     }
 
@@ -84,11 +84,13 @@
      * @return true if there are no pendingCount messages
      */
     public synchronized boolean isEmpty() {
-        return pendingCount <= 0;
+        safeFillBatch();
+        return batchList.isEmpty();
     }
 
     public synchronized int size() {
-        return getPendingCount();
+        safeFillBatch();
+        return batchList.size();
     }
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
@@ -98,7 +100,7 @@
             }
             lastMessageId = node.getMessageId();
             node.decrementReferenceCount();
-            pendingCount++;
+            storeMayHaveMoreMessages=true;
         }
     }
 
@@ -108,20 +110,18 @@
                 firstMessageId = node.getMessageId();
             }
             node.decrementReferenceCount();
-            pendingCount++;
+            storeMayHaveMoreMessages=true;
         }
     }
 
     public synchronized void remove() {
-        pendingCount--;
     }
 
     public synchronized void remove(MessageReference node) {
-        pendingCount--;
     }
 
     public synchronized void clear() {
-        pendingCount = 0;
+        gc();
     }
 
     public synchronized boolean hasNext() {
@@ -130,27 +130,17 @@
 
     public synchronized MessageReference next() {
         Message result = null;
-        if (!isEmpty()) {
-            if (batchList.isEmpty()) {
-                try {
-                    fillBatch();
-                } catch (final Exception e) {
-                    LOG.error("Failed to fill batch", e);
-                    throw new RuntimeException(e);
-                }
-                if (batchList.isEmpty()) {
-                    return null;
-                }
-            }
-            if (!batchList.isEmpty()) {
-                result = batchList.removeFirst();
-                if (lastMessageId != null) {
-                    if (result.getMessageId().equals(lastMessageId)) {
-                        // pendingCount=0;
-                    }
+        safeFillBatch();
+        if (batchList.isEmpty()) {
+            return null;
+        } else {
+            result = batchList.removeFirst();
+            if (lastMessageId != null) {
+                if (result.getMessageId().equals(lastMessageId)) {
+                    // pendingCount=0;
                 }
-                result.setRegionDestination(regionDestination);
             }
+            result.setRegionDestination(regionDestination);
         }
         return result;
     }
@@ -163,12 +153,16 @@
     }
 
     public synchronized boolean recoverMessage(Message message) throws Exception {
-        message.setRegionDestination(regionDestination);
-        // only increment if count is zero (could have been cached)
-        if (message.getReferenceCount() == 0) {
-            message.incrementReferenceCount();
+        MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
+        messageEvaluationContext.setMessageReference(message);
+        if( subscription.matches(message, messageEvaluationContext) ) {
+            message.setRegionDestination(regionDestination);
+            // only increment if count is zero (could have been cached)
+            if (message.getReferenceCount() == 0) {
+                message.incrementReferenceCount();
+            }
+            batchList.addLast(message);
         }
-        batchList.addLast(message);
         return true;
     }
 
@@ -178,38 +172,43 @@
     }
 
     // implementation
+    protected void safeFillBatch() {
+        try {
+            fillBatch();
+        } catch (Exception e) {
+            LOG.error("Failed to fill batch", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     protected synchronized void fillBatch() throws Exception {
-        if (!isEmpty()) {
+        if( batchResetNeeded ) {
+            store.resetBatching(clientId, subscriberName);
+            batchResetNeeded=false;
+            storeMayHaveMoreMessages=true;
+        }
+        
+        while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
             store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
-            if (firstMessageId != null) {
-                int pos = 0;
-                for (Message msg : batchList) {
-                    if (msg.getMessageId().equals(firstMessageId)) {
-                        firstMessageId = null;
-                        break;
-                    }
-                    pos++;
-                }
-                if (pos > 0) {
-                    for (int i = 0; i < pos && !batchList.isEmpty(); i++) {
-                        batchList.removeFirst();
-                    }
-                    if (batchList.isEmpty()) {
-                        LOG.debug("Refilling batch - haven't got past first message = " + firstMessageId);
-                        fillBatch();
+            if( batchList.isEmpty() ) {
+                storeMayHaveMoreMessages = false;
+            } else {
+                if (firstMessageId != null) {
+                    int pos = 0;
+                    for (Iterator<Message> iter = batchList.iterator(); iter.hasNext();) {
+                        Message msg = iter.next();
+                        if (msg.getMessageId().equals(firstMessageId)) {
+                            firstMessageId = null;
+                            break;
+                        } else {
+                            iter.remove();
+                        }
                     }
                 }
             }
         }
     }
 
-    protected synchronized int getPendingCount() {
-        if (pendingCount <= 0) {
-            pendingCount = getStoreSize();
-        }
-        return pendingCount;
-    }
-
     protected synchronized int getStoreSize() {
         try {
             return store.getMessageCount(clientId, subscriberName);
@@ -224,6 +223,7 @@
             msg.decrementReferenceCount();
         }
         batchList.clear();
+        batchResetNeeded = true;
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Mon Oct  1 13:02:18 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.kaha.Store;
@@ -39,7 +40,7 @@
      * @param maxBatchSize
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
         return new FilePendingMessageCursor(name, tmpStorage);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java Mon Oct  1 13:02:18 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.kaha.Store;
 
@@ -36,5 +37,5 @@
      * @param maxBatchSize
      * @return the Pending Message cursor
      */
-    PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize);
+    PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Oct  1 13:02:18 2007
@@ -116,7 +116,7 @@
         String subName = sub.getSubscriptionName();
         int prefetch = sub.getPrefetchSize();
         if (pendingDurableSubscriberPolicy != null) {
-            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch);
+            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub);
             cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java Mon Oct  1 13:02:18 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.kaha.Store;
@@ -40,7 +41,7 @@
      * @param maxBatchSize
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) {
-        return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize);
+    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
+        return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Mon Oct  1 13:02:18 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.kaha.Store;
@@ -38,7 +39,7 @@
      * @param maxBatchSize
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
         return new VMPendingMessageCursor();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Oct  1 13:02:18 2007
@@ -73,11 +73,8 @@
     }
 
     protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
-        if (listener.hasSpace()) {
-            listener.recoverMessage(msg);
-            return true;
-        }
-        return false;
+        listener.recoverMessage(msg);
+        return listener.hasSpace();
     }
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Oct  1 13:02:18 2007
@@ -64,11 +64,8 @@
 
     protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
         throws Exception {
-        if (listener.hasSpace()) {
-            listener.recoverMessageReference(new MessageId(record.getMessageId()));
-            return true;
-        }
-        return false;
+        listener.recoverMessageReference(new MessageId(record.getMessageId()));
+        return listener.hasSpace();
     }
 
     public synchronized void recover(MessageRecoveryListener listener) throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java Mon Oct  1 13:02:18 2007
@@ -284,7 +284,7 @@
         // The we should get the messages.
         for (int i = 0; i < 4; i++) {
             Message m2 = receiveMessage(connection2);
-            assertNotNull(m2);
+            assertNotNull("Did not get message "+i, m2);
         }
         assertNoMessagesLeft(connection2);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Mon Oct  1 13:02:18 2007
@@ -77,7 +77,8 @@
         consumer = getConsumer(consumerConnection);
         List<Message> consumerList = new ArrayList<Message>();
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message msg = consumer.receive();
+            Message msg = consumer.receive(1000*5);
+            assertNotNull("Message "+i+" was missing.", msg);
             consumerList.add(msg);
         }
         assertEquals(senderList, consumerList);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java?rev=581053&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java Mon Oct  1 13:02:18 2007
@@ -0,0 +1,163 @@
+/* ====================================================================
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================== */
+
+package org.apache.activemq.bugs.amq1095;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * <p>
+ * Common functionality for ActiveMQ test cases.
+ * </p>
+ * 
+ * @author Rainer Klute <a
+ *         href="mailto:rainer.klute@dp-itsolutions.de">&lt;rainer.klute@dp-itsolutions.de&gt;</a>
+ * @since 2007-08-10
+ * @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $
+ */
+public class ActiveMQTestCase extends TestCase
+{
+    private Context context;
+    private BrokerService broker;
+    protected Connection connection;
+    protected Destination destination;
+    private List<MessageConsumer> consumersToEmpty = new LinkedList<MessageConsumer>();
+    protected final long RECEIVE_TIMEOUT = 500;
+
+
+    /** <p>Constructor</p> */
+    public ActiveMQTestCase()
+    {}
+    
+    /** <p>Constructor</p> 
+     * @param name the test case's name
+     */
+    public ActiveMQTestCase(final String name)
+    {
+        super(name);
+    }
+
+    /**
+     * <p>Sets up the JUnit testing environment.</p>
+     */
+    protected void setUp()
+    {
+        URI uri;
+        try
+        {
+            /* Copy all system properties starting with "java.naming.", "topic." or "queue." to the initial context. */
+            final Properties systemProperties = System.getProperties();
+            final Properties jndiProperties = new Properties();
+            for (final Iterator i = systemProperties.keySet().iterator(); i.hasNext();)
+            {
+                final String key = (String) i.next();
+                if (key.startsWith("java.naming.") || key.startsWith("topic.") ||
+                    key.startsWith("queue."))
+                {
+                    final String value = (String) systemProperties.get(key);
+                    jndiProperties.put(key, value);
+                }
+            }
+            context = new InitialContext(jndiProperties); 
+            uri = new URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml");
+            broker = BrokerFactory.createBroker(uri);
+            broker.start();
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException(ex);
+        }
+
+        final ConnectionFactory connectionFactory;
+        try
+        {
+            /* Lookup the connection factory. */
+            connectionFactory = (ConnectionFactory) context.lookup("TopicConnectionFactory");
+
+            destination = new ActiveMQTopic("TestTopic");
+
+            /* Create a connection: */
+            connection = connectionFactory.createConnection();
+            connection.setClientID("sampleClientID");
+        }
+        catch (JMSException ex1)
+        {
+            ex1.printStackTrace();
+            Assert.fail(ex1.toString());
+        }
+        catch (NamingException ex2) {
+            ex2.printStackTrace();
+            Assert.fail(ex2.toString());
+        }
+        catch (Throwable ex3) {
+            ex3.printStackTrace();
+            Assert.fail(ex3.toString());
+        }
+    }
+
+
+    /**
+     * <p>
+     * Tear down the testing environment by receiving any messages that might be
+     * left in the topic after a failure and shutting down the broker properly.
+     * This is quite important for subsequent test cases that assume the topic
+     * to be empty.
+     * </p>
+     */
+    protected void tearDown() throws Exception {
+        TextMessage msg;
+        for (final Iterator i = consumersToEmpty.iterator(); i.hasNext();)
+        {
+            final MessageConsumer consumer = (MessageConsumer) i.next();
+            if (consumer != null)
+                do
+                    msg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+                while (msg != null);
+        }
+        if (connection != null) {
+            connection.stop();
+        }
+        broker.stop();
+    }
+
+    protected void registerToBeEmptiedOnShutdown(final MessageConsumer consumer)
+    {
+        consumersToEmpty.add(consumer);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java?rev=581053&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java Mon Oct  1 13:02:18 2007
@@ -0,0 +1,230 @@
+/* ====================================================================
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================== */
+
+package org.apache.activemq.bugs.amq1095;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+
+/**
+ * <p>
+ * Test cases for ActiveMQ message selectors (AMQ-1095), exercised under these conditions:
+ * </p>
+ * 
+ * <ul>
+ * <li>
+ * <p>
+ * Durable subscriptions are used.
+ * </p>
+ * </li>
+ * <li>
+ * <p>
+ * The Kaha persistence manager is used.
+ * </p>
+ * </li>
+ * <li>
+ * <p>
+ * An already existing Kaha directory is used. Everything runs fine if the
+ * ActiveMQ broker creates a new Kaha directory.
+ * </p>
+ * </li>
+ * </ul>
+ * 
+ * @author Rainer Klute <a
+ *         href="mailto:rainer.klute@dp-itsolutions.de">&lt;rainer.klute@dp-itsolutions.de&gt;</a>
+ * @since 2007-08-09
+ * @version $Id: MessageSelectorTest.java 12 2007-08-14 12:02:02Z rke $
+ */
+public class MessageSelectorTest extends ActiveMQTestCase {
+
+    private MessageConsumer consumer1;
+    private MessageConsumer consumer2;
+
+    /** <p>Constructor</p> */
+    public MessageSelectorTest()
+    {}
+    
+    /** <p>Constructor</p>
+     * @param name the test case's name
+     */
+    public MessageSelectorTest(final String name)
+    {
+        super(name);
+    }
+
+    /**
+     * <p>
+     * Tests whether message selectors work for durable subscribers.
+     * </p>
+     */
+    public void testMessageSelectorForDurableSubscribersRunA()
+    {
+        runMessageSelectorTest(true);
+    }
+
+    /**
+     * <p>
+     * Tests whether message selectors work for durable subscribers (a repeat of run A).
+     * </p>
+     */
+    public void testMessageSelectorForDurableSubscribersRunB()
+    {
+        runMessageSelectorTest(true);
+    }
+
+    /**
+     * <p>
+     * Tests whether message selectors work for non-durable subscribers.
+     * </p>
+     */
+    public void testMessageSelectorForNonDurableSubscribers()
+    {
+        runMessageSelectorTest(false);
+    }
+    
+    /**
+     * <p>
+     * Tests whether message selectors work. This is done by sending two
+     * messages to a topic. Both have an int property with different values. Two
+     * subscribers use message selectors to receive the messages. Each one
+     * should receive exactly one of the messages.
+     * </p>
+     */
+    private void runMessageSelectorTest(final boolean isDurableSubscriber)
+    {
+        try
+        {
+            final String PROPERTY_CONSUMER = "consumer";
+            final String CONSUMER_1 = "Consumer 1";
+            final String CONSUMER_2 = "Consumer 2";
+            final String MESSAGE_1 = "Message to " + CONSUMER_1;
+            final String MESSAGE_2 = "Message to " + CONSUMER_2;
+
+            assertNotNull(connection);
+            assertNotNull(destination);
+
+            final Session producingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = producingSession.createProducer(destination);
+
+            final Session consumingSession1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Session consumingSession2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            if (isDurableSubscriber)
+            {
+                consumer1 = consumingSession1.createDurableSubscriber
+                    ((Topic) destination, CONSUMER_1, PROPERTY_CONSUMER + " = 1", false);
+                consumer2 = consumingSession2.createDurableSubscriber
+                    ((Topic) destination, CONSUMER_2, PROPERTY_CONSUMER + " = 2", false);
+            }
+            else
+            {
+                consumer1 = consumingSession1.createConsumer(destination, PROPERTY_CONSUMER + " = 1");
+                consumer2 = consumingSession2.createConsumer(destination, PROPERTY_CONSUMER + " = 2");
+            }
+            registerToBeEmptiedOnShutdown(consumer1);
+            registerToBeEmptiedOnShutdown(consumer2);
+
+            connection.start();
+
+            TextMessage msg1;
+            TextMessage msg2;
+            int propertyValue;
+            String contents;
+
+            /* Try to receive any messages from the consumers. There shouldn't be any yet. */
+            msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
+            if (msg1 != null)
+            {
+                final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run.");
+                propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER);
+                contents = msg1.getText();
+                if (propertyValue != 1) // Is the property value as expected?
+                {
+                    msg.append(" That message does not match the consumer's message selector.");
+                    fail(msg.toString());
+                }
+                assertEquals(1, propertyValue);
+                assertEquals(MESSAGE_1, contents);
+            }
+            msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
+            if (msg2 != null)
+            {
+                final StringBuffer msg = new StringBuffer("The consumer read a message that was left over from a former ActiveMQ broker run.");
+                propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER);
+                contents = msg2.getText();
+                if (propertyValue != 2) // Is the property value as expected?
+                {
+                    msg.append(" That message does not match the consumer's message selector.");
+                    fail(msg.toString());
+                }
+                assertEquals(2, propertyValue);
+                assertEquals(MESSAGE_2, contents);
+            }
+
+            /* Send two messages. Each is targeted at one of the consumers. */
+            TextMessage msg;
+            msg = producingSession.createTextMessage();
+            msg.setText(MESSAGE_1);
+            msg.setIntProperty(PROPERTY_CONSUMER, 1);
+            producer.send(msg);
+
+            msg = producingSession.createTextMessage();
+            msg.setText(MESSAGE_2);
+            msg.setIntProperty(PROPERTY_CONSUMER, 2);
+            producer.send(msg);
+
+            /* Receive the messages that have just been sent. */
+
+            /* Use consumer 1 to receive one of the messages. The receive()
+             * method is called a second time to make sure nothing else is
+             * pending for this consumer. */
+            msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
+            assertNotNull(msg1);
+            propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER);
+            contents = msg1.getText();
+            assertEquals(1, propertyValue);
+            assertEquals(MESSAGE_1, contents);
+            msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
+            assertNull(msg1);
+
+            /* Use consumer 2 to receive the other message. The receive()
+             * method is called a second time to make sure nothing else is
+             * pending for this consumer. */
+            msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
+            assertNotNull(msg2);
+            propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER);
+            contents = msg2.getText();
+            assertEquals(2, propertyValue);
+            assertEquals(MESSAGE_2, contents);
+            msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
+            assertNull(msg2);
+        }
+        catch (JMSException ex)
+        {
+            ex.printStackTrace();
+            Assert.fail();
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml?rev=581053&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml Mon Oct  1 13:02:18 2007
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans>
+
+  <broker brokerName="localhost" xmlns="http://activemq.org/config/1.0" persistent="true">
+  
+    <persistenceAdapter>
+      <kahaPersistenceAdapter directory="file:kahadir" maxDataFileLength="200000"/>
+    </persistenceAdapter>
+
+    <destinations>
+      <queue physicalName="unused"/>
+      <topic physicalName="activemq.TestTopic"/>
+    </destinations>
+
+  </broker>
+
+</beans>
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml



Mime
View raw message