Author: dejanb
Date: Tue Jun 14 15:31:54 2011
New Revision: 1135649
URL: http://svn.apache.org/viewvc?rev=1135649&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3362 - periodic expiry of durable sub msgs
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Tue Jun 14 15:31:54 2011
@@ -32,8 +32,11 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
@@ -93,6 +96,7 @@ public abstract class BaseDestination im
private boolean gcWithNetworkConsumers;
private long lastActiveTime=0l;
private boolean reduceMemoryFootprint = false;
+ protected final Scheduler scheduler;
/**
* @param brokerService
@@ -113,6 +117,7 @@ public abstract class BaseDestination im
this.memoryUsage = this.systemUsage.getMemoryUsage();
this.memoryUsage.setUsagePortion(1.0f);
this.regionBroker = brokerService.getRegionBroker();
+ this.scheduler = brokerService.getBroker().getScheduler();
}
/**
@@ -707,4 +712,12 @@ public abstract class BaseDestination im
}
return hasRegularConsumers;
}
+
+ protected ConnectionContext createConnectionContext() {
+ ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
+ answer.setBroker(this.broker);
+ answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
+ answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+ return answer;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Tue Jun 14 15:31:54 2011
@@ -110,7 +110,7 @@ public class DurableTopicSubscription ex
try {
this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
} catch (IOException e) {
- JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from
store "+ e);
+ JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from
store "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
@@ -228,6 +228,10 @@ public class DurableTopicSubscription ex
super.dispatchPending();
}
}
+
+ public void removePending(MessageReference node) throws IOException {
+ pending.remove(node);
+ }
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized(pending) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Jun 14 15:31:54 2011
@@ -126,7 +126,7 @@ public class Queue extends BaseDestinati
private final Object iteratingMutex = new Object() {
};
- private final Scheduler scheduler;
+
class TimeoutMessage implements Delayed {
@@ -210,7 +210,6 @@ public class Queue extends BaseDestinati
super(brokerService, store, destination, parentStats);
this.taskFactory = taskFactory;
this.dispatchSelector = new QueueDispatchSelector(destination);
- this.scheduler = brokerService.getBroker().getScheduler();
}
public List<Subscription> getConsumers() {
@@ -1615,14 +1614,6 @@ public class Queue extends BaseDestinati
}
}
- protected ConnectionContext createConnectionContext() {
- ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
- answer.setBroker(this.broker);
- answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
- answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- return answer;
- }
-
final void sendMessage(final Message msg) throws Exception {
messagesLock.writeLock().lock();
try{
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Tue Jun 14 15:31:54 2011
@@ -510,6 +510,10 @@ public class Topic extends BaseDestinati
memoryUsage.start();
}
+ if (getExpireMessagesPeriod() > 0) {
+ scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
+ }
+
}
public void stop() throws Exception {
@@ -523,14 +527,22 @@ public class Topic extends BaseDestinati
if (this.topicStore != null) {
this.topicStore.stop();
}
+
+ scheduler.cancel(expireMessagesTask);
}
public Message[] browse() {
+ final ConnectionContext connectionContext = createConnectionContext();
final Set<Message> result = new CopyOnWriteArraySet<Message>();
try {
if (topicStore != null) {
topicStore.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception {
+ if (message.isExpired()) {
+ for (Subscription sub : durableSubcribers.values()) {
+ messageExpired(connectionContext, sub, message);
+ }
+ }
result.add(message);
return true;
}
@@ -640,6 +652,12 @@ public class Topic extends BaseDestinati
}
}
+ private final Runnable expireMessagesTask = new Runnable() {
+ public void run() {
+ browse();
+ }
+ };
+
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference
reference) {
broker.messageExpired(context, reference, subs);
// AMQ-2586: Better to leave this stat at zero than to give the user
@@ -652,8 +670,11 @@ public class Topic extends BaseDestinati
ack.setDestination(destination);
ack.setMessageID(reference.getMessageId());
try {
+ if (subs instanceof DurableTopicSubscription) {
+ ((DurableTopicSubscription)subs).removePending(reference);
+ }
acknowledge(context, subs, ack, reference);
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Failed to remove expired Message from the store ", e);
}
}
@@ -663,4 +684,5 @@ public class Topic extends BaseDestinati
return LOG;
}
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Tue Jun 14 15:31:54 2011
@@ -276,8 +276,8 @@ public class StoreDurableSubscriberCurso
@Override
public synchronized void remove(MessageReference node) {
- if (currentCursor != null) {
- currentCursor.remove(node);
+ for (PendingMessageCursor tsp : storePrefetches) {
+ tsp.remove(node);
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Tue Jun 14 15:31:54 2011
@@ -20,12 +20,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
import javax.management.ObjectName;
import junit.framework.Test;
@@ -41,6 +36,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -401,6 +397,71 @@ public class ExpiredMessagesWithNoConsum
}
+ public void testExpireMessagesForDurableSubscriber() throws Exception {
+ createBroker();
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connection = factory.createConnection();
+ connection.setClientID("myConnection");
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ connection.start();
+ Topic destination = session.createTopic("test");
+ producer = session.createProducer(destination);
+ final int ttl = 300;
+ producer.setTimeToLive(ttl);
+
+ final long sendCount = 10;
+
+ TopicSubscriber sub = session.createDurableSubscriber(destination, "mySub");
+ sub.close();
+
+ for (int i=0; i < sendCount; i++) {
+ producer.send(session.createTextMessage("test"));
+ }
+
+ DestinationViewMBean view = createView((ActiveMQTopic)destination);
+
+
+ LOG.info("messages sent");
+ LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
+ assertEquals(0, view.getExpiredCount());
+ assertEquals(10, view.getEnqueueCount());
+
+
+ Thread.sleep(4000);
+
+ LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
+ assertEquals(10, view.getExpiredCount());
+ assertEquals(0, view.getEnqueueCount());
+
+
+ final AtomicLong received = new AtomicLong();
+ sub = session.createDurableSubscriber(destination, "mySub");
+ sub.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ received.incrementAndGet();
+ }
+ });
+
+ LOG.info("Waiting for messages to arrive");
+
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return received.get() >= sendCount;
+ }
+ }, 1000);
+
+ LOG.info("received=" + received.get());
+ LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
+
+ assertEquals(0, received.get());
+ assertEquals(10, view.getExpiredCount());
+ assertEquals(0, view.getEnqueueCount());
+
+ }
+
+
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception
{
String domain = "org.apache.activemq";
|