Message-ID: <20060227154311.93675.qmail@minotaur.apache.org>
Subject: svn commit: r381364 - in /tomcat/container/tc5.5.x/modules: groupcom/src/share/org/apache/catalina/tribes/ groupcom/src/share/org/apache/catalina/tribes/group/ groupcom/src/share/org/apache/catalina/tribes/io/ groupcom/src/share/org/apache/catalina/tri...
Date: Mon, 27 Feb 2006 15:43:05 -0000
To: tomcat-dev@jakarta.apache.org
From: fhanik@apache.org
X-Mailer: svnmailer-1.0.7
X-Virus-Checked: Checked by ClamAV on apache.org
X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N

Author: fhanik
Date: Mon Feb 27 07:43:00 2006
New Revision: 381364

URL: http://svn.apache.org/viewcvs?rev=381364&view=rev
Log:
Created a load test, also implemented a "synchronized/asynchronized" option, since we are using TCP.

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
    tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java?rev=381364&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ByteMessage.java Mon Feb 27 07:43:00 2006
@@ -0,0 +1,44 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes;
+
+import java.io.Serializable;
+
+/**
+ * A byte message is not serialized and deserialized by the channel
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+
+public class ByteMessage implements Serializable {
+    private byte[] message;
+
+    public ByteMessage() {
+
+    }
+    public ByteMessage(byte[] data) {
+        message = data;
+    }
+
+    public byte[] getMessage() {
+        return message;
+    }
+
+    public void setMessage(byte[] message) {
+        this.message = message;
+    }
+
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Mon Feb 27 07:43:00 2006
@@ -74,7 +74,7 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any.
      */
-    public void send(Member[] destination, Serializable msg, int options) throws ChannelException;
+    public void send(Member[] destination, Serializable msg) throws ChannelException;


     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelReceiver.java Mon Feb 27 07:43:00 2006
@@ -20,7 +20,6 @@
 /**
  * Cluster Receiver Interface
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version $Revision: 379904 $, $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $
  */
 public interface ChannelReceiver {
@@ -35,8 +34,11 @@
      */
     public void stop();

-
-    public boolean isSendAck();
+    /**
+     * returns true of the receiver is sending acks when it receives messages
+     * @return boolean
+     */
+    public boolean getSendAck();

     /**
      * set ack mode
@@ -44,9 +46,6 @@
      */
     public void setSendAck(boolean isSendAck);

-    public boolean isCompress() ;
-    public void setCompress(boolean compress);
-
     /**
      * get the listing ip interface
      * @return The host

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java Mon Feb 27 07:43:00 2006
@@ -40,10 +40,6 @@

     public void sendMessage(ChannelMessage message) throws java.io.IOException;

-    public boolean isWaitForAck();
+    public boolean getWaitForAck();
     public void setWaitForAck(boolean isWaitForAck);
-
-    public boolean isCompress() ;
-    public void setCompress(boolean compress);
-
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Mon Feb 27 07:43:00 2006
@@ -83,13 +83,10 @@
         try {
             //synchronize, big time FIXME
             membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
-            clusterReceiver.setSendAck(clusterSender.isWaitForAck());
-            clusterReceiver.setCompress(clusterSender.isCompress());
             //end FIXME
-
-            if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX);
             if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.start();
             if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.start();
+            if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX);
             if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX);
         }catch ( ChannelException cx ) {
             throw cx;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Mon Feb 27 07:43:00 2006
@@ -33,6 +33,7 @@
 import org.apache.catalina.tribes.ManagedChannel;
 import java.util.Iterator;
 import java.util.UUID;
+import org.apache.catalina.tribes.ByteMessage;

 /**
  * The GroupChannel manages the replication channel. It coordinates
@@ -43,6 +44,8 @@
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
  */
 public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
+    public static final int BYTE_MESSAGE = 0x0001;
+
     private ChannelCoordinator coordinator = new ChannelCoordinator();
     private ChannelInterceptor interceptors = null;
     private MembershipListener membershipListener;
@@ -86,15 +89,23 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any.
      */
-    public void send(Member[] destination, Serializable msg, int options) throws ChannelException {
+    public void send(Member[] destination, Serializable msg) throws ChannelException {
         if ( msg == null ) return;
         try {
+            int options = 0;
             ClusterData data = new ClusterData();
             data.setAddress(getLocalMember());
             data.setUniqueId(UUID.randomUUID().toString());
             data.setTimestamp(System.currentTimeMillis());
+            byte[] b = null;
+            if ( msg instanceof ByteMessage ){
+                b = ((ByteMessage)msg).getMessage();
+                options = options | BYTE_MESSAGE;
+            } else {
+                b = XByteBuffer.serialize(msg);
+            }
             data.setOptions(options);
-            byte[] b = XByteBuffer.serialize(msg);
+
             data.setMessage(b);
             getFirstInterceptor().sendMessage(destination, data, null);
         }catch ( Exception x ) {
@@ -105,7 +116,13 @@
     public void messageReceived(ChannelMessage msg) {
         if ( msg == null ) return;
         try {
-            Serializable fwd = XByteBuffer.deserialize(msg.getMessage());
+
+            Serializable fwd = null;
+            if ( (msg.getOptions() & BYTE_MESSAGE) == BYTE_MESSAGE ) {
+                fwd = new ByteMessage(msg.getMessage());
+            } else {
+                fwd = XByteBuffer.deserialize(msg.getMessage());
+            }
             if ( channelListener != null ) channelListener.messageReceived(fwd,msg.getAddress());
         }catch ( Exception x ) {
             log.error("Unable to deserialize channel message.",x);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ListenCallback.java Mon Feb 27 07:43:00 2006
@@ -25,7 +25,6 @@
  * when data has been received. The interface does not care about
  * objects and marshalling and just passes the bytes straight through.
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version $Revision: 303987 $, $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 2005) $
  */
 public interface ListenCallback
@@ -37,13 +36,4 @@
      */
      public void messageDataReceived(ChannelMessage data);

-     /** receiver must be send ack
-      */
-     public boolean isSendAck() ;
-
-     /** send ack
-      *
-      */
-     public void sendAck() throws java.io.IOException ;
-
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Mon Feb 27 07:43:00 2006
@@ -110,6 +110,14 @@
     }

     /**
+     * Returns the number of packages that the reader has read
+     * @return int
+     */
+    public int count() {
+        return buffer.countPackages();
+    }
+
+    /**
      * Write Ack to sender
      * @param buf
      * @return The bytes written count

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Mon Feb 27 07:43:00 2006
@@ -39,16 +39,12 @@
  * Transfer package:
  * <ul>
  * <li><b>START_DATA</b> - 7 bytes - FLT2002</li>
- * <li><b>COMPRESS</b> - 4 bytes - is message compressed flag</li>
+ * <li><b>OPTIONS</b> - 4 bytes - message options, implementation specific</li>
  * <li><b>SIZE</b> - 4 bytes - size of the data package</li>
  * <li><b>DATA</b> - should be as many bytes as the prev SIZE</li>
  * <li><b>END_DATA</b> - 7 bytes - TLF2003</li>
  * </ul>
- * FIXME: Why we not use a list of byte buffers?
- * FIXME: Used a pool of buffers instead, every time new generation
- *
  * @author Filip Hanik
- * @author Peter Rossbach
  * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
  */
 public class XByteBuffer

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Mon Feb 27 07:43:00 2006
@@ -256,7 +256,7 @@
      * String representation of this object
      */
     public String toString() {
-        return "org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+host+","+port+", alive="+memberAliveTime+"]";
+        return "org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+getHostname()+","+port+", alive="+memberAliveTime+"]";
     }

     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Mon Feb 27 07:43:00 2006
@@ -738,8 +738,8 @@
     protected void addStats(int length) {
         nrOfRequests++;
         totalBytes += length;
-        if (log.isInfoEnabled() && (nrOfRequests % 1000) == 0) {
-            log.info(sm.getString("IDataSender.stats", new Object[] {
+        if (log.isDebugEnabled() && (nrOfRequests % 1000) == 0) {
+            log.debug(sm.getString("IDataSender.stats", new Object[] {
                     getAddress().getHostAddress(), new Integer(getPort()),
                     new Long(totalBytes), new Long(nrOfRequests),
                     new Long(totalBytes / nrOfRequests),

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java Mon Feb 27 07:43:00 2006
@@ -29,12 +29,9 @@
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.MessageListener;
-
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.util.StringManager;
-import java.io.Serializable;

 /**
  * @author Filip Hanik
@@ -74,6 +71,7 @@
     private Object interestOpsMutex = new Object();

     private MessageListener listener = null;
+    private boolean sync;
     public ReplicationListener() {
     }

@@ -119,7 +117,7 @@
      */
     public void start() {
         try {
-            pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
+            pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex, getWorkerThreadOptions());
         } catch (Exception e) {
             log.error("ThreadPool can initilzed. Listener not started", e);
             return;
@@ -290,7 +288,7 @@
                 log.debug("No TcpReplicationThread available");
         } else {
             // invoking this wakes up the worker thread then returns
-            worker.serviceChannel(key, isSendAck());
+            worker.serviceChannel(key);
         }
     }

@@ -348,7 +346,7 @@
      *
      * @return True if sending ACK
      */
-    public boolean isSendAck() {
+    public boolean getSendAck() {
         return sendAck;
     }

@@ -373,12 +371,32 @@
         return tcpListenPort;
     }

+    public boolean isSync() {
+        return sync;
+    }
+
     public MessageListener getMessageListener() {
         return listener;
     }

     public void setTcpListenPort(int tcpListenPort) {
         this.tcpListenPort = tcpListenPort;
+    }
+
+
+    public void setSynchronized(boolean sync) {
+        this.sync = sync;
+    }
+
+    public boolean getSynchronized() {
+        return this.sync;
+    }
+
+    public int getWorkerThreadOptions() {
+        int options = 0;
+        if ( getSynchronized() ) options = options |TcpReplicationThread.OPTION_SYNCHRONIZED;
+        if ( getSendAck() ) options = options |TcpReplicationThread.OPTION_SEND_ACK;
+        return options;
     }

     public void setMessageListener(MessageListener listener) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx Mon Feb 27 07:43:00 2006
@@ -1,18 +1,20 @@
 [PropertyInfo]
 bind,java.net.InetAddress,false,false, , ,true,
 compress,boolean,false,false, , ,true,
-coordinator,org.apache.catalina.cluster.MessageListener,false,false, , ,true,
-coordinator,org.apache.catalina.cluster.group.ChannelInterceptorBase,false,false,coordinator,coordinator,true,
 doListen,boolean,false,false, , ,false,
 host,String,false,false, , ,true,
 info,String,false,false, , ,true,
 interestOpsMutex,Object,false,false, , ,true,
+listener,org.apache.catalina.tribes.MessageListener,false,false, , ,false,
 log,org.apache.commons.logging.Log,false,false, , ,false,
-pool,org.apache.catalina.cluster.tcp.ThreadPool,false,false, , ,false,
+messageListener,org.apache.catalina.tribes.MessageListener,false,false, , ,true,
+pool,org.apache.catalina.tribes.tcp.ThreadPool,false,false, , ,false,
 port,int,false,false, , ,true,
 selector,java.nio.channels.Selector,false,false, , ,false,
 sendAck,boolean,false,false, , ,true,
 sm,org.apache.catalina.util.StringManager,false,false, , ,false,
+sync,boolean,false,false,sync,sync,true,
+synchronized,boolean,false,false, , ,true,
 tcpListenAddress,String,false,false, , ,true,
 tcpListenPort,int,false,false, , ,true,
 tcpSelectorTimeout,long,false,false, , ,true,

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Mon Feb 27 07:43:00 2006
@@ -16,38 +16,26 @@

 package org.apache.catalina.tribes.tcp;

-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.zip.GZIPOutputStream;
-
-import javax.management.MBeanServer;
 import javax.management.ObjectName;

-import org.apache.catalina.Container;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelSender;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.util.IDynamicProperty;
-import org.apache.catalina.core.StandardHost;
 import org.apache.catalina.util.StringManager;
 import org.apache.tomcat.util.IntrospectionUtils;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.io.*;

 /**
  * Transmit message to other cluster members
  * Actual senders are created based on the replicationMode
  * type
- * FIXME i18n log messages
- * FIXME compress data depends on message type and size
- * TODO pause and resume senders
  *
- * @author Peter Rossbach
  * @author Filip Hanik
+ * @author Peter Rossbach
  * @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb 2006) $
  */
 public class ReplicationTransmitter implements ChannelSender,IDynamicProperty {
@@ -112,14 +100,7 @@
     /**
      * autoConnect sender when next message send
      */
-    private boolean autoConnect = false;
-
-    /**
-     * Compress message data bytes
-     */
-    private boolean compress = false;
-
-    /**
+    private boolean autoConnect = false;    /**
      * doTransmitterProcessingStats
      */
     protected boolean doTransmitterProcessingStats = false;
@@ -263,22 +244,6 @@
     public ObjectName getObjectName() {
         return objectName;
     }
-
-    /**
-     * @return Returns the compress.
-     */
-    public boolean isCompress() {
-        return compress;
-    }
-
-    /**
-     * @param compressMessageData
-     *            The compress to set.
-     */
-    public void setCompress(boolean compressMessageData) {
-        this.compress = compressMessageData;
-    }
-
     /**
      * @return Returns the autoConnect.
      */
@@ -314,7 +279,7 @@

     /**
      * @return Returns the waitForAck.
      */
-    public boolean isWaitForAck() {
+    public boolean getWaitForAck() {
         return waitForAck;
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java Mon Feb 27 07:43:00 2006
@@ -37,14 +37,14 @@
  * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
  */
 public class TcpReplicationThread extends WorkerThread {
+    public static final int OPTION_SEND_ACK = 0x0001;
+    public static final int OPTION_SYNCHRONIZED = 0x0002;
+
     public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
     private static org.apache.commons.logging.Log log =
         org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
     private ByteBuffer buffer = ByteBuffer.allocate (1024);
     private SelectionKey key;
-    private boolean sendAck=true;
-
-
     TcpReplicationThread ()
     {
     }
@@ -104,10 +104,9 @@
      * to ignore read-readiness for this channel while the
      * worker thread is servicing it.
      */
-    synchronized void serviceChannel (SelectionKey key, boolean sendAck)
+    synchronized void serviceChannel (SelectionKey key)
     {
         this.key = key;
-        this.sendAck=sendAck;
         key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
         key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
         this.notify(); // awaken the thread
@@ -135,19 +134,42 @@
             reader.append(buffer.array(),0,count);
             buffer.clear(); // make buffer empty
         }
+
+        int pkgcnt = reader.count();
+
+
+
+        /**
+         * Use send ack here if you want to ack the request to the remote
+         * server before completing the request
+         * This is considered an asynchronized request
+         */
+        if (sendAckAsync()) {
+            while ( pkgcnt > 0 ) {
+                sendAck(key,channel);
+                pkgcnt--;
+            }
+        }
+
         //check to see if any data is available
-        int pkgcnt = reader.execute();
+        pkgcnt = reader.execute();
+
         if (log.isTraceEnabled()) {
             log.trace("sending " + pkgcnt + " ack packages to " + channel.socket().getLocalPort() );
         }
-
-        if (sendAck) {
+        /**
+         * Use send ack here if you want the request to complete on this
+         * server before sending the ack to the remote server
+         * This is considered a synchronized request
+         */
+        if (sendAckSync()) {
             while ( pkgcnt > 0 ) {
                 sendAck(key,channel);
                 pkgcnt--;
             }
-        }
+        }
+

         if (count < 0) {
             // close channel on EOF, invalidates the key
@@ -166,6 +188,20 @@
         }

     }
+
+
+    public boolean sendAckSync() {
+        int options = getOptions();
+        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+            ((OPTION_SYNCHRONIZED & options) == OPTION_SYNCHRONIZED);
+    }
+
+    public boolean sendAckAsync() {
+        int options = getOptions();
+        return ((OPTION_SEND_ACK & options) == OPTION_SEND_ACK) &&
+            ((OPTION_SYNCHRONIZED & options) != OPTION_SYNCHRONIZED);
+    }
+

     /**
      * send a reply-acknowledgement (6,2,3)

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java Mon Feb 27 07:43:00 2006
@@ -35,7 +35,7 @@
     Object mutex = new Object();
     Object interestOpsMutex = null;

-    ThreadPool (int poolSize, Class threadClass, Object interestOpsMutex) throws Exception {
+    ThreadPool (int poolSize, Class threadClass, Object interestOpsMutex, int threadOptions) throws Exception {
         // fill up the pool with worker threads
         this.interestOpsMutex = interestOpsMutex;
         for (int i = 0; i < poolSize; i++) {
@@ -46,6 +46,7 @@
             thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
             thread.setDaemon(true);
             thread.setPriority(Thread.MAX_PRIORITY);
+            thread.setOptions(threadOptions);
             thread.start();

             idle.add (thread);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/WorkerThread.java Mon Feb 27 07:43:00 2006
@@ -25,14 +25,22 @@
 {
     protected ThreadPool pool;
     protected boolean doRun = true;
-
+    private int options;

     public void setPool(ThreadPool pool) {
         this.pool = pool;
     }
-
+
+    public void setOptions(int options) {
+        this.options = options;
+    }
+
     public ThreadPool getPool() {
         return pool;
+    }
+
+    public int getOptions() {
+        return options;
     }

     public void close()

Modified: tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml?rev=381364&r1=381363&r2=381364&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml (original)
+++ tomcat/container/tc5.5.x/modules/ha/etc/cluster-server.xml Mon Feb 27 07:43:00 2006
@@ -298,14 +298,15 @@
                 tcpListenAddress="auto"
                 tcpListenPort="4002"
                 tcpSelectorTimeout="100"
-                tcpThreadCount="6"/>
+                tcpThreadCount="6"
+                sendAck="true"
+                synchronized="true"/>
+                ackTimeout="15000"
+                waitForAck="true"/>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org
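
The interplay of the sendAck and synchronized settings in this commit can be summarized in a small standalone sketch. It relies only on the flag values and bit tests shown in the TcpReplicationThread and ReplicationListener hunks above; the class name AckOptionsSketch and the helper workerThreadOptions are hypothetical and not part of the committed code.

```java
// Sketch only: mirrors the option-flag logic from this commit in isolation.
// AckOptionsSketch and workerThreadOptions are hypothetical names.
final class AckOptionsSketch {
    // Same values as TcpReplicationThread.OPTION_SEND_ACK / OPTION_SYNCHRONIZED in the diff.
    static final int OPTION_SEND_ACK = 0x0001;
    static final int OPTION_SYNCHRONIZED = 0x0002;

    // Counterpart of ReplicationListener.getWorkerThreadOptions():
    // folds the two boolean settings into one int bitmask.
    static int workerThreadOptions(boolean sendAck, boolean sync) {
        int options = 0;
        if (sync) options |= OPTION_SYNCHRONIZED;
        if (sendAck) options |= OPTION_SEND_ACK;
        return options;
    }

    // True when the ack should go out only after the message was processed locally.
    static boolean sendAckSync(int options) {
        return (options & OPTION_SEND_ACK) == OPTION_SEND_ACK
            && (options & OPTION_SYNCHRONIZED) == OPTION_SYNCHRONIZED;
    }

    // True when the ack should go out as soon as the bytes were read, before processing.
    static boolean sendAckAsync(int options) {
        return (options & OPTION_SEND_ACK) == OPTION_SEND_ACK
            && (options & OPTION_SYNCHRONIZED) != OPTION_SYNCHRONIZED;
    }

    public static void main(String[] args) {
        // Corresponds to sendAck="true" synchronized="true" in cluster-server.xml.
        int options = workerThreadOptions(true, true);
        System.out.println("options=" + options
            + " ackAfterProcessing=" + sendAckSync(options)
            + " ackBeforeProcessing=" + sendAckAsync(options));
    }
}
```

With sendAck disabled neither predicate is true, so no ack is written in either position, regardless of the synchronized flag.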