From commits-return-7214-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Oct 19 03:55:40 2007 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 31705 invoked from network); 19 Oct 2007 03:55:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Oct 2007 03:55:37 -0000 Received: (qmail 64454 invoked by uid 500); 19 Oct 2007 03:55:25 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 64420 invoked by uid 500); 19 Oct 2007 03:55:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 64411 invoked by uid 99); 19 Oct 2007 03:55:25 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Oct 2007 20:55:25 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Oct 2007 03:55:36 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 84BB01A983E; Thu, 18 Oct 2007 20:54:46 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r586251 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Date: Fri, 19 Oct 2007 03:54:46 -0000 To: commits@activemq.apache.org From: jlim@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071019035446.84BB01A983E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jlim Date: Thu Oct 18 20:54:45 2007 New Revision: 586251 URL: http://svn.apache.org/viewvc?rev=586251&view=rev Log: applied patch for AMQ-1440 and AMQ-1439 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=586251&r1=586250&r2=586251&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Oct 18 20:54:45 2007 @@ -301,6 +301,14 @@ reconnectTask.shutdown(); } + public int getMinAckCount() { + return minAckCount; + } + + public void setMinAckCount(int minAckCount) { + this.minAckCount = minAckCount; + } + public long getInitialReconnectDelay() { return initialReconnectDelay; } @@ -338,24 +346,14 @@ try { synchronized (reconnectMutex) { - // If it was a request and it was not being tracked by - // the state tracker, - // then hold it in the requestMap so that we can replay - // it later. - boolean fanout = isFanoutCommand(command); - if (stateTracker.track(command) == null && command.isResponseRequired()) { - int size = fanout ? minAckCount : 1; - requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); - } - // Wait for transport to be connected. - while (connectedCount != minAckCount && !disposed && connectionFailure == null) { + while (connectedCount < minAckCount && !disposed && connectionFailure == null) { LOG.debug("Waiting for at least " + minAckCount + " transports to be connected."); reconnectMutex.wait(1000); } // Still not fully connected. - if (connectedCount != minAckCount) { + if (connectedCount < minAckCount) { Exception error; @@ -374,6 +372,16 @@ throw IOExceptionSupport.create(error); } + // If it was a request and it was not being tracked by + // the state tracker, + // then hold it in the requestMap so that we can replay + // it later. + boolean fanout = isFanoutCommand(command); + if (stateTracker.track(command) == null && command.isResponseRequired()) { + int size = fanout ? minAckCount : 1; + requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); + } + // Send the message. if (fanout) { for (Iterator iter = transports.iterator(); iter.hasNext();) { @@ -543,4 +551,5 @@ public boolean isFaultTolerant() { return true; } + }