activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r515631 - in /activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq: broker/TransportConnection.java broker/region/PrefetchSubscription.java broker/region/TopicSubscription.java command/MessageDispatch.java
Date Wed, 07 Mar 2007 16:15:45 GMT
Author: chirino
Date: Wed Mar  7 08:15:43 2007
New Revision: 515631

URL: http://svn.apache.org/viewvc?view=rev&rev=515631
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1189

Modified:
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java	(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java	Wed Mar  7 08:15:43 2007
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -789,7 +790,7 @@
         } else {
             if(message.isMessageDispatch()) {
                 MessageDispatch md=(MessageDispatch) message;
-                Runnable sub=(Runnable) md.getConsumer();
+                Runnable sub=md.getTransmitCallback();
                 broker.processDispatch(md);
                 if(sub!=null){
                     sub.run();
@@ -807,10 +808,10 @@
 
             if(command.isMessageDispatch()){
                 MessageDispatch md=(MessageDispatch) command;
+                Runnable sub=md.getTransmitCallback();
                 broker.processDispatch(md);
                 Object consumer = md.getConsumer();
-                if (consumer instanceof Runnable) {
-                    Runnable sub=(Runnable) consumer;
+                if(sub!=null){
                     sub.run();
                 }
             }
@@ -947,7 +948,7 @@
                     Command command = (Command) iter.next();
                     if(command.isMessageDispatch()) {
                         MessageDispatch md=(MessageDispatch) command;
-                        Runnable sub=(Runnable) md.getConsumer();
+                        Runnable sub=md.getTransmitCallback();
                         broker.processDispatch(md);
                         if(sub!=null){
                             sub.run();

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	Wed Mar  7 08:15:43 2007
@@ -392,7 +392,7 @@
             }
             
             if(info.isDispatchAsync()){
-                md.setConsumer(new Runnable(){
+                md.setTransmitCallback(new Runnable(){
                     public void run(){
                         // Since the message gets queued up in async dispatch, we don't want to
                         // decrease the reference count until it gets put on the wire.

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java	Wed Mar  7 08:15:43 2007
@@ -349,7 +349,7 @@
         }
                 
         if(info.isDispatchAsync()){
-            md.setConsumer(new Runnable(){
+            md.setTransmitCallback(new Runnable(){
                 public void run(){
                     node.getRegionDestination().getDestinationStatistics().getDispatched().increment();

                     node.decrementReferenceCount();

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java?view=diff&rev=515631&r1=515630&r2=515631
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java	(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatch.java	Wed Mar  7 08:15:43 2007
@@ -36,6 +36,7 @@
 
     transient protected long deliverySequenceId;
     transient protected Object consumer;
+    transient protected Runnable transmitCallback;
     
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -103,5 +104,13 @@
     public Response visit(CommandVisitor visitor) throws Exception {
         return null;
     }
+
+	public Runnable getTransmitCallback() {
+		return transmitCallback;
+	}
+
+	public void setTransmitCallback(Runnable transmitCallback) {
+		this.transmitCallback = transmitCallback;
+	}
     
 }



Mime
View raw message