Author: gtully
Date: Wed Mar 2 16:23:40 2011
New Revision: 1076278
URL: http://svn.apache.org/viewvc?rev=1076278&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3196 - Speed up initial message delivery for offline
durable sub with keepDurableSubsActive=true and JDBC store
Avoid stop/start of the cursor when keepDurableSubsActive=true so that a valid message count
is retained by the cursor, independent of the store. This avoids unnecessary calls to the
underlying store on activation and reactivation.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
Wed Mar 2 16:23:40 2011
@@ -262,4 +262,8 @@ public class ActiveMQMessageAuditNoSync
}
return result;
}
+
+ public void clear() {
+ map.clear();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Wed Mar 2 16:23:40 2011
@@ -97,7 +97,15 @@ public class DurableTopicSubscription ex
return;
}
destinations.put(destination.getActiveMQDestination(), destination);
- if (destination.getMessageStore() != null) {
+
+ if (active.get() || keepDurableSubsActive) {
+ Topic topic = (Topic)destination;
+ topic.activate(context, this);
+ if (pending.isEmpty(topic)) {
+ topic.recoverRetroactiveMessages(context, this);
+ }
+ this.enqueueCounter+=pending.size();
+ } else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
try {
this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
@@ -107,13 +115,6 @@ public class DurableTopicSubscription ex
throw jmsEx;
}
}
- if (active.get() || keepDurableSubsActive) {
- Topic topic = (Topic)destination;
- topic.activate(context, this);
- if (pending.isEmpty(topic)) {
- topic.recoverRetroactiveMessages(context, this);
- }
- }
dispatchPending();
}
@@ -304,5 +305,9 @@ public class DurableTopicSubscription ex
protected boolean isDropped(MessageReference node) {
return false;
- }
+ }
+
+ public boolean isKeepDurableSubsActive() {
+ return keepDurableSubsActive;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Wed Mar 2 16:23:40 2011
@@ -25,8 +25,8 @@ import org.apache.activemq.advisory.Advi
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
@@ -49,7 +49,7 @@ public class StoreDurableSubscriberCurso
private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
- private final Subscription subscription;
+ private final DurableTopicSubscription subscription;
private int cacheCurrentLowestPriority = UNKNOWN;
private boolean immediatePriorityDispatch = true;
/**
@@ -59,14 +59,14 @@ public class StoreDurableSubscriberCurso
* @param maxBatchSize currently ignored
* @param subscription subscription for this cursor
*/
- public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int
maxBatchSize, Subscription subscription) {
+ public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int
maxBatchSize, DurableTopicSubscription subscription) {
super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
this.subscription=subscription;
this.clientId = clientId;
this.subscriberName = subscriberName;
if (broker.getBrokerService().isPersistent()) {
this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
- }else {
+ } else {
this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
@@ -93,9 +93,18 @@ public class StoreDurableSubscriberCurso
@Override
public synchronized void stop() throws Exception {
if (isStarted()) {
- super.stop();
- for (PendingMessageCursor tsp : storePrefetches) {
- tsp.stop();
+ if (subscription.isKeepDurableSubsActive()) {
+ super.gc();
+ super.getMessageAudit().clear();
+ for (PendingMessageCursor tsp : storePrefetches) {
+ tsp.gc();
+ tsp.getMessageAudit().clear();
+ }
+ } else {
+ super.stop();
+ for (PendingMessageCursor tsp : storePrefetches) {
+ tsp.stop();
+ }
}
}
}
@@ -217,6 +226,28 @@ public class StoreDurableSubscriberCurso
}
@Override
+ public boolean isTransient() {
+ return subscription.isKeepDurableSubsActive();
+ }
+
+ @Override
+ public void addMessageFirst(MessageReference node) throws Exception {
+ // for keep durable subs active, need to deal with redispatch
+ if (node != null) {
+ Message msg = node.getMessage();
+ if (!msg.isPersistent()) {
+ nonPersistent.addMessageFirst(node);
+ } else {
+ Destination dest = msg.getRegionDestination();
+ TopicStorePrefetch tsp = topics.get(dest);
+ if (tsp != null) {
+ tsp.addMessageFirst(node);
+ }
+ }
+ }
+ }
+
+ @Override
public synchronized void addRecoveredMessage(MessageReference node) throws Exception
{
nonPersistent.addMessageLast(node);
}
@@ -379,5 +410,6 @@ public class StoreDurableSubscriberCurso
public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
this.immediatePriorityDispatch = immediatePriorityDispatch;
- }
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
Wed Mar 2 16:23:40 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -42,7 +43,7 @@ public class FilePendingDurableSubscribe
* @param sub
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, Subscription sub) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, DurableTopicSubscription sub) {
return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,
sub));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
Wed Mar 2 16:23:40 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -38,5 +39,5 @@ public interface PendingDurableSubscribe
* @param sub
* @return the Pending Message cursor
*/
- PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, Subscription sub);
+ PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, DurableTopicSubscription sub);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
Wed Mar 2 16:23:40 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@@ -58,7 +59,7 @@ public class StorePendingDurableSubscrib
* @param sub
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, Subscription sub) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name, int maxBatchSize, DurableTopicSubscription sub) {
StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId,
name, maxBatchSize, sub);
cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
return cursor;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=1076278&r1=1076277&r2=1076278&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
Wed Mar 2 16:23:40 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -40,7 +41,7 @@ public class VMPendingDurableSubscriberM
* @param sub
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name,int maxBatchSize, Subscription sub) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId,
String name,int maxBatchSize, DurableTopicSubscription sub) {
return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,
sub));
}
}
|