activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r386132 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: DemandForwardingBridgeSupport.java ForwardingBridge.java
Date Wed, 15 Mar 2006 19:04:34 GMT
Author: chirino
Date: Wed Mar 15 11:04:32 2006
New Revision: 386132

URL: http://svn.apache.org/viewcvs?rev=386132&view=rev
Log:
disable the use of range acks with network connectors since that could cause the broker to
block waiting for messages to be consumed (in the case of big messages being sent).

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=386132&r1=386131&r2=386132&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Mar 15 11:04:32 2006
@@ -375,11 +375,18 @@
                                 serviceLocalException(er.getException());
                             }
                         }
-                        int dispatched = sub.incrementDispatched();
-                        if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
-                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
-                            sub.setDispatched(0);
-                        }
+                        
+                      // Ack on every message since we don't know if the broker is blocked due to memory
+                      // usage and is waiting for an Ack to un-block him. 
+                      localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+                      // Acking a range is more efficient, but also more prone to locking up a server
+                      // Perhaps doing something like the following should be policy based.
+//                        int dispatched = sub.incrementDispatched();
+//                        if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
+//                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
+//                            sub.setDispatched(0);
+//                        }
                     }
                 }else if(command.isBrokerInfo()){
                     serviceLocalBrokerInfo(command);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=386132&r1=386131&r2=386132&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Mar 15 11:04:32 2006
@@ -218,19 +218,25 @@
 
                 remoteBroker.oneway( message );
                 
-                if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
-                    queueDispatched++;
-                    if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
-                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
-                        queueDispatched=0;
-                    }
-                } else {
-                    topicDispatched++;
-                    if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
-                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
-                        topicDispatched=0;
-                    }
-                }
+                // Ack on every message since we don't know if the broker is blocked due to memory
+                // usage and is waiting for an Ack to un-block him. 
+                localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+                // Acking a range is more efficient, but also more prone to locking up a server
+                // Perhaps doing something like the following should be policy based.
+//                if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
+//                    queueDispatched++;
+//                    if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
+//                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
+//                        queueDispatched=0;
+//                    }
+//                } else {
+//                    topicDispatched++;
+//                    if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
+//                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
+//                        topicDispatched=0;
+//                    }
+//                }
             } else if(command.isBrokerInfo() ) {
                 synchronized( this ) {
                     localBrokerId = ((BrokerInfo)command).getBrokerId();



Mime
View raw message