Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 37391 invoked from network); 9 Mar 2007 16:05:22 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 9 Mar 2007 16:05:22 -0000 Received: (qmail 39008 invoked by uid 500); 9 Mar 2007 16:05:03 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 38858 invoked by uid 500); 9 Mar 2007 16:05:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 38605 invoked by uid 99); 9 Mar 2007 16:05:00 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Mar 2007 08:05:00 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Mar 2007 07:59:46 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 954C01A983A; Fri, 9 Mar 2007 07:59:16 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r516444 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ virtual/ Date: Fri, 09 Mar 2007 15:59:16 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070309155916.954C01A983A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Mar 9 07:59:14 2007 New Revision: 516444 URL: http://svn.apache.org/viewvc?view=rev&rev=516444 Log: Refactor so that the ProducerBrokerExchange is passed all the way down to the Topic and Queue implementations. This is laying the ground work to implement window based producer flow control. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Mar 9 07:59:14 2007 @@ -303,7 +303,7 @@ producerExchange.setRegionDestination(regionDestination); } - producerExchange.getRegionDestination().send(context, messageSend); + producerExchange.getRegionDestination().send(producerExchange, messageSend); } public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws Exception{ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Mar 9 07:59:14 2007 @@ -21,6 +21,7 @@ import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -36,7 +37,7 @@ void addSubscription(ConnectionContext context, Subscription sub) throws Exception; void removeSubscription(ConnectionContext context, Subscription sub) throws Exception; - void send(ConnectionContext context, Message messageSend) throws Exception; + void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception; boolean lock(MessageReference node, LockOwner lockOwner); void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Mar 9 07:59:14 2007 @@ -19,6 +19,7 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -89,7 +90,7 @@ next.removeSubscription(context, sub); } - public void send(ConnectionContext context, Message messageSend) throws Exception { + public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { next.send(context, messageSend); } @@ -104,8 +105,8 @@ /** * Sends a message to the given destination which may be a wildcard */ - protected void send(ConnectionContext context, Message message, ActiveMQDestination destination) throws Exception { - Broker broker = context.getBroker(); + protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception { + Broker broker = context.getConnectionContext().getBroker(); Set destinations = broker.getDestinations(destination); for (Iterator iter = destinations.iterator(); iter.hasNext();) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Mar 9 07:59:14 2007 @@ -23,9 +23,12 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; + import javax.jms.InvalidSelectorException; import javax.jms.JMSException; + import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; @@ -316,7 +319,8 @@ } - public void send(final ConnectionContext context,final Message message) throws Exception{ + public void send(final ProducerBrokerExchange producerExchange,final Message message) throws Exception { + final ConnectionContext context = producerExchange.getConnectionContext(); // There is delay between the client sending it and it arriving at the // destination.. it may have expired. if(message.isExpired()){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Mar 9 07:59:14 2007 @@ -236,8 +236,9 @@ - public void send(final ConnectionContext context, final Message message) throws Exception { - + public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { + final ConnectionContext context = producerExchange.getConnectionContext(); + // There is delay between the client sending it and it arriving at the // destination.. it may have expired. if( message.isExpired() ) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java Fri Mar 9 07:59:14 2007 @@ -17,16 +17,16 @@ */ package org.apache.activemq.broker.region.virtual; -import org.apache.activemq.broker.ConnectionContext; +import java.util.Collection; +import java.util.Iterator; + +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.MessageEvaluationContext; -import java.util.Collection; -import java.util.Iterator; - /** * Represents a composite {@link Destination} where send()s are replicated to * each Destination instance. @@ -46,7 +46,7 @@ this.copyMessage = copyMessage; } - public void send(ConnectionContext context, Message message) throws Exception { + public void send(ProducerBrokerExchange context, Message message) throws Exception { MessageEvaluationContext messageContext = null; for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java Fri Mar 9 07:59:14 2007 @@ -16,18 +16,18 @@ */ package org.apache.activemq.broker.region.virtual; -import org.apache.activemq.broker.ConnectionContext; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationMap; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - /** * Implements Virtual @@ -77,7 +77,7 @@ protected Destination createCompositeDestination(Destination destination, final List destinations) { return new DestinationFilter(destination) { - public void send(ConnectionContext context, Message messageSend) throws Exception { + public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { for (Iterator iter = destinations.iterator(); iter.hasNext();) { Destination destination = (Destination) iter.next(); destination.send(context, messageSend); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Fri Mar 9 07:59:14 2007 @@ -17,7 +17,7 @@ */ package org.apache.activemq.broker.region.virtual; -import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.command.ActiveMQDestination; @@ -41,7 +41,7 @@ this.postfix = postfix; } - public void send(ConnectionContext context, Message message) throws Exception { + public void send(ProducerBrokerExchange context, Message message) throws Exception { ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination()); send(context, message, queueConsumers); }