activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r516444 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ virtual/
Date Fri, 09 Mar 2007 15:59:16 GMT
Author: chirino
Date: Fri Mar  9 07:59:14 2007
New Revision: 516444

URL: http://svn.apache.org/viewvc?view=rev&rev=516444
Log:
Refactor so that the ProducerBrokerExchange is passed all the way down to the Topic and Queue
implementations.
This lays the groundwork for implementing window-based producer flow control.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Fri Mar  9 07:59:14 2007
@@ -303,7 +303,7 @@
             producerExchange.setRegionDestination(regionDestination);
         }
         
-        producerExchange.getRegionDestination().send(context, messageSend);
+        producerExchange.getRegionDestination().send(producerExchange, messageSend);
     }
 
     public void acknowledge(ConsumerBrokerExchange consumerExchange,MessageAck ack) throws
Exception{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Fri Mar  9 07:59:14 2007
@@ -21,6 +21,7 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -36,7 +37,7 @@
     void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
     void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
     
-    void send(ConnectionContext context, Message messageSend) throws Exception;
+    void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
     boolean lock(MessageReference node, LockOwner lockOwner);
     void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final
MessageReference node) throws IOException;
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Fri Mar  9 07:59:14 2007
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -89,7 +90,7 @@
         next.removeSubscription(context, sub);
     }
 
-    public void send(ConnectionContext context, Message messageSend) throws Exception {
+    public void send(ProducerBrokerExchange context, Message messageSend) throws Exception
{
         next.send(context, messageSend);
     }
 
@@ -104,8 +105,8 @@
     /**
      * Sends a message to the given destination which may be a wildcard
      */
-    protected void send(ConnectionContext context, Message message, ActiveMQDestination destination)
throws Exception {
-        Broker broker = context.getBroker();
+    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination
destination) throws Exception {
+        Broker broker = context.getConnectionContext().getBroker();
         Set destinations = broker.getDestinations(destination);
 
         for (Iterator iter = destinations.iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar  9 07:59:14 2007
@@ -23,9 +23,12 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -316,7 +319,8 @@
 
     }
 
-    public void send(final ConnectionContext context,final Message message) throws Exception{
+    public void send(final ProducerBrokerExchange producerExchange,final Message message)
throws Exception {
+    	final ConnectionContext context = producerExchange.getConnectionContext(); 
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if(message.isExpired()){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Mar  9 07:59:14 2007
@@ -236,8 +236,9 @@
     
 
 
-    public void send(final ConnectionContext context, final Message message) throws Exception
{
-
+    public void send(final ProducerBrokerExchange producerExchange, final Message message)
throws Exception {
+    	final ConnectionContext context = producerExchange.getConnectionContext();
+    	
     	// There is delay between the client sending it and it arriving at the
     	// destination.. it may have expired.
     	if( message.isExpired() ) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java
Fri Mar  9 07:59:14 2007
@@ -17,16 +17,16 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
-import java.util.Collection;
-import java.util.Iterator;
-
 /**
  * Represents a composite {@link Destination} where send()s are replicated to
  * each Destination instance.
@@ -46,7 +46,7 @@
         this.copyMessage = copyMessage;
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange context, Message message) throws Exception {
         MessageEvaluationContext messageContext = null;
 
         for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
Fri Mar  9 07:59:14 2007
@@ -16,18 +16,18 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.ConnectionContext;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationMap;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 /**
  * Implements <a
  * href="http://activemq.apache.org/virtual-destinations.html">Virtual
@@ -77,7 +77,7 @@
 
     protected Destination createCompositeDestination(Destination destination, final List
destinations) {
         return new DestinationFilter(destination) {
-            public void send(ConnectionContext context, Message messageSend) throws Exception
{
+            public void send(ProducerBrokerExchange context, Message messageSend) throws
Exception {
                 for (Iterator iter = destinations.iterator(); iter.hasNext();) {
                     Destination destination = (Destination) iter.next();
                     destination.send(context, messageSend);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?view=diff&rev=516444&r1=516443&r2=516444
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Fri Mar  9 07:59:14 2007
@@ -17,7 +17,7 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
-import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -41,7 +41,7 @@
         this.postfix = postfix;
     }
 
-    public void send(ConnectionContext context, Message message) throws Exception {
+    public void send(ProducerBrokerExchange context, Message message) throws Exception {
         ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
         send(context, message, queueConsumers);
     }



Mime
View raw message