Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 78071 invoked from network); 15 Mar 2006 19:04:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 15 Mar 2006 19:04:57 -0000 Received: (qmail 82120 invoked by uid 500); 15 Mar 2006 19:04:56 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 82075 invoked by uid 500); 15 Mar 2006 19:04:56 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 82066 invoked by uid 99); 15 Mar 2006 19:04:56 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Mar 2006 11:04:56 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 15 Mar 2006 11:04:55 -0800 Received: (qmail 77699 invoked by uid 65534); 15 Mar 2006 19:04:34 -0000 Message-ID: <20060315190434.77694.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r386132 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network: DemandForwardingBridgeSupport.java ForwardingBridge.java Date: Wed, 15 Mar 2006 19:04:34 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.7 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Wed Mar 15 11:04:32 2006 New Revision: 386132 URL: http://svn.apache.org/viewcvs?rev=386132&view=rev Log: disable the use of range acks with network connectors since that could cause the broker to block waiting for messages to be consumed (in the case of big messages being sent). Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=386132&r1=386131&r2=386132&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Mar 15 11:04:32 2006 @@ -375,11 +375,18 @@ serviceLocalException(er.getException()); } } - int dispatched = sub.incrementDispatched(); - if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){ - localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched)); - sub.setDispatched(0); - } + + // Ack on every message since we don't know if the broker is blocked due to memory + // usage and is waiting for an Ack to un-block him. + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); + + // Acking a range is more efficient, but also more prone to locking up a server + // Perhaps doing something like the following should be policy based. +// int dispatched = sub.incrementDispatched(); +// if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){ +// localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched)); +// sub.setDispatched(0); +// } } }else if(command.isBrokerInfo()){ serviceLocalBrokerInfo(command); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=386132&r1=386131&r2=386132&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Mar 15 11:04:32 2006 @@ -218,19 +218,25 @@ remoteBroker.oneway( message ); - if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) { - queueDispatched++; - if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) { - localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched)); - queueDispatched=0; - } - } else { - topicDispatched++; - if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) { - localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched)); - topicDispatched=0; - } - } + // Ack on every message since we don't know if the broker is blocked due to memory + // usage and is waiting for an Ack to un-block him. + localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1)); + + // Acking a range is more efficient, but also more prone to locking up a server + // Perhaps doing something like the following should be policy based. +// if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) { +// queueDispatched++; +// if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) { +// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched)); +// queueDispatched=0; +// } +// } else { +// topicDispatched++; +// if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) { +// localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched)); +// topicDispatched=0; +// } +// } } else if(command.isBrokerInfo() ) { synchronized( this ) { localBrokerId = ((BrokerInfo)command).getBrokerId();