activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r957181 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Wed, 23 Jun 2010 13:06:20 GMT
Author: dejanb
Date: Wed Jun 23 13:06:19 2010
New Revision: 957181

URL: http://svn.apache.org/viewvc?rev=957181&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2695 - additional fix for amq store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=957181&r1=957180&r2=957181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
Wed Jun 23 13:06:19 2010
@@ -21,14 +21,19 @@ import java.io.IOException;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -190,7 +195,71 @@ public class AMQTopicMessageStore extend
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException
{
         flush();
-        return topicReferenceStore.getMessageCount(clientId, subscriberName);
+        SubscriptionInfo info = lookupSubscription(clientId, subscriberName);
+        try {
+            MessageCounter counter = new MessageCounter(info, this);
+            topicReferenceStore.recoverSubscription(clientId, subscriberName, counter);
+            return counter.count;
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+    
+    private class MessageCounter implements MessageRecoveryListener {
+        
+        int count = 0;
+        SubscriptionInfo info;
+        BooleanExpression selectorExpression;
+        TopicMessageStore store;
+        
+        public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception
{
+            this.info = info;
+            if (info != null) {
+                String selector = info.getSelector();
+                if (selector != null) {
+                    this.selectorExpression = SelectorParser.parse(selector);
+                }
+            }
+            this.store = store;
+        }
+
+        @Override
+        public boolean recoverMessageReference(MessageId ref) throws Exception {
+            if (selectorExpression != null) {
+                MessageEvaluationContext ctx = new MessageEvaluationContext();
+                ctx.setMessageReference(store.getMessage(ref));
+                if (selectorExpression.matches(ctx)) {
+                    count++;
+                }
+            } else {
+                count ++;
+            }
+            return true;
+        }
+
+        @Override
+        public boolean recoverMessage(Message message) throws Exception {
+            if (selectorExpression != null) {
+                MessageEvaluationContext ctx = new MessageEvaluationContext();
+                ctx.setMessageReference(store.getMessage(message.getMessageId()));
+                if (selectorExpression.matches(ctx)) {
+                    count++;
+                }
+            } else {
+                count++;
+            }
+            return true;
+        }
+
+        @Override
+        public boolean isDuplicate(MessageId ref) {
+            return false;
+        }
+
+        @Override
+        public boolean hasSpace() {
+            return true;
+        }
     }
 
     public void resetBatching(String clientId, String subscriptionName) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=957181&r1=957180&r2=957181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Wed Jun 23 13:06:19 2010
@@ -328,7 +328,7 @@ public class KahaTopicReferenceStore ext
         if (container != null) {
             for (Iterator i = container.iterator(); i.hasNext();) {
                 ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
-                ReferenceRecord msg = messageContainer.get(ref.getMessageEntry());
+                ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry());
                 if (msg != null) {
                     if (!recoverReference(listener, msg)) {
                         break;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=957181&r1=957180&r2=957181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Wed Jun 23 13:06:19 2010
@@ -672,26 +672,26 @@ public class KahaDBStore extends Message
                         cursorPos += 1;
 
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
cursorPos); iterator
-                                .hasNext();) {
-                            Entry<Long, MessageKeys> entry = iterator.next();
+                        try {
                             String selector = info.getSelector();
+                            BooleanExpression selectorExpression = null;
                             if (selector != null) {
-                                try {
-                                    if (selector != null) { 
-                                        BooleanExpression selectorExpression = SelectorParser.parse(selector);
+                                selectorExpression = SelectorParser.parse(selector);
+                            }
+                            for (Iterator<Entry<Long, MessageKeys>> iterator
= sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                                Entry<Long, MessageKeys> entry = iterator.next();
+                                if (selectorExpression != null) { 
                                         MessageEvaluationContext ctx = new MessageEvaluationContext();
                                         ctx.setMessageReference(loadMessage(entry.getValue().location));
                                         if (selectorExpression.matches(ctx)) {
                                             counter++;
                                         }
-                                    }
-                                } catch (Exception e) {
-                                    throw IOExceptionSupport.create(e);
+                                } else {
+                                    counter++;
                                 }
-                            } else {
-                                counter++;
                             }
+                        } catch (Exception e) {
+                            throw IOExceptionSupport.create(e);
                         }
                         return counter;
                     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java?rev=957181&r1=957180&r2=957181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
Wed Jun 23 13:06:19 2010
@@ -130,6 +130,7 @@ abstract public class DurableSubscriptio
         if (deleteMessages) {
             broker.setDeleteAllMessagesOnStartup(true);
         }
+        broker.setPersistenceAdapter(createPersistenceAdapter());
         broker.start();
     }
 



Mime
View raw message