Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 77645 invoked from network); 20 Feb 2008 17:13:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Feb 2008 17:13:08 -0000 Received: (qmail 75737 invoked by uid 500); 20 Feb 2008 17:12:58 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 75687 invoked by uid 500); 20 Feb 2008 17:12:58 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 75676 invoked by uid 500); 20 Feb 2008 17:12:58 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 75673 invoked by uid 99); 20 Feb 2008 17:12:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Feb 2008 09:12:58 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Feb 2008 17:12:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5F1A61A9842; Wed, 20 Feb 2008 09:12:42 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r629539 - in /tomcat/trunk: java/org/apache/catalina/tribes/membership/ java/org/apache/catalina/tribes/transport/ java/org/apache/catalina/tribes/transport/nio/ test/org/apache/catalina/tribes/test/channel/ webapps/docs/config/ Date: Wed, 20 Feb 2008 17:12:29 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080220171242.5F1A61A9842@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: fhanik Date: Wed Feb 20 09:12:27 2008 New Revision: 629539 URL: http://svn.apache.org/viewvc?rev=629539&view=rev Log: Add buffer sizes to the UDP sockets Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java tomcat/trunk/webapps/docs/config/cluster-receiver.xml tomcat/trunk/webapps/docs/config/cluster-sender.xml Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Wed Feb 20 09:12:27 2008 @@ -421,12 +421,15 @@ if ( log.isDebugEnabled() ) log.debug("Invalid member mcast package.",ax); } catch ( Exception x ) { - if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); - else log.debug("Error receiving mcast package. Sleeping 500ms",x); - try { Thread.sleep(500); } catch ( Exception ignore ){} - if ( (++errorCounter)>=recoveryCounter ) { - errorCounter=0; - new RecoveryThread(McastServiceImpl.this); + if (x instanceof InterruptedException) interrupted(); + else { + if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); + else log.debug("Error receiving mcast package. Sleeping 500ms",x); + try { Thread.sleep(500); } catch ( Exception ignore ){} + if ( (++errorCounter)>=recoveryCounter ) { + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java Wed Feb 20 09:12:27 2008 @@ -38,6 +38,8 @@ private boolean connected = false; private int rxBufSize = 25188; private int txBufSize = 43800; + private int udpRxBufSize = 25188; + private int udpTxBufSize = 43800; private boolean directBuffer = false; private int keepAliveCount = -1; private int requestCount = 0; @@ -330,6 +332,26 @@ public void setUdpPort(int udpPort) { this.udpPort = udpPort; + } + + + public int getUdpRxBufSize() { + return udpRxBufSize; + } + + + public void setUdpRxBufSize(int udpRxBufSize) { + this.udpRxBufSize = udpRxBufSize; + } + + + public int getUdpTxBufSize() { + return udpTxBufSize; + } + + + public void setUdpTxBufSize(int udpTxBufSize) { + this.udpTxBufSize = udpTxBufSize; } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Wed Feb 20 09:12:27 2008 @@ -56,6 +56,9 @@ private int securePort = -1; private int rxBufSize = 43800; private int txBufSize = 25188; + private int udpRxBufSize = 43800; + private int udpTxBufSize = 25188; + private boolean listen = false; private RxTaskPool pool; private boolean direct = true; @@ -518,6 +521,22 @@ public void setUdpPort(int udpPort) { this.udpPort = udpPort; + } + + public int getUdpRxBufSize() { + return udpRxBufSize; + } + + public void setUdpRxBufSize(int udpRxBufSize) { + this.udpRxBufSize = udpRxBufSize; + } + + public int getUdpTxBufSize() { + return udpTxBufSize; + } + + public void setUdpTxBufSize(int udpTxBufSize) { + this.udpTxBufSize = udpTxBufSize; } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Wed Feb 20 09:12:27 2008 @@ -250,6 +250,11 @@ setListen(true); if (selector!=null && datagramChannel!=null) { ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size for a datagram packet + datagramChannel.socket().setSendBufferSize(getUdpTxBufSize()); + datagramChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); + datagramChannel.socket().setReuseAddress(getSoReuseAddress()); + datagramChannel.socket().setSoTimeout(getTimeout()); + datagramChannel.socket().setTrafficClass(getSoTrafficClass()); registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Wed Feb 20 09:12:27 2008 @@ -308,8 +308,12 @@ int total = 0; if (channel instanceof DatagramChannel) { DatagramChannel dchannel = (DatagramChannel)channel; - while ( total < command.length ) { - total += dchannel.send(buf, udpaddr); + //were using a shared channel, it's not thread safe + //TODO check optimization, one channel per thread + synchronized (dchannel) { + while ( total < command.length ) { + total += dchannel.send(buf, udpaddr); + } } } else { while ( total < command.length ) { Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Wed Feb 20 09:12:27 2008 @@ -149,8 +149,8 @@ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); } else if (dataChannel!=null) { - dataChannel.socket().setSendBufferSize(getTxBufSize()); - dataChannel.socket().setReceiveBufferSize(getRxBufSize()); + dataChannel.socket().setSendBufferSize(getUdpTxBufSize()); + dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); dataChannel.socket().setSoTimeout((int)getTimeout()); dataChannel.socket().setReuseAddress(getSoReuseAddress()); dataChannel.socket().setTrafficClass(getSoTrafficClass()); Modified: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java (original) +++ tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java Wed Feb 20 09:12:27 2008 @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Random; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelListener; @@ -33,6 +35,7 @@ import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; +import org.apache.catalina.tribes.io.XByteBuffer; /** */ @@ -81,11 +84,18 @@ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP); Thread.sleep(500); System.err.println("Finished Single package NO_ACK ["+listener1.count+"]"); - assertEquals("Checking success messages.",1,listener1.count); + assertEquals("Checking success messages.",1,listener1.count.get()); } public void testDataSendNO_ACK() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); + ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); + ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); + rb1.setUdpRxBufSize(1024*1024*10); + rb2.setUdpRxBufSize(1024*1024*10); + rb1.setUdpTxBufSize(1024*1024*10); + rb2.setUdpTxBufSize(1024*1024*10); System.err.println("Starting NO_ACK"); Thread[] threads = new Thread[threadCount]; for (int x=0; x=0 && nr0 && d.data.length>=4) { + //populate number + d.hasNr = true; + XByteBuffer.toBytes(number,d.data, 0); + } return d; } + + public int getNumber() { + if (!hasNr) return -1; + return XByteBuffer.toInt(this.data, 0); + } public static boolean verify(Data d) { boolean result = (d.length == d.data.length); - for ( int i=0; result && (i The sending buffer size on the receiving sockets. Value is in bytes, the default value is 25188 bytes. + + The receive buffer size on the datagram socket. + Default value is 25188 bytes. + + + The send buffer size on the datagram socket. + Default value is 43800 bytes. + Boolean value for the socket SO_KEEPALIVE option. Possible values are true or false. Modified: tomcat/trunk/webapps/docs/config/cluster-sender.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-sender.xml?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/config/cluster-sender.xml (original) +++ tomcat/trunk/webapps/docs/config/cluster-sender.xml Wed Feb 20 09:12:27 2008 @@ -90,6 +90,14 @@ The send buffer size on the socket. Default value is 43800 bytes. + + The receive buffer size on the datagram socket. + Default value is 25188 bytes. + + + The send buffer size on the datagram socket. + Default value is 43800 bytes. + Possible values are true or false. Set to true if you want the receiver to use direct bytebuffers when reading data --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org