activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r691114 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/jmx/
Date Tue, 02 Sep 2008 06:14:24 GMT
Author: rajdavies
Date: Mon Sep  1 23:14:22 2008
New Revision: 691114

URL: http://svn.apache.org/viewvc?rev=691114&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1909
and https://issues.apache.org/activemq/browse/AMQ-1914

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Mon Sep  1 23:14:22 2008
@@ -38,7 +38,7 @@
      * from persistent storage
      */
     public static final int MAX_PAGE_SIZE=200;
-   
+    public static final int MAX_BROWSE_PAGE_SIZE=MAX_PAGE_SIZE*2;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
@@ -49,6 +49,7 @@
     private int maxAuditDepth=2048;
     private boolean enableAudit=true;
     private int maxPageSize=MAX_PAGE_SIZE;
+    private int maxBrowsePageSize=MAX_BROWSE_PAGE_SIZE;
     private boolean useCache=true;
     private int minimumMessageSize=1024;
     private boolean lazyDispatch=false;
@@ -187,6 +188,14 @@
     public void setMaxPageSize(int maxPageSize) {
         this.maxPageSize = maxPageSize;
     }
+    
+    public int getMaxBrowsePageSize() {
+        return this.maxBrowsePageSize;
+    }
+
+    public void setMaxBrowsePageSize(int maxPageSize) {
+        this.maxBrowsePageSize = maxPageSize;
+    } 
 
     public boolean isUseCache() {
         return useCache;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Mon Sep  1 23:14:22 2008
@@ -91,6 +91,10 @@
     
     public void setMaxPageSize(int maxPageSize);
     
+    public int getMaxBrowsePageSize();
+
+    public void setMaxBrowsePageSize(int maxPageSize);
+    
     public boolean isUseCache();
     
     public void setUseCache(boolean useCache);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Mon Sep  1 23:14:22 2008
@@ -249,4 +249,12 @@
     public void messageExpired(ConnectionContext context, Subscription subs,MessageReference
node) {
        next.messageExpired(context,subs, node);    
     }
+
+    public int getMaxBrowsePageSize() {
+       return next.getMaxBrowsePageSize();
+    }
+
+    public void setMaxBrowsePageSize(int maxPageSize) {
+        next.setMaxBrowsePageSize(maxPageSize);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Sep  1 23:14:22 2008
@@ -21,6 +21,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -616,80 +617,91 @@
     }
 
     public Message[] browse() {
+        int count = 0;
         List<Message> l = new ArrayList<Message>();
         try {
-            doPageIn(true);
-        } catch (Exception e) {
-            LOG.error("caught an exception browsing " + this, e);
-        }
-        synchronized (pagedInMessages) {
-            for (QueueMessageReference node:pagedInMessages.values()){
-                node.incrementReferenceCount();
-                try {
-                    Message m = node.getMessage();
-                    if (m != null) {
-                        l.add(m);
+            synchronized (this.pagedInPendingDispatch) {
+                for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
+                        .iterator(); i.hasNext()
+                        && count < getMaxBrowsePageSize();) {
+                    l.add(i.next().getMessage());
+                    count++;
+                }
+            }
+            if (count < getMaxBrowsePageSize()) {
+                synchronized (pagedInMessages) {
+                    for (Iterator<QueueMessageReference> i = this.pagedInMessages
+                            .values().iterator(); i.hasNext()
+                            && count < getMaxBrowsePageSize();) {
+                        Message m = i.next().getMessage();
+                        if (l.contains(m) == false) {
+                            l.add(m);
+                            count++;
+                        }
                     }
-                } catch (IOException e) {
-                    LOG.error("caught an exception browsing " + this, e);
-                } finally {
-                    node.decrementReferenceCount();
                 }
             }
-        }
-        synchronized (messages) {
-            try {
-                messages.reset();
-                while (messages.hasNext()) {
+            if (count < getMaxBrowsePageSize()) {
+                synchronized (messages) {
                     try {
-                        MessageReference r = messages.next();
-                        r.incrementReferenceCount();
-                        try {
-                            Message m = r.getMessage();
-                            if (m != null) {
-                                l.add(m);
+                        messages.reset();
+                        while (messages.hasNext()
+                                && count < getMaxBrowsePageSize()) {
+                            MessageReference node = messages.next();
+                            messages.rollback(node.getMessageId());
+                            if (node != null) {
+                                Message m = node.getMessage();
+                                if (l.contains(m) == false) {
+                                    l.add(m);
+                                    count++;
+                                }
                             }
-                        } finally {
-                            r.decrementReferenceCount();
                         }
-                    } catch (IOException e) {
-                        LOG.error("caught an exception brwsing " + this, e);
+                    } finally {
+                        messages.release();
                     }
                 }
-            } finally {
-                messages.release();
             }
+        } catch (IOException e) {
+            LOG.error("Problem retrieving message in browse() ", e);
         }
-
         return l.toArray(new Message[l.size()]);
     }
 
-    public Message getMessage(String messageId) {
-        synchronized (messages) {
-            try {
-                messages.reset();
-                while (messages.hasNext()) {
-                    try {
-                        MessageReference r = messages.next();
-                        if (messageId.equals(r.getMessageId().toString())) {
-                            r.incrementReferenceCount();
-                            try {
+    public Message getMessage(String id) {
+        MessageId msgId = new MessageId(id);
+        try {
+            synchronized (pagedInMessages) {
+                QueueMessageReference r = this.pagedInMessages.get(msgId);
+                if (r != null) {
+                    return r.getMessage();
+                }
+            }
+            synchronized (messages) {
+                try {
+                    messages.reset();
+                    while (messages.hasNext()) {
+                        try {
+                            MessageReference r = messages.next();
+                            messages.rollback(r.getMessageId());
+                            if (msgId.equals(r.getMessageId())) {
                                 Message m = r.getMessage();
                                 if (m != null) {
                                     return m;
                                 }
-                            } finally {
-                                r.decrementReferenceCount();
+                                break;
                             }
-                            break;
+                        } catch (IOException e) {
+                            LOG.error("got an exception retrieving message "
+                                    + id);
                         }
-                    } catch (IOException e) {
-                        LOG.error("got an exception retrieving message " + messageId);
                     }
+                } finally {
+                    messages.release();
                 }
-            } finally {
-                messages.release();
             }
+        } catch (IOException e) {
+            LOG.error("got an exception retrieving message " + id);
         }
         return null;
     }
@@ -852,7 +864,7 @@
      * @return the number of messages removed
      */
     public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination
dest) throws Exception {
-        return moveMatchingMessagesTo(context, selector, dest, -1);
+        return moveMatchingMessagesTo(context, selector, dest,Integer.MAX_VALUE);
     }
 
     /**
@@ -867,7 +879,9 @@
      * Moves the messages matching the given filter up to the maximum number of
      * matched messages
      */
-    public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter,
ActiveMQDestination dest,int maximumMessages) throws Exception {
+    public int moveMatchingMessagesTo(ConnectionContext context,
+            MessageReferenceFilter filter, ActiveMQDestination dest,
+            int maximumMessages) throws Exception {
         int movedCounter = 0;
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
@@ -875,28 +889,27 @@
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
             }
-            List <MessageReference>list = new ArrayList<MessageReference>(set);
-            for (MessageReference ref:list) {
+            List<MessageReference> list = new ArrayList<MessageReference>(set);
+            for (MessageReference ref : list) {
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
-                    r.incrementReferenceCount();
-                    try {
-                        Message m = r.getMessage();
-                        BrokerSupport.resend(context, m, dest);
-                        removeMessage(context, r);
-                        set.remove(r);
-                        if (++movedCounter >= maximumMessages
-                                && maximumMessages > 0) {
-                            return movedCounter;
-                        }
-                    } finally {
-                        r.decrementReferenceCount();
+                    Message m = r.getMessage();
+                    BrokerSupport.resend(context, m, dest);
+                    removeMessage(context, r);
+                    set.remove(r);
+                    if (++movedCounter >= maximumMessages
+                            && maximumMessages > 0) {
+                        return movedCounter;
+                    }
+                } else {
+                    synchronized (messages) {
+                        messages.rollback(r.getMessageId());
                     }
                 }
-                
             }
-        } while (set.size() < this.destinationStatistics.getMessages().getCount());
+        } while (set.size() < this.destinationStatistics.getMessages().getCount()
+                && set.size() < maximumMessages);
         return movedCounter;
     }
     
@@ -936,7 +949,9 @@
 	                            // make sure it gets queued for dispatched again
 	                            dispatchLock.lock();
 	                            try {
-	                                pagedInPendingDispatch.add(node);
+	                                synchronized(pagedInPendingDispatch) {
+	                                    pagedInPendingDispatch.add(node);
+	                                }
 	                            } finally {
 	                                dispatchLock.unlock();
 	                            }
@@ -993,6 +1008,9 @@
             public boolean evaluate(ConnectionContext context, MessageReference r) {
                 return messageId.equals(r.getMessageId().toString());
             }
+            public String toString() {
+                return "MessageIdFilter: "+messageId;
+            }
         };
     }
 
@@ -1031,21 +1049,13 @@
         acknowledge(context, sub, ack, reference);
 
         if (!ack.isInTransaction()) {
-            reference.drop();
-            destinationStatistics.getMessages().decrement();
-            synchronized(pagedInMessages) {
-                pagedInMessages.remove(reference.getMessageId());
-            }
+            dropMessage(reference);
             wakeup();
         } else {
             context.getTransaction().addSynchronization(new Synchronization() {
                 
                 public void afterCommit() throws Exception {
-                    reference.drop();
-                    destinationStatistics.getMessages().decrement();
-                    synchronized(pagedInMessages) {
-                        pagedInMessages.remove(reference.getMessageId());
-                    }
+                    dropMessage(reference);
                     wakeup();
                 }
                 
@@ -1057,6 +1067,17 @@
 
     }
     
+    private void dropMessage(QueueMessageReference reference) {
+        reference.drop();
+        destinationStatistics.getMessages().decrement();
+        synchronized(pagedInMessages) {
+            pagedInMessages.remove(reference.getMessageId());
+        }
+        synchronized(messages) {
+            messages.rollback(reference.getMessageId());
+        }
+    }
+    
     public void messageExpired(ConnectionContext context,MessageReference reference) {
         messageExpired(context,null,reference);
     }
@@ -1117,8 +1138,16 @@
         List<QueueMessageReference> result = null;
         dispatchLock.lock();
         try{
-            int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount())
- pagedInMessages.size();
-            toPageIn = Math.min(toPageIn,getMaxPageSize());
+           
+            int toPageIn = 0;
+            if (force) {
+                toPageIn = getMaxPageSize();
+            } else {
+                toPageIn = (getMaxPageSize() + (int) destinationStatistics
+                        .getInflight().getCount())
+                        - pagedInMessages.size();
+                toPageIn = Math.min(toPageIn, getMaxPageSize());
+            }
             if (isLazyDispatch()&& !force) {
                 // Only page in the minimum number of messages which can be dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1129,6 +1158,7 @@
                 result = new ArrayList<QueueMessageReference>(toPageIn);
                 synchronized (messages) {
                     try {
+                      
                         messages.reset();
                         while (messages.hasNext() && count < toPageIn) {
                             MessageReference node = messages.next();
@@ -1161,17 +1191,19 @@
     private void doDispatch(List<QueueMessageReference> list) throws Exception {
         dispatchLock.lock();
         try {
-            if(!pagedInPendingDispatch.isEmpty()) {
-                // Try to first dispatch anything that had not been dispatched before.
-                pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
-            }
-            // and now see if we can dispatch the new stuff.. and append to the pending 
-            // list anything that does not actually get dispatched.
-            if (list != null && !list.isEmpty()) {
-                if (pagedInPendingDispatch.isEmpty()) {
-                    pagedInPendingDispatch.addAll(doActualDispatch(list));
-                } else {
-                    pagedInPendingDispatch.addAll(list);
+            synchronized(pagedInPendingDispatch) {
+                if(!pagedInPendingDispatch.isEmpty()) {
+                    // Try to first dispatch anything that had not been dispatched before.
+                    pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+                }
+                // and now see if we can dispatch the new stuff.. and append to the pending

+                // list anything that does not actually get dispatched.
+                if (list != null && !list.isEmpty()) {
+                    if (pagedInPendingDispatch.isEmpty()) {
+                        pagedInPendingDispatch.addAll(doActualDispatch(list));
+                    } else {
+                        pagedInPendingDispatch.addAll(list);
+                    }
                 }
             }
         } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Mon Sep  1 23:14:22 2008
@@ -21,6 +21,7 @@
 import java.util.List;
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
@@ -34,7 +35,7 @@
  */
 public class AbstractPendingMessageCursor implements PendingMessageCursor {
     protected int memoryUsageHighWaterMark = 70;
-    protected int maxBatchSize = 100;
+    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
     protected SystemUsage systemUsage;
     protected int maxProducersToAudit=1024;
     protected int maxAuditDepth=1000;
@@ -285,7 +286,7 @@
         return this.audit.isDuplicate(messageId);
     }
     
-    protected synchronized void rollback(MessageId id) {
+    public synchronized void rollback(MessageId id) {
         if (this.audit != null) {
             audit.rollback(id);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Mon Sep  1 23:14:22 2008
@@ -41,6 +41,7 @@
     protected boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
     protected boolean storeHasMessages = false;
+    protected Iterator<Entry<MessageId, Message>> iterator = null;
     protected int size;
     
     protected AbstractStoreCursor(Destination destination) {
@@ -93,8 +94,22 @@
     }
     
     public final void reset() {
+        if (batchList.isEmpty()) {
+            try {
+                fillBatch();
+            } catch (Exception e) {
+                LOG.error("Failed to fill batch", e);
+                throw new RuntimeException(e);
+            }
+        }
+        this.iterator = this.batchList.entrySet().iterator();
+    }
+    
+    public void release() {
+        this.iterator=null;
     }
 
+
     public final void finished() {
     }
         
@@ -102,22 +117,24 @@
         if (batchList.isEmpty()) {
             try {
                 fillBatch();
+                this.iterator = this.batchList.entrySet().iterator();
             } catch (Exception e) {
                 LOG.error("Failed to fill batch", e);
                 throw new RuntimeException(e);
             }
+        }else {
+            if (this.iterator==null) {
+                this.iterator=this.batchList.entrySet().iterator();
+            }
         }
-        boolean result= !batchList.isEmpty();
-        return result;
+        return this.iterator.hasNext();
     }
     
     public final synchronized MessageReference next() {
         Message result = null;
-        if (!this.batchList.isEmpty()) {
-            Iterator<Entry<MessageId, Message>> i = this.batchList.entrySet().iterator();
-            result = i.next().getValue();
+        if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
+            result = this.iterator.next().getValue();
             result.decrementReferenceCount();
-            i.remove();
         }
         return result;
     }
@@ -141,6 +158,9 @@
         if (size==0 && isStarted() && cacheEnabled) {
             cacheEnabled=true;
         }
+        if (iterator!=null) {
+            iterator.remove();
+        }
     }
 
     public final synchronized void remove(MessageReference node) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Mon Sep  1 23:14:22 2008
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
 /**
@@ -282,5 +283,11 @@
      * @return true if a cache is being used
      */
     public boolean isUseCache();
+    
+    /**
+     * remove from auditing the message id
+     * @param id
+     */
+    public void rollback(MessageId id);
    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Mon Sep  1 23:14:22 2008
@@ -16,12 +16,10 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
-import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -163,6 +161,12 @@
         nonPersistent.reset();
         persistent.reset();
     }
+    
+    public void release() {
+        nonPersistent.release();
+        persistent.release();
+    }
+
 
     public synchronized int size() {
         return pendingCount;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Sep  1 23:14:22 2008
@@ -59,6 +59,7 @@
     private boolean producerFlowControl = true;
     private boolean optimizedDispatch=false;
     private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
+    private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
     private boolean useCache=true;
     private long minimumMessageSize=1024;
     private boolean useConsumerPriority=true;
@@ -119,6 +120,7 @@
         destination.setMaxAuditDepth(getMaxQueueAuditDepth());
         destination.setMaxProducersToAudit(getMaxProducersToAudit());
         destination.setMaxPageSize(getMaxPageSize());
+        destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
         destination.setUseCache(isUseCache());
         destination.setMinimumMessageSize((int) getMinimumMessageSize());
         destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
@@ -387,7 +389,15 @@
 
     public void setMaxPageSize(int maxPageSize) {
         this.maxPageSize = maxPageSize;
-    }    
+    } 
+    
+    public int getMaxBrowsePageSize() {
+        return maxBrowsePageSize;
+    }
+
+    public void setMaxBrowsePageSize(int maxPageSize) {
+        this.maxBrowsePageSize = maxPageSize;
+    } 
     
     public boolean isUseCache() {
         return useCache;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Mon Sep  1 23:14:22 2008
@@ -100,9 +100,7 @@
         else {
             echo("Current queue size: " + initialQueueSize);
         }
-        // TODO uncommenting this line causes a hang!
-        //int messageCount = initialQueueSize;
-        int messageCount = 10;
+        int messageCount = initialQueueSize;
         String[] messageIDs = new String[messageCount];
         for (int i = 0; i < messageCount; i++) {
             CompositeData cdata = compdatalist[i];
@@ -124,8 +122,7 @@
         compdatalist = queue.browse();
         int actualCount = compdatalist.length;
         echo("Current queue size: " + actualCount);
-        // TODO we seem to have browsed the queue and now there are messages missing!
-        //assertEquals("Should now have empty queue but was", initialQueueSize - messageCount,
actualCount);
+        assertEquals("Should now have empty queue but was", initialQueueSize - messageCount,
actualCount);
 
         echo("Now browsing the second queue");
 
@@ -137,7 +134,7 @@
         assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
     }
 
-    public void TODO_testRetryMessages() throws Exception {
+    public void testRetryMessages() throws Exception {
         // lets speed up redelivery
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
         factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
@@ -186,10 +183,7 @@
         else {
             echo("Current DLQ queue size: " + dlqQueueSize);
         }
-
-        // TODO uncommenting this line causes a hang!
-        //int messageCount = dlqQueueSize;
-        int messageCount = 10;
+        int messageCount = dlqQueueSize;
         String[] messageIDs = new String[messageCount];
         for (int i = 0; i < messageCount; i++) {
             CompositeData cdata = compdatalist[i];



Mime
View raw message