activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1076278 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/region/ broker/region/cursors/ broker/region/policy/
Date Wed, 02 Mar 2011 16:23:40 GMT
Author: gtully
Date: Wed Mar  2 16:23:40 2011
New Revision: 1076278

URL: http://svn.apache.org/viewvc?rev=1076278&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3196 - Speed up initial message delivery for offline
durable sub with keepDurableSubsActive=true and JDBC store
Avoid stop/start of the cursor when keepDurableSubsActive=true so that a valid message count
is retained by the cursor, independent of the store. This avoids unnecessary calls to the
underlying store on activation and reactivation.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
Wed Mar  2 16:23:40 2011
@@ -262,4 +262,8 @@ public class ActiveMQMessageAuditNoSync 
         }
         return result;
     }
+
+    public void clear() {
+        map.clear();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Wed Mar  2 16:23:40 2011
@@ -97,7 +97,15 @@ public class DurableTopicSubscription ex
             return;
         }
         destinations.put(destination.getActiveMQDestination(), destination);
-        if (destination.getMessageStore() != null) {
+
+        if (active.get() || keepDurableSubsActive) {
+            Topic topic = (Topic)destination;
+            topic.activate(context, this);
+            if (pending.isEmpty(topic)) {
+                topic.recoverRetroactiveMessages(context, this);
+            }
+            this.enqueueCounter+=pending.size();
+        } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
             try {
                 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
@@ -107,13 +115,6 @@ public class DurableTopicSubscription ex
                 throw jmsEx;
             }
         }
-        if (active.get() || keepDurableSubsActive) {
-            Topic topic = (Topic)destination;
-            topic.activate(context, this);
-            if (pending.isEmpty(topic)) {
-                topic.recoverRetroactiveMessages(context, this);
-            }
-        }
         dispatchPending();
     }
 
@@ -304,5 +305,9 @@ public class DurableTopicSubscription ex
     
     protected boolean isDropped(MessageReference node) {
        return false;
-     }
+    }
+
+    public boolean isKeepDurableSubsActive() {
+        return keepDurableSubsActive;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Wed Mar  2 16:23:40 2011
@@ -25,8 +25,8 @@ import org.apache.activemq.advisory.Advi
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.usage.SystemUsage;
@@ -49,7 +49,7 @@ public class StoreDurableSubscriberCurso
     private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
     private final PendingMessageCursor nonPersistent;
     private PendingMessageCursor currentCursor;
-    private final Subscription subscription;
+    private final DurableTopicSubscription subscription;
     private int cacheCurrentLowestPriority = UNKNOWN;
     private boolean immediatePriorityDispatch = true;
     /**
@@ -59,14 +59,14 @@ public class StoreDurableSubscriberCurso
      * @param maxBatchSize currently ignored
      * @param subscription  subscription for this cursor
      */
-    public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int
maxBatchSize, Subscription subscription) {
+    public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int
maxBatchSize, DurableTopicSubscription subscription) {
         super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
         this.subscription=subscription;
         this.clientId = clientId;
         this.subscriberName = subscriberName;
         if (broker.getBrokerService().isPersistent()) {
             this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
-        }else {
+        } else {
             this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
         }
         
@@ -93,9 +93,18 @@ public class StoreDurableSubscriberCurso
     @Override
     public synchronized void stop() throws Exception {
         if (isStarted()) {
-            super.stop();
-            for (PendingMessageCursor tsp : storePrefetches) {
-                tsp.stop();
+            if (subscription.isKeepDurableSubsActive()) {
+                super.gc();
+                super.getMessageAudit().clear();
+                for (PendingMessageCursor tsp : storePrefetches) {
+                    tsp.gc();
+                    tsp.getMessageAudit().clear();
+                }
+            } else {
+                super.stop();
+                for (PendingMessageCursor tsp : storePrefetches) {
+                    tsp.stop();
+                }
             }
         }
     }
@@ -217,6 +226,28 @@ public class StoreDurableSubscriberCurso
     }
 
     @Override
+    public boolean isTransient() {
+        return subscription.isKeepDurableSubsActive();
+    }
+
+    @Override
+    public void addMessageFirst(MessageReference node) throws Exception {
+        // for keep durable subs active, need to deal with redispatch
+        if (node != null) {
+            Message msg = node.getMessage();
+            if (!msg.isPersistent()) {
+                nonPersistent.addMessageFirst(node);
+            } else {
+                Destination dest = msg.getRegionDestination();
+                TopicStorePrefetch tsp = topics.get(dest);
+                if (tsp != null) {
+                    tsp.addMessageFirst(node);
+                }
+            }
+        }
+    }
+
+    @Override
     public synchronized void addRecoveredMessage(MessageReference node) throws Exception
{
         nonPersistent.addMessageLast(node);
     }
@@ -379,5 +410,6 @@ public class StoreDurableSubscriberCurso
 
     public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
         this.immediatePriorityDispatch = immediatePriorityDispatch;
-    }    
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
Wed Mar  2 16:23:40 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -42,7 +43,7 @@ public class FilePendingDurableSubscribe
      * @param sub 
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, Subscription sub) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, DurableTopicSubscription sub) {
         return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,
sub));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
Wed Mar  2 16:23:40 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 
@@ -38,5 +39,5 @@ public interface PendingDurableSubscribe
      * @param sub 
      * @return the Pending Message cursor
      */
-    PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, Subscription sub);
+    PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, DurableTopicSubscription sub);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
Wed Mar  2 16:23:40 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@@ -58,7 +59,7 @@ public class StorePendingDurableSubscrib
      * @param sub 
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, Subscription sub) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, DurableTopicSubscription sub) {
         StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId,
name, maxBatchSize, sub);
         cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
         return cursor;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
Wed Mar  2 16:23:40 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -40,7 +41,7 @@ public class VMPendingDurableSubscriberM
      * @param sub 
      * @return the Pending Message cursor
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name,int maxBatchSize, Subscription sub) {
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name,int maxBatchSize, DurableTopicSubscription sub) {
         return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,
sub));
     }
 }



Mime
View raw message