activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1181112 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Mon, 10 Oct 2011 18:21:46 GMT
Author: gtully
Date: Mon Oct 10 18:21:46 2011
New Revision: 1181112

URL: http://svn.apache.org/viewvc?rev=1181112&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3532 - expiry of offline durable subscription on
activation can lead do duplicate expiry processing and negative pending cursor size, resolve
duplicate cursor remove and contention with dispatch, additional test

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1181112&r1=1181111&r2=1181112&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Mon Oct 10 18:21:46 2011
@@ -20,11 +20,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.Future;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -66,7 +64,7 @@ public class Topic extends BaseDestinati
     protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
     private final TopicMessageStore topicStore;
     protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
-    protected final Valve dispatchValve = new Valve(true);
+    private final Valve dispatchValve = new Valve(true);
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers
= new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@@ -541,15 +539,11 @@ public class Topic extends BaseDestinati
     private void doBrowse(final List<Message> browseList, final int max) {
         try {
             if (topicStore != null) {
-                final ConnectionContext connectionContext = createConnectionContext();
+                final List<Message> toExpire = new ArrayList<Message>();
                 topicStore.recover(new MessageRecoveryListener() {
                     public boolean recoverMessage(Message message) throws Exception {
                         if (message.isExpired()) {
-                            for (DurableTopicSubscription sub : durableSubcribers.values())
{
-                                if (!sub.isActive()) {
-                                    messageExpired(connectionContext, sub, message);
-                                }
-                            }
+                            toExpire.add(message);
                         }
                         browseList.add(message);
                         return true;
@@ -567,6 +561,14 @@ public class Topic extends BaseDestinati
                         return false;
                     }
                 });
+                final ConnectionContext connectionContext = createConnectionContext();
+                for (Message message : toExpire) {
+                    for (DurableTopicSubscription sub : durableSubcribers.values()) {
+                        if (!sub.isActive()) {
+                            messageExpired(connectionContext, sub, message);
+                        }
+                    }
+                }
                 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                 if (msgs != null) {
                     for (int i = 0; i < msgs.length && browseList.size() <
max; i++) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1181112&r1=1181111&r2=1181112&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Oct 10 18:21:46 2011
@@ -222,9 +222,10 @@ public abstract class AbstractStoreCurso
 
     
     public final synchronized void remove(MessageReference node) {
-        size--;
-        setCacheEnabled(false);
-        batchList.remove(node);
+        if (batchList.remove(node) != null) {
+            size--;
+            setCacheEnabled(false);
+        }
     }
     
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=1181112&r1=1181111&r2=1181112&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
Mon Oct 10 18:21:46 2011
@@ -89,11 +89,13 @@ public class OrderedPendingList implemen
         };
     }
 
-    public void remove(MessageReference message) {
+    public PendingNode remove(MessageReference message) {
+        PendingNode node = null;
         if (message != null) {
-            PendingNode node = this.map.remove(message.getMessageId());
+            node = this.map.remove(message.getMessageId());
             removeNode(node);
         }
+        return node;
     }
 
     public int size() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java?rev=1181112&r1=1181111&r2=1181112&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
Mon Oct 10 18:21:46 2011
@@ -25,7 +25,7 @@ public interface PendingList {
     public void clear();
     public PendingNode addMessageFirst(MessageReference message);
     public PendingNode addMessageLast(MessageReference message);
-    public void remove(MessageReference message);
+    public PendingNode remove(MessageReference message);
     public int size();
     public Iterator<MessageReference> iterator();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=1181112&r1=1181111&r2=1181112&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
Mon Oct 10 18:21:46 2011
@@ -61,13 +61,15 @@ public class PrioritizedPendingList impl
         return new PrioritizedPendingListIterator();
     }
 
-    public void remove(MessageReference message) {
+    public PendingNode remove(MessageReference message) {
+        PendingNode node = null;
         if (message != null) {
-            PendingNode node = this.map.remove(message.getMessageId());
+            node = this.map.remove(message.getMessageId());
             if (node != null) {
                 node.getList().removeNode(node);
             }
         }
+        return node;
     }
 
     public int size() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1181112&r1=1181111&r2=1181112&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Mon Oct 10 18:21:46 2011
@@ -1197,6 +1197,8 @@ public abstract class MessageDatabase ex
                 }
                 // The following method handles deleting un-referenced messages.
                 removeAckLocation(tx, sd, subscriptionKey, sequence);
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug("no message sequence exists for id: " + command.getMessageId()
+ " and sub: " + command.getSubscriptionKey());
             }
 
         }
@@ -1900,7 +1902,7 @@ public abstract class MessageDatabase ex
      * @param tx
      * @param sd
      * @param subscriptionKey
-     * @param sequenceId
+     * @param messageSequence
      * @throws IOException
      */
     private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey,
Long messageSequence) throws IOException {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java?rev=1181112&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
Mon Oct 10 18:21:46 2011
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertNotNull;
+
+public class DurableSubscriptionHangTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
+    final static String brokerName = "DurableSubscriptionHangTestCase";
+    final static String clientID = "myId";
+    private static final String topicName = "myTopic";
+    private static final String durableSubName = "mySub";
+    BrokerService brokerService;
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setBrokerName(brokerName);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(5000);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
+        brokerService.start();
+    }
+
+    @After
+    public void brokerStop() throws Exception {
+        brokerService.stop();
+    }
+
+	@Test
+	public void testHanging() throws Exception
+	{
+		registerDurableSubscription();
+		produceExpiredAndOneNonExpiredMessages();
+		TimeUnit.SECONDS.sleep(10);		// make sure messages are expired
+        Message message = collectMessagesFromDurableSubscriptionForOneMinute();
+        LOG.info("got message:" + message);
+        assertNotNull("Unable to read unexpired message", message);
+	}
+
+	private void produceExpiredAndOneNonExpiredMessages() throws JMSException {
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
+        TopicConnection connection = connectionFactory.createTopicConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+        MessageProducer producer = session.createProducer(topic);
+        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1));
+        for(int i=0; i<40000; i++)
+        {
+        	sendRandomMessage(session, producer);
+        }
+        producer.setTimeToLive(TimeUnit.DAYS.toMillis(1));
+        sendRandomMessage(session, producer);
+        connection.close();
+        LOG.info("produceExpiredAndOneNonExpiredMessages done");
+	}
+
+	private void registerDurableSubscription() throws JMSException
+	{
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
+		TopicConnection connection = connectionFactory.createTopicConnection();
+		connection.setClientID(clientID);
+		TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+		Topic topic = topicSession.createTopic(topicName);
+		TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName);
+		connection.start();
+		durableSubscriber.close();
+		connection.close();
+		LOG.info("Durable Sub Registered");
+	}
+
+	private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception
+	{
+		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
+		TopicConnection connection = connectionFactory.createTopicConnection();
+
+		connection.setClientID(clientID);
+		TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+		Topic topic = topicSession.createTopic(topicName);
+		connection.start();
+		TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
+		LOG.info("About to receive messages");
+		Message message = subscriber.receive(120000);
+		subscriber.close();
+		connection.close();
+		LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
+
+		return message;
+	}
+
+	private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException
{
+		TextMessage textMessage = session.createTextMessage();
+		textMessage.setText(RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz"));
+		producer.send(textMessage);
+	}
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message