Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 77790 invoked from network); 25 Jun 2010 10:29:19 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 25 Jun 2010 10:29:19 -0000 Received: (qmail 37265 invoked by uid 500); 25 Jun 2010 10:29:19 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 37203 invoked by uid 500); 25 Jun 2010 10:29:18 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 37196 invoked by uid 99); 25 Jun 2010 10:29:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jun 2010 10:29:18 +0000 X-ASF-Spam-Status: No, hits=-1894.9 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Jun 2010 10:29:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C9BBE2388993; Fri, 25 Jun 2010 10:28:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r957881 [1/2] - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/jmx/ broker/region/ broker/region/cursors/ broker/region/policy/ broker/util/ command/ memory/list/ plugin/ Date: Fri, 25 Jun 2010 10:28:19 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100625102819.C9BBE2388993@eris.apache.org> Author: rajdavies Date: Fri Jun 25 10:28:17 2010 New Revision: 957881 URL: http://svn.apache.org/viewvc?rev=957881&view=rev Log: changes for https://issues.apache.org/activemq/browse/AMQ-2791 Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java Fri Jun 25 10:28:17 2010 @@ -22,7 +22,7 @@ import java.util.List; import org.apache.activemq.command.MessageDispatch; public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel { - private static Integer MAX_PRIORITY = 10; + private static final Integer MAX_PRIORITY = 10; private final Object mutex = new Object(); private final LinkedList[] lists; private boolean closed; @@ -234,7 +234,7 @@ public class SimplePriorityMessageDispat } protected int getPriority(MessageDispatch message) { - int priority = Message.DEFAULT_PRIORITY; + int priority = javax.jms.Message.DEFAULT_PRIORITY; if (message.getMessage() != null) { Math.max(message.getMessage().getPriority(), 0); priority = Math.min(priority, 9); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Fri Jun 25 10:28:17 2010 @@ -16,19 +16,6 @@ */ package org.apache.activemq.broker.jmx; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.Message; -import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.selector.SelectorParser; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -48,6 +35,19 @@ import javax.management.openmbean.OpenDa import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.Message; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.selector.SelectorParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class DestinationView implements DestinationViewMBean { private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class); @@ -126,6 +126,10 @@ public class DestinationView implements public long getMinEnqueueTime() { return destination.getDestinationStatistics().getProcessTime().getMinTime(); } + + public boolean isPrioritizedMessages() { + return destination.isPrioritizedMessages(); + } public CompositeData[] browse() throws OpenDataException { try { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Fri Jun 25 10:28:17 2010 @@ -16,16 +16,15 @@ */ package org.apache.activemq.broker.jmx; +import java.io.IOException; import java.util.List; import java.util.Map; -import java.io.IOException; - import javax.jms.InvalidSelectorException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; public interface DestinationViewMBean { @@ -314,6 +313,12 @@ public interface DestinationViewMBean { public boolean isUseCache(); /** + * @return true if prioritized messages are enabled for the destination + */ + @MBeanInfo("Prioritized messages is enabled") + public boolean isPrioritizedMessages(); + + /** * @param value * enable/disable caching on the destination */ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Jun 25 10:28:17 2010 @@ -81,6 +81,7 @@ public abstract class BaseDestination im protected int cursorMemoryHighWaterMark = 70; protected int storeUsageHighWaterMark = 100; private SlowConsumerStrategy slowConsumerStrategy; + private boolean prioritizedMessages; /** * @param broker @@ -580,5 +581,14 @@ public abstract class BaseDestination im public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { this.slowConsumerStrategy = slowConsumerStrategy; } + + + public boolean isPrioritizedMessages() { + return this.prioritizedMessages; + } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + this.prioritizedMessages = prioritizedMessages; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Jun 25 10:28:17 2010 @@ -18,14 +18,12 @@ package org.apache.activemq.broker.regio import java.io.IOException; import java.util.List; - import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatchNotification; @@ -215,4 +213,6 @@ public interface Destination extends Ser * @throws Exception */ void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception; + + boolean isPrioritizedMessages(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Jun 25 10:28:17 2010 @@ -19,7 +19,6 @@ package org.apache.activemq.broker.regio import java.io.IOException; import java.util.List; import java.util.Set; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -39,7 +38,7 @@ import org.apache.activemq.usage.Usage; */ public class DestinationFilter implements Destination { - private Destination next; + private final Destination next; public DestinationFilter(Destination next) { this.next = next; @@ -270,4 +269,8 @@ public class DestinationFilter implement public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); } + + public boolean isPrioritizedMessages() { + return next.isPrioritizedMessages(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Fri Jun 25 10:28:17 2010 @@ -16,8 +16,6 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; - import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; @@ -33,7 +31,7 @@ public interface MessageReference { MessageId getMessageId(); Message getMessageHardRef(); - Message getMessage() throws IOException; + Message getMessage(); boolean isPersistent(); Destination getRegionDestination(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Fri Jun 25 10:28:17 2010 @@ -16,8 +16,6 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; - import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; @@ -28,7 +26,7 @@ import org.apache.activemq.command.Messa */ final class NullMessageReference implements QueueMessageReference { - private ActiveMQMessage message = new ActiveMQMessage(); + private final ActiveMQMessage message = new ActiveMQMessage(); private volatile int references; public void drop() { @@ -75,7 +73,7 @@ final class NullMessageReference impleme return 0; } - public Message getMessage() throws IOException { + public Message getMessage() { return message; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jun 25 10:28:17 2010 @@ -78,7 +78,7 @@ public abstract class PrefetchSubscripti } public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { - this(broker,usageManager,context, info, new VMPendingMessageCursor()); + this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); } /** Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jun 25 10:28:17 2010 @@ -236,7 +236,7 @@ public class Queue extends BaseDestinati public void initialize() throws Exception { if (this.messages == null) { if (destination.isTemporary() || broker == null || store == null) { - this.messages = new VMPendingMessageCursor(); + this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); } else { this.messages = new StoreQueueCursor(broker, this); } @@ -951,38 +951,30 @@ public class Queue extends BaseDestinati public Message getMessage(String id) { MessageId msgId = new MessageId(id); - try { - synchronized (pagedInMessages) { - QueueMessageReference r = this.pagedInMessages.get(msgId); - if (r != null) { - return r.getMessage(); - } + synchronized (pagedInMessages) { + QueueMessageReference r = this.pagedInMessages.get(msgId); + if (r != null) { + return r.getMessage(); } - synchronized (messages) { - try { - messages.reset(); - while (messages.hasNext()) { - try { - MessageReference r = messages.next(); - r.decrementReferenceCount(); - messages.rollback(r.getMessageId()); - if (msgId.equals(r.getMessageId())) { - Message m = r.getMessage(); - if (m != null) { - return m; - } - break; - } - } catch (IOException e) { - LOG.error("got an exception retrieving message " + id); + } + synchronized (messages) { + try { + messages.reset(); + while (messages.hasNext()) { + MessageReference r = messages.next(); + r.decrementReferenceCount(); + messages.rollback(r.getMessageId()); + if (msgId.equals(r.getMessageId())) { + Message m = r.getMessage(); + if (m != null) { + return m; } + break; } - } finally { - messages.release(); } + } finally { + messages.release(); } - } catch (IOException e) { - LOG.error("got an exception retrieving message " + id); } return null; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Fri Jun 25 10:28:17 2010 @@ -56,7 +56,7 @@ public class TempQueue extends Queue{ @Override public void initialize() throws Exception { - this.messages=new VMPendingMessageCursor(); + this.messages=new VMPendingMessageCursor(false); this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); this.systemUsage = brokerService.getSystemUsage(); memoryUsage.setParent(systemUsage.getMemoryUsage()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jun 25 10:28:17 2010 @@ -72,9 +72,9 @@ public class TopicSubscription extends A this.usageManager = usageManager; String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) { - this.matched = new VMPendingMessageCursor(); + this.matched = new VMPendingMessageCursor(false); } else { - this.matched = new FilePendingMessageCursor(broker,matchedName); + this.matched = new FilePendingMessageCursor(broker,matchedName,false); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Jun 25 10:28:17 2010 @@ -19,11 +19,14 @@ package org.apache.activemq.broker.regio import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Set; import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; @@ -44,6 +47,11 @@ public abstract class AbstractPendingMes protected boolean useCache=true; private boolean started=false; protected MessageReference last = null; + protected final boolean prioritizedMessages; + + public AbstractPendingMessageCursor(boolean prioritizedMessages) { + this.prioritizedMessages=prioritizedMessages; + } public synchronized void start() throws Exception { @@ -304,4 +312,19 @@ public abstract class AbstractPendingMes protected synchronized boolean isStarted() { return started; } + + public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) { + boolean result = false; + Set destinations = broker.getDestinations(sub.getActiveMQDestination()); + if (destinations != null) { + for (Destination dest:destinations) { + if (dest.isPrioritizedMessages()) { + result = true; + break; + } + } + } + return result; + + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Fri Jun 25 10:28:17 2010 @@ -17,8 +17,6 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map.Entry; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; @@ -34,8 +32,8 @@ import org.apache.commons.logging.LogFac public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener { private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); protected final Destination regionDestination; - private final LinkedHashMap batchList = new LinkedHashMap (); - private Iterator> iterator = null; + private final PendingList batchList; + private Iterator iterator = null; private boolean cacheEnabled=false; protected boolean batchResetNeeded = true; protected boolean storeHasMessages = false; @@ -43,10 +41,16 @@ public abstract class AbstractStoreCurso private MessageId lastCachedId; protected AbstractStoreCursor(Destination destination) { + super((destination != null ? destination.isPrioritizedMessages():false)); this.regionDestination=destination; + if (this.prioritizedMessages) { + this.batchList= new PrioritizedPendingList(); + }else { + this.batchList = new OrderedPendingList(); + } } - @Override + public final synchronized void start() throws Exception{ if (!isStarted()) { super.start(); @@ -60,7 +64,7 @@ public abstract class AbstractStoreCurso } } - @Override + public final synchronized void stop() throws Exception { resetBatch(); super.stop(); @@ -82,7 +86,7 @@ public abstract class AbstractStoreCurso } } message.incrementReferenceCount(); - batchList.put(message.getMessageId(), message); + batchList.addMessageLast(message); clearIterator(true); recovered = true; } else { @@ -99,7 +103,7 @@ public abstract class AbstractStoreCurso return recovered; } - @Override + public final void reset() { if (batchList.isEmpty()) { try { @@ -113,7 +117,7 @@ public abstract class AbstractStoreCurso size(); } - @Override + public synchronized void release() { clearIterator(false); } @@ -129,7 +133,7 @@ public abstract class AbstractStoreCurso private synchronized void ensureIterator() { if(this.iterator==null) { - this.iterator=this.batchList.entrySet().iterator(); + this.iterator=this.batchList.iterator(); } } @@ -137,7 +141,7 @@ public abstract class AbstractStoreCurso public final void finished() { } - @Override + public final synchronized boolean hasNext() { if (batchList.isEmpty()) { try { @@ -151,11 +155,11 @@ public abstract class AbstractStoreCurso return this.iterator.hasNext(); } - @Override + public final synchronized MessageReference next() { MessageReference result = null; if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { - result = this.iterator.next().getValue(); + result = this.iterator.next(); } last = result; if (result != null) { @@ -164,7 +168,7 @@ public abstract class AbstractStoreCurso return result; } - @Override + public final synchronized void addMessageLast(MessageReference node) throws Exception { if (cacheEnabled && hasSpace()) { recoverMessage(node.getMessage(),true); @@ -189,13 +193,13 @@ public abstract class AbstractStoreCurso protected void setBatch(MessageId messageId) throws Exception { } - @Override + public final synchronized void addMessageFirst(MessageReference node) throws Exception { cacheEnabled=false; size++; } - @Override + public final synchronized void remove() { size--; if (iterator!=null) { @@ -212,21 +216,22 @@ public abstract class AbstractStoreCurso } } - @Override + public final synchronized void remove(MessageReference node) { size--; cacheEnabled=false; - batchList.remove(node.getMessageId()); + batchList.remove(node); } - @Override + public final synchronized void clear() { gc(); } - @Override + public final synchronized void gc() { - for (Message msg : batchList.values()) { + for (Iteratori = batchList.iterator();i.hasNext();) { + MessageReference msg = i.next(); rollback(msg.getMessageId()); msg.decrementReferenceCount(); } @@ -241,7 +246,7 @@ public abstract class AbstractStoreCurso } } - @Override + protected final synchronized void fillBatch() { if (batchResetNeeded) { resetBatch(); @@ -261,18 +266,18 @@ public abstract class AbstractStoreCurso } } - @Override + public final synchronized boolean isEmpty() { // negative means more messages added to store through queue.send since last reset return size == 0; } - @Override + public final synchronized boolean hasMessagesBufferedToDeliver() { return !batchList.isEmpty(); } - @Override + public final synchronized int size() { if (size < 0) { this.size = getStoreSize(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Jun 25 10:28:17 2010 @@ -63,9 +63,11 @@ public class FilePendingMessageCursor ex /** * @param broker * @param name + * @param prioritizedMessages * @param store */ - public FilePendingMessageCursor(Broker broker, String name) { + public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { + super(prioritizedMessages); this.useCache = false; this.broker = broker; // the store can be null if the BrokerService has persistence @@ -190,6 +192,7 @@ public class FilePendingMessageCursor ex tryAddMessageLast(node, 0); } + @Override public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { if (!node.isExpired()) { try { Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=957881&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java Fri Jun 25 10:28:17 2010 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; + +public class OrderedPendingList implements PendingList { + PendingNode root = null; + PendingNode tail = null; + final Map map = new HashMap(); + + public PendingNode addMessageFirst(MessageReference message) { + PendingNode node = new PendingNode(this, message); + if (root == null) { + root = node; + tail = node; + } else { + root.linkBefore(node); + } + this.map.put(message.getMessageId(), node); + return node; + } + + public PendingNode addMessageLast(MessageReference message) { + PendingNode node = new PendingNode(this, message); + if (root == null) { + root = node; + } else { + tail.linkAfter(node); + } + tail = node; + this.map.put(message.getMessageId(), node); + return node; + } + + public void clear() { + this.root = null; + this.tail = null; + this.map.clear(); + } + + public boolean isEmpty() { + return this.map.isEmpty(); + } + + public Iterator iterator() { + return new Iterator() { + private PendingNode current = null; + private PendingNode next = root; + + public boolean hasNext() { + return next != null; + } + + public MessageReference next() { + MessageReference result = null; + this.current = this.next; + result = this.current.getMessage(); + this.next = (PendingNode) this.next.getNext(); + return result; + } + + public void remove() { + if (this.current != null && this.current.getMessage() != null) { + map.remove(this.current.getMessage().getMessageId()); + } + removeNode(this.current); + } + }; + } + + public void remove(MessageReference message) { + if (message != null) { + PendingNode node = this.map.remove(message.getMessageId()); + removeNode(node); + } + } + + public int size() { + return this.map.size(); + } + + void removeNode(PendingNode node) { + if (node != null) { + map.remove(node.getMessage().getMessageId()); + if (root == node) { + root = (PendingNode) node.getNext(); + } + if (tail == node) { + tail = (PendingNode) node.getPrevious(); + } + node.unlink(); + } + } + + List getAsList() { + List result = new ArrayList(size()); + PendingNode node = root; + while (node != null) { + result.add(node); + node = (PendingNode) node.getNext(); + } + return result; + } + + @Override + public String toString() { + return "OrderedPendingList(" + System.identityHashCode(this) + ")"; + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java?rev=957881&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java Fri Jun 25 10:28:17 2010 @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.Iterator; +import org.apache.activemq.broker.region.MessageReference; + +public interface PendingList { + + public boolean isEmpty(); + public void clear(); + public PendingNode addMessageFirst(MessageReference message); + public PendingNode addMessageLast(MessageReference message); + public void remove(MessageReference message); + public int size(); + public Iterator iterator(); +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java?rev=957881&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java Fri Jun 25 10:28:17 2010 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.util.LinkedNode; + +public class PendingNode extends LinkedNode { + private final MessageReference message; + private final OrderedPendingList list; + public PendingNode(OrderedPendingList list,MessageReference message) { + this.list = list; + this.message = message; + } + + MessageReference getMessage() { + return this.message; + } + + OrderedPendingList getList() { + return this.list; + } + + @Override + public String toString() { + PendingNode n = (PendingNode) getNext(); + String str = "PendingNode("; + str += System.identityHashCode(this) + "),root="+isHeadNode()+",next="+(n != null ?System.identityHashCode(n):"NULL"); + return str; + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=957881&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java Fri Jun 25 10:28:17 2010 @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; + +public class PrioritizedPendingList implements PendingList { + static final Integer MAX_PRIORITY = 10; + private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY]; + final Map map = new HashMap(); + + public PrioritizedPendingList() { + for (int i = 0; i < MAX_PRIORITY; i++) { + this.lists[i] = new OrderedPendingList(); + } + } + public PendingNode addMessageFirst(MessageReference message) { + PendingNode node = getList(message).addMessageFirst(message); + this.map.put(message.getMessageId(), node); + return node; + } + + public PendingNode addMessageLast(MessageReference message) { + PendingNode node = getList(message).addMessageLast(message); + this.map.put(message.getMessageId(), node); + return node; + } + + public void clear() { + for (int i = 0; i < MAX_PRIORITY; i++) { + this.lists[i].clear(); + } + this.map.clear(); + } + + public boolean isEmpty() { + return this.map.isEmpty(); + } + + public Iterator iterator() { + return new PrioritizedPendingListIterator(); + } + + public void remove(MessageReference message) { + if (message != null) { + PendingNode node = this.map.remove(message.getMessageId()); + if (node != null) { + node.getList().removeNode(node); + } + } + } + + public int size() { + return this.map.size(); + } + + @Override + public String toString() { + return "PrioritizedPendingList(" + System.identityHashCode(this) + ")"; + } + + protected int getPriority(MessageReference message) { + int priority = javax.jms.Message.DEFAULT_PRIORITY; + if (message.getMessageId() != null) { + Math.max(message.getMessage().getPriority(), 0); + priority = Math.min(priority, 9); + } + return priority; + } + + protected OrderedPendingList getList(MessageReference msg) { + return lists[getPriority(msg)]; + } + + private class PrioritizedPendingListIterator implements Iterator { + private int index = 0; + private int currentIndex = 0; + List list = new ArrayList(size()); + + PrioritizedPendingListIterator() { + for (int i = MAX_PRIORITY - 1; i >= 0; i--) { + OrderedPendingList orderedPendingList = lists[i]; + if (!orderedPendingList.isEmpty()) { + list.addAll(orderedPendingList.getAsList()); + } + } + } + public boolean hasNext() { + return list.size() > index; + } + + public MessageReference next() { + PendingNode node = list.get(this.index); + this.currentIndex = this.index; + this.index++; + return node.getMessage(); + } + + public void remove() { + PendingNode node = list.get(this.currentIndex); + if (node != null) { + map.remove(node.getMessage().getMessageId()); + node.getList().removeNode(node); + } + + } + + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Jun 25 10:28:17 2010 @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -58,13 +57,14 @@ public class StoreDurableSubscriberCurso * @param subscription subscription for this cursor */ public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { + super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription)); this.subscription=subscription; this.clientId = clientId; this.subscriberName = subscriberName; if (broker.getBrokerService().isPersistent()) { - this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName); + this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages); }else { - this.nonPersistent = new VMPendingMessageCursor(); + this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); } this.nonPersistent.setMaxBatchSize(maxBatchSize); @@ -72,6 +72,7 @@ public class StoreDurableSubscriberCurso this.storePrefetches.add(this.nonPersistent); } + @Override public synchronized void start() throws Exception { if (!isStarted()) { super.start(); @@ -82,6 +83,7 @@ public class StoreDurableSubscriberCurso } } + @Override public synchronized void stop() throws Exception { if (isStarted()) { super.stop(); @@ -98,6 +100,7 @@ public class StoreDurableSubscriberCurso * @param destination * @throws Exception */ + @Override public synchronized void add(ConnectionContext context, Destination destination) throws Exception { if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName); @@ -122,6 +125,7 @@ public class StoreDurableSubscriberCurso * @param destination * @throws Exception */ + @Override public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { PendingMessageCursor tsp = topics.remove(destination); if (tsp != null) { @@ -133,6 +137,7 @@ public class StoreDurableSubscriberCurso /** * @return true if there are no pending messages */ + @Override public synchronized boolean isEmpty() { for (PendingMessageCursor tsp : storePrefetches) { if( !tsp.isEmpty() ) @@ -141,6 +146,7 @@ public class StoreDurableSubscriberCurso return true; } + @Override public synchronized boolean isEmpty(Destination destination) { boolean result = true; TopicStorePrefetch tsp = topics.get(destination); @@ -157,10 +163,12 @@ public class StoreDurableSubscriberCurso * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor * @return true if recovery required */ + @Override public boolean isRecoveryRequired() { return false; } + @Override public synchronized void addMessageLast(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); @@ -179,16 +187,19 @@ public class StoreDurableSubscriberCurso } } + @Override public synchronized void addRecoveredMessage(MessageReference node) throws Exception { nonPersistent.addMessageLast(node); } + @Override public synchronized void clear() { for (PendingMessageCursor tsp : storePrefetches) { tsp.clear(); } } + @Override public synchronized boolean hasNext() { boolean result = true; if (result) { @@ -203,35 +214,41 @@ public class StoreDurableSubscriberCurso return result; } + @Override public synchronized MessageReference next() { MessageReference result = currentCursor != null ? currentCursor.next() : null; return result; } + @Override public synchronized void remove() { if (currentCursor != null) { currentCursor.remove(); } } + @Override public synchronized void remove(MessageReference node) { if (currentCursor != null) { currentCursor.remove(node); } } + @Override public synchronized void reset() { for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.reset(); } } + @Override public synchronized void release() { for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.release(); } } + @Override public synchronized int size() { int pendingCount=0; for (PendingMessageCursor tsp : storePrefetches) { @@ -240,6 +257,7 @@ public class StoreDurableSubscriberCurso return pendingCount; } + @Override public void setMaxBatchSize(int maxBatchSize) { for (PendingMessageCursor storePrefetch : storePrefetches) { storePrefetch.setMaxBatchSize(maxBatchSize); @@ -247,12 +265,14 @@ public class StoreDurableSubscriberCurso super.setMaxBatchSize(maxBatchSize); } + @Override public synchronized void gc() { for (PendingMessageCursor tsp : storePrefetches) { tsp.gc(); } } + @Override public void setSystemUsage(SystemUsage usageManager) { super.setSystemUsage(usageManager); for (PendingMessageCursor tsp : storePrefetches) { @@ -260,6 +280,7 @@ public class StoreDurableSubscriberCurso } } + @Override public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); for (PendingMessageCursor cursor : storePrefetches) { @@ -267,6 +288,7 @@ public class StoreDurableSubscriberCurso } } + @Override public void setMaxProducersToAudit(int maxProducersToAudit) { super.setMaxProducersToAudit(maxProducersToAudit); for (PendingMessageCursor cursor : storePrefetches) { @@ -274,6 +296,7 @@ public class StoreDurableSubscriberCurso } } + @Override public void setMaxAuditDepth(int maxAuditDepth) { super.setMaxAuditDepth(maxAuditDepth); for (PendingMessageCursor cursor : storePrefetches) { @@ -281,6 +304,7 @@ public class StoreDurableSubscriberCurso } } + @Override public void setEnableAudit(boolean enableAudit) { super.setEnableAudit(enableAudit); for (PendingMessageCursor cursor : storePrefetches) { @@ -288,6 +312,7 @@ public class StoreDurableSubscriberCurso } } + @Override public void setUseCache(boolean useCache) { super.setUseCache(useCache); for (PendingMessageCursor cursor : storePrefetches) { @@ -313,6 +338,7 @@ public class StoreDurableSubscriberCurso return currentCursor; } + @Override public String toString() { return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Jun 25 10:28:17 2010 @@ -32,21 +32,21 @@ import org.apache.commons.logging.LogFac public class StoreQueueCursor extends AbstractPendingMessageCursor { private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class); - private Broker broker; + private final Broker broker; private int pendingCount; - private Queue queue; + private final Queue queue; private PendingMessageCursor nonPersistent; - private QueueStorePrefetch persistent; + private final QueueStorePrefetch persistent; private boolean started; private PendingMessageCursor currentCursor; /** * Construct - * + * @param broker * @param queue - * @param tmpStore */ public StoreQueueCursor(Broker broker,Queue queue) { + super((queue != null ? queue.isPrioritizedMessages():false)); this.broker=broker; this.queue = queue; this.persistent = new QueueStorePrefetch(queue); @@ -58,9 +58,9 @@ public class StoreQueueCursor extends Ab super.start(); if (nonPersistent == null) { if (broker.getBrokerService().isPersistent()) { - nonPersistent = new FilePendingMessageCursor(broker,queue.getName()); + nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages); }else { - nonPersistent = new VMPendingMessageCursor(); + nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); } nonPersistent.setMaxBatchSize(getMaxBatchSize()); nonPersistent.setSystemUsage(systemUsage); @@ -101,7 +101,7 @@ public class StoreQueueCursor extends Ab } } } - + public synchronized void addMessageFirst(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); @@ -240,6 +240,7 @@ public class StoreQueueCursor extends Ab } } + @Override public void setUseCache(boolean useCache) { super.setUseCache(useCache); if (persistent != null) { @@ -250,6 +251,7 @@ public class StoreQueueCursor extends Ab } } + @Override public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); if (persistent != null) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Fri Jun 25 10:28:17 2010 @@ -32,13 +32,20 @@ import org.apache.activemq.broker.region * @version $Revision$ */ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { - private final LinkedList list = new LinkedList(); + private final PendingList list; private Iterator iter; - public VMPendingMessageCursor() { + + public VMPendingMessageCursor(boolean prioritizedMessages) { + super(prioritizedMessages); this.useCache = false; + if (this.prioritizedMessages) { + this.list= new PrioritizedPendingList(); + }else { + this.list = new OrderedPendingList(); + } } - @Override + public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { List rc = new ArrayList(); @@ -56,7 +63,7 @@ public class VMPendingMessageCursor exte /** * @return true if there are no pending messages */ - @Override + public synchronized boolean isEmpty() { if (list.isEmpty()) { return true; @@ -79,9 +86,9 @@ public class VMPendingMessageCursor exte /** * reset the cursor */ - @Override + public synchronized void reset() { - iter = list.listIterator(); + iter = list.iterator(); last = null; } @@ -90,10 +97,10 @@ public class VMPendingMessageCursor exte * * @param node */ - @Override + public synchronized void addMessageLast(MessageReference node) { node.incrementReferenceCount(); - list.addLast(node); + list.addMessageLast(node); } /** @@ -102,16 +109,16 @@ public class VMPendingMessageCursor exte * @param position * @param node */ - @Override + public synchronized void addMessageFirst(MessageReference node) { node.incrementReferenceCount(); - list.addFirst(node); + list.addMessageFirst(node); } /** * @return true if there pending messages to dispatch */ - @Override + public synchronized boolean hasNext() { return iter.hasNext(); } @@ -119,7 +126,7 @@ public class VMPendingMessageCursor exte /** * @return the next pending message */ - @Override + public synchronized MessageReference next() { last = iter.next(); if (last != null) { @@ -131,7 +138,7 @@ public class VMPendingMessageCursor exte /** * remove the message at the cursor position */ - @Override + public synchronized void remove() { if (last != null) { last.decrementReferenceCount(); @@ -142,7 +149,7 @@ public class VMPendingMessageCursor exte /** * @return the number of pending messages */ - @Override + public synchronized int size() { return list.size(); } @@ -150,7 +157,7 @@ public class VMPendingMessageCursor exte /** * clear all pending messages */ - @Override + public synchronized void clear() { for (Iterator i = list.iterator(); i.hasNext();) { MessageReference ref = i.next(); @@ -159,16 +166,10 @@ public class VMPendingMessageCursor exte list.clear(); } - @Override + public synchronized void remove(MessageReference node) { - for (Iterator i = list.iterator(); i.hasNext();) { - MessageReference ref = i.next(); - if (node.getMessageId().equals(ref.getMessageId())) { - ref.decrementReferenceCount(); - i.remove(); - break; - } - } + list.remove(node); + node.decrementReferenceCount(); } /** @@ -177,10 +178,11 @@ public class VMPendingMessageCursor exte * @param maxItems * @return a list of paged in messages */ - @Override + public LinkedList pageInList(int maxItems) { LinkedList result = new LinkedList(); - for (MessageReference ref: list) { + for (Iteratori = list.iterator();i.hasNext();) { + MessageReference ref = i.next(); ref.incrementReferenceCount(); result.add(ref); if (result.size() >= maxItems) { @@ -190,12 +192,12 @@ public class VMPendingMessageCursor exte return result; } - @Override + public boolean isTransient() { return true; } - @Override + public void destroy() throws Exception { super.destroy(); clear(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -42,6 +43,6 @@ public class FilePendingDurableSubscribe * @return the Pending Message cursor */ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { - return new FilePendingMessageCursor(broker,name); + return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub)); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -39,7 +39,7 @@ public class FilePendingQueueMessageStor * org.apache.activemq.kaha.Store) */ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { - return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName()); + return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName(),queue.isPrioritizedMessages()); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -31,14 +33,16 @@ import org.apache.activemq.broker.region public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy { /** - * @param broker + * @param broker * @param name * @param maxBatchSize * @return a Cursor * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, * org.apache.activemq.kaha.Store, int) */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) { - return new FilePendingMessageCursor(broker,"PendingCursor:" + name); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker, String name, int maxBatchSize, + Subscription subs) { + return new FilePendingMessageCursor(broker, "PendingCursor:" + name, AbstractPendingMessageCursor + .isPrioritizedMessageSubscriber(broker, subs)); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; /** @@ -35,5 +36,5 @@ public interface PendingSubscriberMessag * @param maxBatchSize * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize); + PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jun 25 10:28:17 2010 @@ -87,6 +87,7 @@ public class PolicyEntry extends Destina private int cursorMemoryHighWaterMark = 70; private int storeUsageHighWaterMark = 100; private SlowConsumerStrategy slowConsumerStrategy; + private boolean prioritizedMessages; public void configure(Broker broker,Queue queue) { @@ -155,6 +156,7 @@ public class PolicyEntry extends Destina scs.setScheduler(broker.getScheduler()); } destination.setSlowConsumerStrategy(scs); + destination.setPrioritizedMessages(isPrioritizedMessages()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -184,7 +186,7 @@ public class PolicyEntry extends Destina if (pendingSubscriberPolicy != null) { String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); - subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize)); + subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription)); } if (enableAudit) { subscription.setEnableAudit(enableAudit); @@ -739,5 +741,14 @@ public class PolicyEntry extends Destina public SlowConsumerStrategy getSlowConsumerStrategy() { return this.slowConsumerStrategy; } + + + public boolean isPrioritizedMessages() { + return this.prioritizedMessages; + } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + this.prioritizedMessages = prioritizedMessages; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -40,6 +41,6 @@ public class VMPendingDurableSubscriberM * @return the Pending Message cursor */ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) { - return new VMPendingMessageCursor(); + return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub)); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -37,6 +37,6 @@ public class VMPendingQueueMessageStorag * @return the cursor */ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) { - return new VMPendingMessageCursor(); + return new VMPendingMessageCursor(queue.isPrioritizedMessages()); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010 @@ -17,6 +17,8 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -38,7 +40,7 @@ public class VMPendingSubscriberMessageS * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, * org.apache.activemq.kaha.Store, int) */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) { - return new VMPendingMessageCursor(); + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs) { + return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subs)); } }