activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1135649 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/usecases/
Date Tue, 14 Jun 2011 15:31:54 GMT
Author: dejanb
Date: Tue Jun 14 15:31:54 2011
New Revision: 1135649

URL: http://svn.apache.org/viewvc?rev=1135649&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3362 - periodic expiry of durable sub msgs

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Jun 14 15:31:54 2011
@@ -32,8 +32,11 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
@@ -93,6 +96,7 @@ public abstract class BaseDestination im
     private boolean gcWithNetworkConsumers;
     private long lastActiveTime=0l;
     private boolean reduceMemoryFootprint = false;
+    protected final Scheduler scheduler;
 
     /**
      * @param brokerService
@@ -113,6 +117,7 @@ public abstract class BaseDestination im
         this.memoryUsage = this.systemUsage.getMemoryUsage();
         this.memoryUsage.setUsagePortion(1.0f);
         this.regionBroker = brokerService.getRegionBroker();
+        this.scheduler = brokerService.getBroker().getScheduler();
     }
 
     /**
@@ -707,4 +712,12 @@ public abstract class BaseDestination im
         }
         return hasRegularConsumers;
     }
+
+    protected ConnectionContext createConnectionContext() {
+        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
+        answer.setBroker(this.broker);
+        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
+        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+        return answer;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Jun 14 15:31:54 2011
@@ -110,7 +110,7 @@ public class DurableTopicSubscription ex
             try {
                 this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
             } catch (IOException e) {
-                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
+                JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store "+ e);
                 jmsEx.setLinkedException(e);
                 throw jmsEx;
             }
@@ -228,6 +228,10 @@ public class DurableTopicSubscription ex
             super.dispatchPending();
         }
     }
+
+    public void removePending(MessageReference node) throws IOException {
+        pending.remove(node);
+    }
     
     protected void doAddRecoveredMessage(MessageReference message) throws Exception {
         synchronized(pending) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun 14 15:31:54 2011
@@ -126,7 +126,7 @@ public class Queue extends BaseDestinati
 
     private final Object iteratingMutex = new Object() {
     };
-    private final Scheduler scheduler;
+
 
     class TimeoutMessage implements Delayed {
 
@@ -210,7 +210,6 @@ public class Queue extends BaseDestinati
         super(brokerService, store, destination, parentStats);
         this.taskFactory = taskFactory;
         this.dispatchSelector = new QueueDispatchSelector(destination);
-        this.scheduler = brokerService.getBroker().getScheduler();
     }
 
     public List<Subscription> getConsumers() {
@@ -1615,14 +1614,6 @@ public class Queue extends BaseDestinati
         }
     }
 
-    protected ConnectionContext createConnectionContext() {
-        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
-        answer.setBroker(this.broker);
-        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
-        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-        return answer;
-    }
-
     final void sendMessage(final Message msg) throws Exception {
         messagesLock.writeLock().lock();
         try{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jun 14 15:31:54 2011
@@ -510,6 +510,10 @@ public class Topic extends BaseDestinati
             memoryUsage.start();
         }
 
+        if (getExpireMessagesPeriod() > 0) {
+            scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
+        }
+
     }
 
     public void stop() throws Exception {
@@ -523,14 +527,22 @@ public class Topic extends BaseDestinati
         if (this.topicStore != null) {
             this.topicStore.stop();
         }
+
+         scheduler.cancel(expireMessagesTask);
     }
 
     public Message[] browse() {
+        final ConnectionContext connectionContext = createConnectionContext();
         final Set<Message> result = new CopyOnWriteArraySet<Message>();
         try {
             if (topicStore != null) {
                 topicStore.recover(new MessageRecoveryListener() {
                     public boolean recoverMessage(Message message) throws Exception {
+                        if (message.isExpired()) {
+                            for (Subscription sub : durableSubcribers.values()) {
+                                messageExpired(connectionContext, sub, message);
+                            }
+                        }
                         result.add(message);
                         return true;
                     }
@@ -640,6 +652,12 @@ public class Topic extends BaseDestinati
         }
     }
 
+    private final Runnable expireMessagesTask = new Runnable() {
+        public void run() {
+            browse();
+        }
+    };
+
     public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
         broker.messageExpired(context, reference, subs);
         // AMQ-2586: Better to leave this stat at zero than to give the user
@@ -652,8 +670,11 @@ public class Topic extends BaseDestinati
         ack.setDestination(destination);
         ack.setMessageID(reference.getMessageId());
         try {
+            if (subs instanceof DurableTopicSubscription) {
+                ((DurableTopicSubscription)subs).removePending(reference);
+            }
             acknowledge(context, subs, ack, reference);
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Failed to remove expired Message from the store ", e);
         }
     }
@@ -663,4 +684,5 @@ public class Topic extends BaseDestinati
         return LOG;
     }
 
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Jun 14 15:31:54 2011
@@ -276,8 +276,8 @@ public class StoreDurableSubscriberCurso
 
     @Override
     public synchronized void remove(MessageReference node) {
-        if (currentCursor != null) {
-            currentCursor.remove(node);
+        for (PendingMessageCursor tsp : storePrefetches) {
+            tsp.remove(node);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1135649&r1=1135648&r2=1135649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Tue Jun 14 15:31:54 2011
@@ -20,12 +20,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import javax.jms.*;
 import javax.management.ObjectName;
 
 import junit.framework.Test;
@@ -41,6 +36,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -401,6 +397,71 @@ public class ExpiredMessagesWithNoConsum
     }
 
 
+    public void testExpireMessagesForDurableSubscriber() throws Exception {
+        createBroker();
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        connection = factory.createConnection();
+        connection.setClientID("myConnection");
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        connection.start();
+        Topic destination = session.createTopic("test");
+        producer = session.createProducer(destination);
+        final int ttl = 300;
+        producer.setTimeToLive(ttl);
+
+        final long sendCount = 10;
+
+        TopicSubscriber sub = session.createDurableSubscriber(destination, "mySub");
+        sub.close();
+
+        for (int i=0; i < sendCount; i++) {
+            producer.send(session.createTextMessage("test"));
+        }
+
+        DestinationViewMBean view = createView((ActiveMQTopic)destination);
+
+
+        LOG.info("messages sent");
+        LOG.info("expired=" + view.getExpiredCount() + " " +  view.getEnqueueCount());
+        assertEquals(0, view.getExpiredCount());
+        assertEquals(10, view.getEnqueueCount());
+
+
+        Thread.sleep(4000);
+
+        LOG.info("expired=" + view.getExpiredCount() + " " +  view.getEnqueueCount());
+        assertEquals(10, view.getExpiredCount());
+        assertEquals(0, view.getEnqueueCount());
+
+
+        final AtomicLong received = new AtomicLong();
+        sub = session.createDurableSubscriber(destination, "mySub");
+        sub.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                received.incrementAndGet();
+            }
+        });
+
+        LOG.info("Waiting for messages to arrive");
+
+
+        Wait.waitFor(new Wait.Condition() {
+             public boolean isSatisified() throws Exception {
+                 return received.get() >= sendCount;
+             }
+         }, 1000);
+
+        LOG.info("received=" + received.get());
+        LOG.info("expired=" + view.getExpiredCount() + " " +  view.getEnqueueCount());
+
+        assertEquals(0, received.get());
+        assertEquals(10, view.getExpiredCount());
+        assertEquals(0, view.getEnqueueCount());
+
+    }
+
+
 
 	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
         String domain = "org.apache.activemq";



Mime
View raw message