activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1159662 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/util/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bugs/
Date Fri, 19 Aug 2011 15:34:32 GMT
Author: gtully
Date: Fri Aug 19 15:34:31 2011
New Revision: 1159662

URL: http://svn.apache.org/viewvc?rev=1159662&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3362 - revisit: place a limit on a browse so that
the entire store is not recovered at once. Makes use of getMaxBrowsePageSize and getMaxExpirePageSize,
as in the queue case; the overhead is too large otherwise, as the durable sub test shows

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1159662&r1=1159661&r2=1159662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Aug 19 15:34:31 2011
@@ -54,6 +54,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.broker.util.InsertionCountList;
 import org.apache.activemq.command.*;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -757,24 +758,7 @@ public class Queue extends BaseDestinati
         }
 
         // just track the insertion count
-        List<Message> browsedMessages = new AbstractList<Message>() {
-            int size = 0;
-
-            @Override
-            public void add(int index, Message element) {
-                size++;
-            }
-
-            @Override
-            public int size() {
-                return size;
-            }
-
-            @Override
-            public Message get(int index) {
-                return null;
-            }
-        };
+        List<Message> browsedMessages = new InsertionCountList<Message>();
         doBrowse(browsedMessages, this.getMaxExpirePageSize());
         asyncWakeup();
         if (LOG.isDebugEnabled()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1159662&r1=1159661&r2=1159662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Aug 19 15:34:31 2011
@@ -33,6 +33,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.util.InsertionCountList;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
@@ -532,10 +533,15 @@ public class Topic extends BaseDestinati
     }
 
     public Message[] browse() {
-        final ConnectionContext connectionContext = createConnectionContext();
-        final Set<Message> result = new CopyOnWriteArraySet<Message>();
+        final List<Message> result = new ArrayList<Message>();
+        doBrowse(result, getMaxBrowsePageSize());
+        return result.toArray(new Message[result.size()]);
+    }
+
+    private void doBrowse(final List<Message> browseList, final int max) {
         try {
             if (topicStore != null) {
+                final ConnectionContext connectionContext = createConnectionContext();
                 topicStore.recover(new MessageRecoveryListener() {
                     public boolean recoverMessage(Message message) throws Exception {
                         if (message.isExpired()) {
@@ -545,7 +551,7 @@ public class Topic extends BaseDestinati
                                 }
                             }
                         }
-                        result.add(message);
+                        browseList.add(message);
                         return true;
                     }
 
@@ -554,7 +560,7 @@ public class Topic extends BaseDestinati
                     }
 
                     public boolean hasSpace() {
-                        return true;
+                        return browseList.size() < max;
                     }
 
                     public boolean isDuplicate(MessageId id) {
@@ -563,15 +569,14 @@ public class Topic extends BaseDestinati
                 });
                 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
                 if (msgs != null) {
-                    for (int i = 0; i < msgs.length; i++) {
-                        result.add(msgs[i]);
+                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
+                        browseList.add(msgs[i]);
                     }
                 }
             }
         } catch (Throwable e) {
             LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
         }
-        return result.toArray(new Message[result.size()]);
     }
 
     public boolean iterate() {
@@ -656,7 +661,8 @@ public class Topic extends BaseDestinati
 
     private final Runnable expireMessagesTask = new Runnable() {
         public void run() {
-            browse();
+            List<Message> browsedMessages = new InsertionCountList<Message>();
+            doBrowse(browsedMessages, getMaxExpirePageSize());
         }
     };
 

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java?rev=1159662&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java
Fri Aug 19 15:34:31 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.util.AbstractList;
+
+public class InsertionCountList<T> extends AbstractList<T> {
+    int size = 0;
+
+    @Override
+    public void add(int index, T element) {
+        size++;
+    }
+
+    @Override
+    public int size() {
+        return size;
+    }
+
+    @Override
+    public T get(int index) {
+        return null;
+    }
+};
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/InsertionCountList.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1159662&r1=1159661&r2=1159662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Fri Aug 19 15:34:31 2011
@@ -455,7 +455,7 @@ public class KahaDBStore extends Message
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         sd.orderIndex.resetCursorPosition();
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
                                 .hasNext(); ) {
                             Entry<Long, MessageKeys> entry = iterator.next();
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=1159662&r1=1159661&r2=1159662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Fri Aug 19 15:34:31 2011
@@ -64,7 +64,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DurableConsumerTest extends CombinationTestSupport{
     private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
-    private static int COUNT = 1024 * 10;
+    private static int COUNT = 1024 * 60;
     private static String CONSUMER_NAME = "DURABLE_TEST";
     protected BrokerService broker;
     



Mime
View raw message