Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 26280 invoked from network); 7 Jul 2006 17:26:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 7 Jul 2006 17:26:17 -0000 Received: (qmail 92304 invoked by uid 500); 7 Jul 2006 17:26:17 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 92260 invoked by uid 500); 7 Jul 2006 17:26:16 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 92250 invoked by uid 99); 7 Jul 2006 17:26:16 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Jul 2006 10:26:16 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Jul 2006 10:26:16 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 7505F1A981A; Fri, 7 Jul 2006 10:25:04 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r419930 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Date: Fri, 07 Jul 2006 17:25:04 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20060707172504.7505F1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: jstrachan Date: Fri Jul 7 10:25:03 2006 New Revision: 419930 URL: http://svn.apache.org/viewvc?rev=419930&view=rev Log: Added support for AMQ-798 to enable a new boolean header called JMSXGroupFirstForConsumer Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=419930&r1=419929&r2=419930&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Jul 7 10:25:03 2006 @@ -19,19 +19,25 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.group.MessageGroupMap; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.transaction.Synchronization; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; import java.io.IOException; import java.util.Iterator; public class QueueSubscription extends PrefetchSubscription implements LockOwner { + private static final Log log = LogFactory.getLog(QueueSubscription.class); + public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { super(broker,context, info); } @@ -80,7 +86,7 @@ // If we can own the first, then no-one else should own the rest. if( sequence == 1 ) { if( node.lock(this) ) { - messageGroupOwners.put(groupId, info.getConsumerId()); + assignGroupToMe(messageGroupOwners, n, groupId); return true; } else { return false; @@ -94,7 +100,7 @@ groupOwner = messageGroupOwners.get(groupId); if( groupOwner==null ) { if( node.lock(this) ) { - messageGroupOwners.put(groupId, info.getConsumerId()); + assignGroupToMe(messageGroupOwners, n, groupId); return true; } else { return false; @@ -116,6 +122,24 @@ return node.lock(this); } + } + + /** + * Assigns the message group to this subscription and set the flag on the message that it is the first message + * to be dispatched. + */ + protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { + messageGroupOwners.put(groupId, info.getConsumerId()); + Message message = n.getMessage(); + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + try { + activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true); + } + catch (JMSException e) { + log.warn("Failed to set boolean header: " + e, e); + } + } } public String toString() {