tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r380209 [3/12] - in /tomcat/container/tc5.5.x/modules: groupcom/ groupcom/etc/ groupcom/src/ groupcom/src/share/ groupcom/src/share/org/ groupcom/src/share/org/apache/ groupcom/src/share/org/apache/catalina/ groupcom/src/share/org/apache/ca...
Date Thu, 23 Feb 2006 19:55:25 GMT
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,282 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.mcast;
+
+
+import java.net.MulticastSocket;
+import java.io.IOException;
+import java.net.InetAddress ;
+import java.net.DatagramPacket;
+import org.apache.catalina.cluster.MembershipListener;
+
+/**
+ * A <b>membership</b> implementation using simple multicast.
+ * This is the representation of a multicast membership service.
+ * This class is responsible for maintaining a list of active cluster nodes in the cluster.
+ * If a node fails to send out a heartbeat, the node will be dismissed.
+ * This is the low level implementation that handles the multicasting sockets.
+ * Need to fix this, could use java.nio and only need one thread to send and receive, or
+ * just use a timeout on the receive
+ * @author Filip Hanik
+ * @version $Revision: 356540 $, $Date: 2005-12-13 10:53:40 -0600 (Tue, 13 Dec 2005) $
+ */
+public class McastServiceImpl
+{
+    private static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( McastService.class );
+    /**
+     * Internal flag used for the listen thread that listens to the multicasting socket.
+     */
+    protected boolean doRun = false;
+    /**
+     * Socket that we intend to listen to
+     */
+    protected MulticastSocket socket;
+    /**
+     * The local member that we intend to broad cast over and over again
+     */
+    protected McastMember member;
+    /**
+     * The multicast address
+     */
+    protected InetAddress address;
+    /**
+     * The multicast port
+     */
+    protected int port;
+    /**
+     * The time it takes for a member to expire.
+     */
+    protected long timeToExpiration;
+    /**
+     * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration
+     */
+    protected long sendFrequency;
+    /**
+     * Reuse the sendPacket, no need to create a new one everytime
+     */
+    protected DatagramPacket sendPacket;
+    /**
+     * Reuse the receivePacket, no need to create a new one everytime
+     */
+    protected DatagramPacket receivePacket;
+    /**
+     * The membership, used so that we calculate memberships when they arrive or don't arrive
+     */
+    protected McastMembership membership;
+    /**
+     * The actual listener, for callback when shits goes down
+     */
+    protected MembershipListener service;
+    /**
+     * Thread to listen for pings
+     */
+    protected ReceiverThread receiver;
+    /**
+     * Thread to send pings
+     */
+    protected SenderThread sender;
+
+    /**
+     * When was the service started
+     */
+    protected long serviceStartTime = System.currentTimeMillis();
+    
+    protected int mcastTTL = -1;
+    protected int mcastSoTimeout = -1;
+    protected InetAddress mcastBindAddress = null;
+
+    /**
+     * Create a new mcast service impl
+     * @param member - the local member
+     * @param sendFrequency - the time (ms) in between pings sent out
+     * @param expireTime - the time (ms) for a member to expire
+     * @param port - the mcast port
+     * @param bind - the bind address (not sure this is used yet)
+     * @param mcastAddress - the mcast address
+     * @param service - the callback service
+     * @throws IOException
+     */
+    public McastServiceImpl(
+        McastMember member,
+        long sendFrequency,
+        long expireTime,
+        int port,
+        InetAddress bind,
+        InetAddress mcastAddress,
+        int ttl,
+        int soTimeout,
+        MembershipListener service)
+    throws IOException {
+        this.member = member;
+        address = mcastAddress;
+        this.port = port;
+        this.mcastSoTimeout = soTimeout;
+        this.mcastTTL = ttl;
+        this.mcastBindAddress = bind;
+        setupSocket();
+        sendPacket = new DatagramPacket(new byte[1000],1000);
+        sendPacket.setAddress(address);
+        sendPacket.setPort(port);
+        receivePacket = new DatagramPacket(new byte[1000],1000);
+        receivePacket.setAddress(address);
+        receivePacket.setPort(port);
+        membership = new McastMembership(member.getName());
+        timeToExpiration = expireTime;
+        this.service = service;
+        this.sendFrequency = sendFrequency;
+    }
+    
+    protected void setupSocket() throws IOException {
+        if (mcastBindAddress != null) socket = new MulticastSocket(new java.net.
+            InetSocketAddress(mcastBindAddress, port));
+        else socket = new MulticastSocket(port);
+        if (mcastBindAddress != null) {
+			if(log.isInfoEnabled())
+                log.info("Setting multihome multicast interface to:" +
+                         mcastBindAddress);
+            socket.setInterface(mcastBindAddress);
+        } //end if
+        if ( mcastSoTimeout >= 0 ) {
+ 			if(log.isInfoEnabled())
+                log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
+            socket.setSoTimeout(mcastSoTimeout);
+        }
+        if ( mcastTTL >= 0 ) {
+			if(log.isInfoEnabled())
+                log.info("Setting cluster mcast TTL to " + mcastTTL);
+            socket.setTimeToLive(mcastTTL);
+        }
+    }
+
+    /**
+     * Start the service
+     * @param level 1 starts the receiver, level 2 starts the sender
+     * @throws IOException if the service fails to start
+     * @throws IllegalStateException if the service is already started
+     */
+    public synchronized void start(int level) throws IOException {
+        if ( sender != null && receiver != null ) throw new IllegalStateException("Service already running.");
+        if ( level == 1 ) {
+            socket.joinGroup(address);
+            doRun = true;
+            receiver = new ReceiverThread();
+            receiver.setDaemon(true);
+            receiver.start();
+        }
+        if ( level==2 ) {
+            serviceStartTime = System.currentTimeMillis();
+            sender = new SenderThread(sendFrequency);
+            sender.setDaemon(true);
+            sender.start();
+            
+        }
+    }
+
+    /**
+     * Stops the service
+     * @throws IOException if the service fails to disconnect from the sockets
+     */
+    public synchronized void stop() throws IOException {
+        socket.leaveGroup(address);
+        doRun = false;
+        sender = null;
+        receiver = null;
+        serviceStartTime = Long.MAX_VALUE;
+    }
+
+    /**
+     * Receive a datagram packet, locking wait
+     * @throws IOException
+     */
+    public void receive() throws IOException {
+        socket.receive(receivePacket);
+        byte[] data = new byte[receivePacket.getLength()];
+        System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
+        McastMember m = McastMember.getMember(data);
+        if(log.isDebugEnabled())
+            log.debug("Mcast receive ping from member " + m);
+        if ( membership.memberAlive(m) ) {
+            if(log.isDebugEnabled())
+                log.debug("Mcast add member " + m);
+            service.memberAdded(m);
+        }
+        McastMember[] expired = membership.expire(timeToExpiration);
+        for ( int i=0; i<expired.length; i++) {
+            if(log.isDebugEnabled())
+                log.debug("Mcast exipre  member " + m);
+            service.memberDisappeared(expired[i]);
+        }
+    }
+
+    /**
+     * Send a ping
+     * @throws Exception
+     */
+    public void send() throws Exception{
+        member.inc();
+        if(log.isDebugEnabled())
+            log.debug("Mcast send ping from member " + member);
+        byte[] data = member.getData(this.serviceStartTime);
+        DatagramPacket p = new DatagramPacket(data,data.length);
+        p.setAddress(address);
+        p.setPort(port);
+        socket.send(p);
+    }
+
+    public long getServiceStartTime() {
+       return this.serviceStartTime;
+    }
+
+
+    public class ReceiverThread extends Thread {
+        public ReceiverThread() {
+            super();
+            setName("Cluster-MembershipReceiver");
+        }
+        public void run() {
+            while ( doRun ) {
+                try {
+                    receive();
+                } catch ( Exception x ) {
+                    log.warn("Error receiving mcast package. Sleeping 500ms",x);
+                    try { Thread.sleep(500); } catch ( Exception ignore ){}
+                    
+                }
+            }
+        }
+    }//class ReceiverThread
+
+    public class SenderThread extends Thread {
+        long time;
+        public SenderThread(long time) {
+            this.time = time;
+            setName("Cluster-MembershipSender");
+
+        }
+        public void run() {
+            while ( doRun ) {
+                try {
+                    send();
+                } catch ( Exception x ) {
+                    log.warn("Unable to send mcast message.",x);
+                }
+                try { Thread.sleep(time); } catch ( Exception ignore ) {}
+            }
+        }
+    }//class SenderThread
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml Thu Feb 23 11:55:14 2006
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mbeans-descriptors PUBLIC
+   "-//Apache Software Foundation//DTD Model MBeans Configuration File"
+   "http://jakarta.apache.org/commons/dtds/mbeans-descriptors.dtd">
+<mbeans-descriptors>
+
+  <mbean         name="McastService"
+           description="Cluster Membership service implementation"
+               domain="Catalina"
+                group="Cluster"
+                 type="org.apache.catalina.cluster.mcast.McastService">
+    <attribute   name="info"
+          description="Class version info"
+                 type="java.lang.String"
+                 writeable="false"/>
+    <attribute   name="mcastAddr"
+          description="Multicast IP Address"
+                 type="java.lang.String"/>
+    <attribute   name="mcastBindAddress"
+          description="Multicast IP Interface address (default auto)"
+                 type="java.lang.String"/>
+    <attribute   name="mcastPort"
+          description="Multicast UDP Port"
+                 type="int"/>
+    <attribute   name="mcastFrequency"
+          description="Ping Frequency at msec"
+                 type="long"/>
+    <attribute   name="mcastClusterDomain"
+          description="Cluster Domain of this member"
+                 type="java.lang.String"/>
+    <attribute   name="mcastDropTime"
+          description="Timeout from frequency ping after member disapper notify"
+                 type="long"/>
+    <attribute   name="mcastSoTimeout"
+          description="Multicast Socket Timeout"
+                 type="int"/>
+    <attribute   name="mcastTTL"
+          description=""
+                 type="int"/>
+    <attribute   name="localMemberName"
+          description="Complete local receiver information"
+                 type="java.lang.String"
+                 writeable="false"/>
+    <attribute   name="membersByName"
+          description="Complete remote sender information"
+                 type="[Ljava.lang.String;"
+                 writeable="false"/>
+
+    <operation   name="start"
+               description="Start the cluster membership"
+               impact="ACTION"
+               returnType="void">
+    </operation>
+    
+    <operation name="stop"
+               description="Stop the cluster membership"
+               impact="ACTION"
+               returnType="void">
+    </operation>
+                 
+  </mbean>
+
+</mbeans-descriptors>

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/package.html
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/package.html?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/package.html (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/package.html Thu Feb 23 11:55:14 2006
@@ -0,0 +1,11 @@
+<body>
+
+<p>This package contains code for Clustering, the base class
+of a Cluster is <code>org.apache.catalina.Cluster</code> implementations
+of this class is done when implementing a new Cluster protocol</p>
+
+<p>The only Cluster protocol currently implemented is a JavaGroups based<br>
+&nbsp;&nbsp;&nbsp;&nbsp;<b>JGCluster.java</b>
+</p>
+
+</body>

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,282 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.net.InetAddress;
+
+import org.apache.catalina.cluster.util.SmartQueue;
+
+/**
+ * Send cluster messages from a Message queue with only one socket. Ack and keep
+ * Alive Handling is supported.
+ * <ul>
+ * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
+ * sender and all messages are queued. Only use this for small maintaince
+ * isuses!</li>
+ * <li>waitForAck=true, means that receiver ack the transfer</li>
+ * <li>after one minute idle time, or number of request (100) the connection is
+ * reconnected with next request. Change this for production use!</li>
+ * <li>default ackTimeout is 15 sec: this is very low for big all session replication messages after restart a node</li>
+ * <li>disable keepAlive: keepAliveTimeout="-1" and keepAliveMaxRequestCount="-1"</li>
+ * </ul>
+ * 
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 366253 $ $Date: 2006-01-05 13:30:42 -0600 (Thu, 05 Jan 2006) $
+ */
+public class AsyncSocketSender extends DataSender {
+    
+    private static int threadCounter = 1;
+
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(AsyncSocketSender.class);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "AsyncSocketSender/2.0";
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * Message Queue
+     */
+    private SmartQueue queue = new SmartQueue();
+
+    /**
+     * Active thread to push messages asynchronous to the other replication node
+     */
+    private QueueThread queueThread = null;
+
+    /**
+     * Count number of queue message
+     */
+    private long inQueueCounter = 0;
+
+    /**
+     * Count all successfull push messages from queue
+     */
+    private long outQueueCounter = 0;
+
+    // ------------------------------------------------------------- Constructor
+
+    /**
+     * start background thread to push incomming cluster messages to replication
+     * node
+     * 
+     * @param domain replication cluster domain (session domain)
+     * @param host replication node tcp address
+     * @param port replication node tcp port
+     */
+    public AsyncSocketSender(String domain,InetAddress host, int port) {
+        super(domain,host, port);
+        checkThread();
+    }
+
+    // ------------------------------------------------------------- Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+
+    /**
+     * @return Returns the inQueueCounter.
+     */
+    public long getInQueueCounter() {
+        return inQueueCounter;
+    }
+
+    /**
+     * @return Returns the outQueueCounter.
+     */
+    public long getOutQueueCounter() {
+        return outQueueCounter;
+    }
+
+    /**
+     * @return Returns the queueSize.
+     */
+    public int getQueueSize() {
+        return queue.size();
+    }
+
+    /**
+     * @return Returns the queuedNrOfBytes.
+     */
+    public long getQueuedNrOfBytes() {
+        if(queueThread != null)
+            return queueThread.getQueuedNrOfBytes();
+        return 0l ;
+    }
+
+    // --------------------------------------------------------- Public Methods
+
+    /*
+     * Connect to socket and start background thread to ppush queued messages
+     * 
+     * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
+     */
+    public void connect() throws java.io.IOException {
+        super.connect();
+        checkThread();
+    }
+
+    /**
+     * Disconnect socket ad stop queue thread
+     * 
+     * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
+     */
+    public void disconnect() {
+        stopThread();
+        super.disconnect();
+    }
+
+    /**
+     * Send message to queue for later sending
+     * 
+     * @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
+     */
+    public void sendMessage(ClusterData data)
+            throws java.io.IOException {
+        SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(data.getUniqueId(), data);
+        queue.add(entry);
+        synchronized (this) {
+            inQueueCounter++;
+            if(queueThread != null)
+                queueThread.incQueuedNrOfBytes(data.getMessage().length);
+       }
+        if (log.isTraceEnabled())
+            log.trace(sm.getString("AsyncSocketSender.queue.message",
+                    getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
+                            data.getMessage().length)));
+    }
+
+    /*
+     * Reset sender statistics
+     */
+    public synchronized void resetStatistics() {
+        super.resetStatistics();
+        inQueueCounter = queue.size();
+        outQueueCounter = 0;
+
+    }
+
+    /**
+     * Name of this SockerSender
+     */
+    public String toString() {
+        StringBuffer buf = new StringBuffer("AsyncSocketSender[");
+        buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]");
+        return buf.toString();
+    }
+
+    // --------------------------------------------------------- Public Methods
+
+    /**
+     * Start Queue thread as daemon
+     */
+    protected void checkThread() {
+        if (queueThread == null) {
+            if (log.isInfoEnabled())
+                log.info(sm.getString("AsyncSocketSender.create.thread",
+                        getAddress(), new Integer(getPort())));
+            queueThread = new QueueThread(this);
+            queueThread.setDaemon(true);
+            queueThread.start();
+        }
+    }
+
+    /**
+     * stop queue worker thread
+     */
+    protected void stopThread() {
+        if (queueThread != null) {
+            queueThread.stopRunning();
+            queueThread = null;
+        }
+    }
+
+    // -------------------------------------------------------- Inner Class
+
+    private class QueueThread extends Thread {
+        AsyncSocketSender sender;
+
+        private boolean keepRunning = true;
+
+        /**
+         * Current number of bytes from all queued messages
+         */
+        private long queuedNrOfBytes = 0;
+
+        public QueueThread(AsyncSocketSender sender) {
+            this.sender = sender;
+            setName("Cluster-AsyncSocketSender-" + (threadCounter++));
+        }
+
+        protected long getQueuedNrOfBytes() {
+            return queuedNrOfBytes ;
+        }
+        
+        protected synchronized void setQueuedNrOfBytes(long queuedNrOfBytes) {
+            this.queuedNrOfBytes = queuedNrOfBytes;
+        }
+
+        protected synchronized void incQueuedNrOfBytes(long size) {
+            queuedNrOfBytes += size;
+        }
+        
+        protected synchronized void decQueuedNrOfBytes(long size) {
+            queuedNrOfBytes -= size;
+        }
+
+        public void stopRunning() {
+            keepRunning = false;
+        }
+
+        /**
+         * Get one queued message and push it to the replication node
+         * 
+         * @see DataSender#pushMessage(String, byte[])
+         */
+        public void run() {
+            while (keepRunning) {
+                SmartQueue.SmartEntry entry = sender.queue.remove(5000);
+                if (entry != null) {
+                    int messagesize = 0;
+                    try {
+                        ClusterData data = (ClusterData) entry.getValue();
+                        messagesize = data.getMessage().length;
+                        sender.pushMessage(data);
+                    } catch (Exception x) {
+                        log.warn(sm.getString("AsyncSocketSender.send.error",
+                                entry.getKey()));
+                    } finally {
+                        outQueueCounter++;
+                        decQueuedNrOfBytes(messagesize);
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ClusterData.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ClusterData.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ClusterData.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ClusterData.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,127 @@
+/*
+ * Copyright 1999,2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster.tcp;
+
+import org.apache.catalina.cluster.ClusterMessage;
+
+
+/**
+ * The cluster data class is used to transport around the byte array from
+ * a ClusterMessage object. This is just a utility class to avoid having to 
+ * serialize and deserialize the ClusterMessage more than once. 
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ * @since 5.5.10
+ */
+public class ClusterData  {
+
+    private int resend = ClusterMessage.FLAG_DEFAULT ;
+    private int compress = ClusterMessage.FLAG_DEFAULT ;
+    private byte[] message ;
+    private long timestamp ;
+    private String uniqueId ;
+    private String type ;
+    
+    public ClusterData() {}
+    
+    /**
+     * @param type message type (class)
+     * @param uniqueId unique message id
+     * @param message message data
+     * @param timestamp message creation date
+     */
+    public ClusterData(String type, String uniqueId, byte[] message, long timestamp
+            ) {
+        this.uniqueId = uniqueId;
+        this.message = message;
+        this.timestamp = timestamp;
+    }
+    
+    
+    /**
+     * @return Returns the type.
+     */
+    public String getType() {
+        return type;
+    }
+    /**
+     * @param type The type to set.
+     */
+    public void setType(String type) {
+        this.type = type;
+    }
+    /**
+     * @return Returns the message.
+     */
+    public byte[] getMessage() {
+        return message;
+    }
+    /**
+     * @param message The message to set.
+     */
+    public void setMessage(byte[] message) {
+        this.message = message;
+    }
+    /**
+     * @return Returns the timestamp.
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+    /**
+     * @param timestamp The timestamp to set.
+     */
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+    /**
+     * @return Returns the uniqueId.
+     */
+    public String getUniqueId() {
+        return uniqueId;
+    }
+    /**
+     * @param uniqueId The uniqueId to set.
+     */
+    public void setUniqueId(String uniqueId) {
+        this.uniqueId = uniqueId;
+    }
+    /**
+     * @return Returns the compress.
+     */
+    public int getCompress() {
+        return compress;
+    }
+    /**
+     * @param compress The compress to set.
+     */
+    public void setCompress(int compress) {
+        this.compress = compress;
+    }
+    /**
+     * @return Returns the resend.
+     */
+    public int getResend() {
+        return resend;
+    }
+    /**
+     * @param resend The resend to set.
+     */
+    public void setResend(int resend) {
+        this.resend = resend;
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/Constants.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/Constants.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/Constants.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/Constants.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.cluster.tcp;
+
+/**
+ * Manifest constants for the <code>org.apache.catalina.cluster.tcp</code>
+ * package.
+ *
+ * @author Peter Rossbach
+ * @version $Revision: 303753 $ $Date: 2005-03-14 15:24:30 -0600 (Mon, 14 Mar 2005) $
+ */
+
+public class Constants {
+
+    public static final String Package = "org.apache.catalina.cluster.tcp";
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSender.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,932 @@
+/*
+ * Copyright 1999,2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.io.XByteBuffer;
+import org.apache.catalina.util.StringManager;
+
+/**
+ * Send cluster messages with only one socket. Ack and keep Alive Handling is
+ * supported
+ * 
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision: 377484 $ $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ * @since 5.5.7
+ */
+public class DataSender implements IDataSender {
+
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(DataSender.class);
+
+    /**
+     * The string manager for this package.
+     */
+    protected static StringManager sm = StringManager
+            .getManager(Constants.Package);
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "DataSender/2.1";
+
+    /**
+     * receiver address
+     */
+    private InetAddress address;
+
+    /**
+     * receiver port
+     */
+    private int port;
+
+    
+    /**
+     * cluster domain
+     */
+    private String domain;
+
+    /**
+     * current sender socket
+     */
+    private Socket socket = null;
+
+    /**
+     * is Socket really connected
+     */
+    private boolean isSocketConnected = false;
+
+    /**
+     * Message transfer over socket ?
+     */
+    private boolean isMessageTransferStarted = false;
+
+    /**
+     * sender is in suspect state (last transfer failed)
+     */
+    private SenderState senderState = new SenderState();
+
+    /**
+     * wait time for ack
+     */
+    private long ackTimeout;
+
+    /**
+     * number of requests
+     */
+    protected long nrOfRequests = 0;
+
+    /**
+     * total bytes to transfer
+     */
+    protected long totalBytes = 0;
+
+    /**
+     * number of connects
+     */
+    protected long connectCounter = 0;
+
+    /**
+     * number of explizit disconnects
+     */
+    protected long disconnectCounter = 0;
+
+    /**
+     * number of failing acks
+     */
+    protected long missingAckCounter = 0;
+
+    /**
+     * number of data resends (second trys after socket failure)
+     */
+    protected long dataResendCounter = 0;
+
+    /**
+     * number of data failure sends 
+     */
+    protected long dataFailureCounter = 0;
+    
+    /**
+     * doProcessingStats
+     */
+    protected boolean doProcessingStats = false;
+
+    /**
+     * proessingTime
+     */
+    protected long processingTime = 0;
+    
+    /**
+     * min proessingTime
+     */
+    protected long minProcessingTime = Long.MAX_VALUE ;
+
+    /**
+     * max proessingTime
+     */
+    protected long maxProcessingTime = 0;
+   
+    /**
+     * doWaitAckStats
+     */
+    protected boolean doWaitAckStats = false;
+
+    /**
+     * waitAckTime
+     */
+    protected long waitAckTime = 0;
+    
+    /**
+     * min waitAckTime
+     */
+    protected long minWaitAckTime = Long.MAX_VALUE ;
+
+    /**
+     * max waitAckTime
+     */
+    protected long maxWaitAckTime = 0;
+
+    /**
+     * keep socket open for no more than one min
+     */
+    private long keepAliveTimeout = 60 * 1000;
+
+    /**
+     * max requests before reconnecting (default -1 unlimited)
+     */
+    private int keepAliveMaxRequestCount = -1;
+
+    /**
+     * Last connect timestamp
+     */
+    protected long keepAliveConnectTime = 0;
+
+    /**
+     * keepalive counter
+     */
+    protected int keepAliveCount = 0;
+
+    /**
+     * wait for receiver Ack
+     */
+    private boolean waitForAck = false;
+
+    /**
+     * number of socket close
+     */
+    private int socketCloseCounter = 0 ;
+
+    /**
+     * number of socket open
+     */
+    private int socketOpenCounter = 0 ;
+
+    /**
+     * number of socket open failures
+     */
+    private int socketOpenFailureCounter = 0 ;
+
+    /**
+     * After failure make a resend
+     */
+    private boolean resend = false ;
+    
+    // ------------------------------------------------------------- Constructor
+    
+    public DataSender(String domain,InetAddress host, int port) {
+        this.address = host;
+        this.port = port;
+        this.domain = domain;
+        if (log.isDebugEnabled())
+            log.debug(sm.getString("IDataSender.create",address, new Integer(port)));
+    }
+
+    public DataSender(String domain,InetAddress host, int port, SenderState state) {
+        this(domain,host,port);
+        if ( state != null ) this.senderState = state;
+    }
+    // ------------------------------------------------------------- Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+
+    /**
+     * @return Returns the nrOfRequests.
+     */
+    public long getNrOfRequests() {
+        return nrOfRequests;
+    }
+
+    /**
+     * @return Returns the totalBytes.
+     */
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    /**
+     * @return Returns the avg totalBytes/nrOfRequests.
+     */
+    public long getAvgMessageSize() {
+        return totalBytes / nrOfRequests;
+    }
+
+    /**
+     * @return Returns the avg processingTime/nrOfRequests.
+     */
+    public double getAvgProcessingTime() {
+        return ((double)processingTime) / nrOfRequests;
+    }
+ 
+    /**
+     * @return Returns the maxProcessingTime.
+     */
+    public long getMaxProcessingTime() {
+        return maxProcessingTime;
+    }
+    
+    /**
+     * @return Returns the minProcessingTime.
+     */
+    public long getMinProcessingTime() {
+        return minProcessingTime;
+    }
+    
+    /**
+     * @return Returns the processingTime.
+     */
+    public long getProcessingTime() {
+        return processingTime;
+    }
+    
+    /**
+     * @return Returns the doProcessingStats.
+     */
+    public boolean isDoProcessingStats() {
+        return doProcessingStats;
+    }
+    
+    /**
+     * @param doProcessingStats The doProcessingStats to set.
+     */
+    public void setDoProcessingStats(boolean doProcessingStats) {
+        this.doProcessingStats = doProcessingStats;
+    }
+ 
+ 
+    /**
+     * @return Returns the doWaitAckStats.
+     */
+    public boolean isDoWaitAckStats() {
+        return doWaitAckStats;
+    }
+    
+    /**
+     * @param doWaitAckStats The doWaitAckStats to set.
+     */
+    public void setDoWaitAckStats(boolean doWaitAckStats) {
+        this.doWaitAckStats = doWaitAckStats;
+    }
+    
+    /**
+     * @return Returns the avg waitAckTime/nrOfRequests.
+     */
+    public double getAvgWaitAckTime() {
+        return ((double)waitAckTime) / nrOfRequests;
+    }
+ 
+    /**
+     * @return Returns the maxWaitAckTime.
+     */
+    public long getMaxWaitAckTime() {
+        return maxWaitAckTime;
+    }
+    
+    /**
+     * @return Returns the minWaitAckTime.
+     */
+    public long getMinWaitAckTime() {
+        return minWaitAckTime;
+    }
+    
+    /**
+     * @return Returns the waitAckTime.
+     */
+    public long getWaitAckTime() {
+        return waitAckTime;
+    }
+    
+    /**
+     * @return Returns the connectCounter.
+     */
+    public long getConnectCounter() {
+        return connectCounter;
+    }
+
+    /**
+     * @return Returns the disconnectCounter.
+     */
+    public long getDisconnectCounter() {
+        return disconnectCounter;
+    }
+
+    /**
+     * @return Returns the missingAckCounter.
+     */
+    public long getMissingAckCounter() {
+        return missingAckCounter;
+    }
+
+    /**
+     * @return Returns the socketOpenCounter.
+     */
+    public int getSocketOpenCounter() {
+        return socketOpenCounter;
+    }
+    
+    /**
+     * @return Returns the socketOpenFailureCounter.
+     */
+    public int getSocketOpenFailureCounter() {
+        return socketOpenFailureCounter;
+    }
+
+    /**
+     * @return Returns the socketCloseCounter.
+     */
+    public int getSocketCloseCounter() {
+        return socketCloseCounter;
+    }
+
+    /**
+     * @return Returns the dataResendCounter.
+     */
+    public long getDataResendCounter() {
+        return dataResendCounter;
+    }
+
+    /**
+     * @return Returns the dataFailureCounter.
+     */
+    public long getDataFailureCounter() {
+        return dataFailureCounter;
+    }
+    
+    /**
+     * @param address The address to set.
+     */
+    public void setAddress(InetAddress address) {
+        this.address = address;
+    }
+
+    public InetAddress getAddress() {
+        return address;
+    }
+
+    
+    /**
+     * @param port The port to set.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+    
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * @return Returns the domain.
+     */
+    public String getDomain() {
+        return domain;
+    }
+    
+    /**
+     * @param domain The domain to set.
+     */
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+    
+    public boolean isConnected() {
+        return isSocketConnected;
+    }
+
+    /**
+     * @return Is DataSender send a message
+     */
+    public boolean isMessageTransferStarted() {
+        return isMessageTransferStarted;
+    }
+    
+    /**
+     * @param isSocketConnected
+     *            The isSocketConnected to set.
+     */
+    protected void setSocketConnected(boolean isSocketConnected) {
+        this.isSocketConnected = isSocketConnected;
+    }
+
+    public boolean isSuspect() {
+        return senderState.isSuspect() || senderState.isFailing();
+    }
+
+    public boolean getSuspect() {
+        return isSuspect();
+    }
+
+    public void setSuspect(boolean suspect) {
+        if ( suspect ) 
+            this.senderState.setSuspect();
+        else
+            this.senderState.setReady();
+    }
+
+    public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    public void setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+    }
+
+    public long getKeepAliveTimeout() {
+        return keepAliveTimeout;
+    }
+
+    public void setKeepAliveTimeout(long keepAliveTimeout) {
+        this.keepAliveTimeout = keepAliveTimeout;
+    }
+
+    public int getKeepAliveMaxRequestCount() {
+        return keepAliveMaxRequestCount;
+    }
+
+    public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
+        this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
+    }
+
+    /**
+     * @return Returns the keepAliveConnectTime.
+     */
+    public long getKeepAliveConnectTime() {
+        return keepAliveConnectTime;
+    }
+
+    /**
+     * @return Returns the keepAliveCount.
+     */
+    public int getKeepAliveCount() {
+        return keepAliveCount;
+    }
+
+    /**
+     * @return Returns the waitForAck.
+     */
+    public boolean isWaitForAck() {
+        return waitForAck;
+    }
+
+    /**
+     * @param waitForAck
+     *            The waitForAck to set.
+     */
+    public void setWaitForAck(boolean waitForAck) {
+        this.waitForAck = waitForAck;
+    }
+
+    /**
+     * @return Returns the resend.
+     */
+    public boolean isResend() {
+        return resend;
+    }
+    /**
+     * @param resend The resend to set.
+     */
+    public void setResend(boolean resend) {
+        this.resend = resend;
+    }
+    /**
+     * @return Returns the socket.
+     */
+    public Socket getSocket() {
+        return socket;
+    }
+
+    public SenderState getSenderState() {
+        return senderState;
+    }
+
+    /**
+     * @param socket The socket to set.
+     */
+    public void setSocket(Socket socket) {
+        this.socket = socket;
+    }
+    // --------------------------------------------------------- Public Methods
+
+    /**
+     * Connect other cluster member receiver 
+     * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
+     */
+    public synchronized void connect() throws java.io.IOException {
+        if(!isMessageTransferStarted) {
+            openSocket();
+            if(isConnected()) {
+                connectCounter++;
+                if (log.isDebugEnabled())
+                    log.debug(sm.getString("IDataSender.connect", address.getHostAddress(),
+                            new Integer(port),new Long(connectCounter)));
+            }
+        } else 
+            if (log.isWarnEnabled())
+               log.warn(sm.getString("IDataSender.message.create", address.getHostAddress(),new Integer(port)));
+   }
+
+ 
+    /**
+     * disconnect and close socket
+     * 
+     * @see IDataSender#disconnect()
+     */
+    public synchronized void disconnect() {
+        if(!isMessageTransferStarted) {
+            boolean connect = isConnected() ;
+            closeSocket();
+            if(connect) {
+                disconnectCounter++;
+                if (log.isDebugEnabled())
+                    log.debug(sm.getString("IDataSender.disconnect", address.getHostAddress(),
+                        new Integer(port),new Long(disconnectCounter)));
+            }
+        } else 
+            if (log.isWarnEnabled())
+               log.warn(sm.getString("IDataSender.message.disconnect", address.getHostAddress(),new Integer(port)));
+        
+    }
+
+    /**
+     * Check, if time to close socket! Important for AsyncSocketSender that
+     * replication thread is not fork again! <b>Only work when keepAliveTimeout
+     * or keepAliveMaxRequestCount greater -1 </b>
+     * FIXME Can we close a socket when a message wait for ack?
+     * @return true, is socket close
+     * @see DataSender#closeSocket()
+     */
+    public synchronized boolean checkKeepAlive() {
+        boolean isCloseSocket = true ;
+        if(!isMessageTransferStarted) {
+            if(isConnected()) {
+                if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout)
+                    || (keepAliveMaxRequestCount > -1 && keepAliveCount >= keepAliveMaxRequestCount)) {
+                        closeSocket();
+               } else
+                    isCloseSocket = false ;
+            }
+        } else
+            isCloseSocket = false ;
+        
+        return isCloseSocket;
+    }
+
+    /**
+     * Send message
+     * 
+     * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(,
+     *      ClusterData)
+     */
+    public synchronized void sendMessage(ClusterData data)
+            throws java.io.IOException {
+        pushMessage(data);
+    }
+
+    /**
+     * Reset sender statistics
+     */
+    public synchronized void resetStatistics() {
+        nrOfRequests = 0;
+        totalBytes = 0;
+        disconnectCounter = 0;
+        connectCounter = isConnected() ? 1 : 0;
+        missingAckCounter = 0;
+        dataResendCounter = 0;
+        dataFailureCounter = 0 ;
+        socketOpenCounter =isConnected() ? 1 : 0;
+        socketOpenFailureCounter = 0 ;
+        socketCloseCounter = 0;
+        processingTime = 0 ;
+        minProcessingTime = Long.MAX_VALUE ;
+        maxProcessingTime = 0 ;
+        waitAckTime = 0 ;
+        minWaitAckTime = Long.MAX_VALUE ;
+        maxWaitAckTime = 0 ;
+    }
+
+    /**
+     * Name of this SockerSender
+     */
+    public String toString() {
+        StringBuffer buf = new StringBuffer("DataSender[");
+        buf.append(getAddress()).append(":").append(getPort()).append("]");
+        return buf.toString();
+    }
+
+    // --------------------------------------------------------- Protected Methods
+ 
+    /**
+     * open real socket and set time out when waitForAck is enabled
+     * is socket open return directly
+     * @throws IOException
+     * @throws SocketException
+     */
+    protected void openSocket() throws IOException, SocketException {
+       if(isConnected())
+           return ;
+       try {
+            createSocket();
+            if (isWaitForAck())
+                socket.setSoTimeout((int) ackTimeout);
+            isSocketConnected = true;
+            socketOpenCounter++;
+            this.keepAliveCount = 0;
+            this.keepAliveConnectTime = System.currentTimeMillis();
+            if (log.isDebugEnabled())
+                log.debug(sm.getString("IDataSender.openSocket", address
+                        .getHostAddress(), new Integer(port),new Long(socketOpenCounter)));
+      } catch (IOException ex1) {
+            socketOpenFailureCounter++ ;
+            if (log.isDebugEnabled())
+                log.debug(sm.getString("IDataSender.openSocket.failure",
+                        address.getHostAddress(), new Integer(port),new Long(socketOpenFailureCounter)), ex1);
+            throw ex1;
+        }
+        
+     }
+
+    /**
+     * @throws IOException
+     * @throws SocketException
+     */
+    protected void createSocket() throws IOException, SocketException {
+        socket = new Socket(getAddress(), getPort());
+    }
+
+    /**
+     * close socket
+     * 
+     * @see DataSender#disconnect()
+     * @see DataSender#closeSocket()
+     */
+    protected void closeSocket() {
+        if(isConnected()) {
+             if (socket != null) {
+                try {
+                    socket.close();
+                } catch (IOException x) {
+                } finally {
+                    socket = null;
+                }
+            }
+            this.keepAliveCount = 0;
+            isSocketConnected = false;
+            socketCloseCounter++;
+            if (log.isDebugEnabled())
+                log.debug(sm.getString("IDataSender.closeSocket",
+                        address.getHostAddress(), new Integer(port),new Long(socketCloseCounter)));
+       }
+    }
+
+    /**
+     * Add statistic for this socket instance
+     * 
+     * @param length
+     */
+    protected void addStats(int length) {
+        nrOfRequests++;
+        totalBytes += length;
+        if (log.isInfoEnabled() && (nrOfRequests % 1000) == 0) {
+            log.info(sm.getString("IDataSender.stats", new Object[] {
+                    getAddress().getHostAddress(), new Integer(getPort()),
+                    new Long(totalBytes), new Long(nrOfRequests),
+                    new Long(totalBytes / nrOfRequests),
+                    new Long(getProcessingTime()),
+                    new Double(getAvgProcessingTime())}));
+        }
+    }
+
+    /**
+     * Add processing stats times
+     * @param startTime
+     */
+    protected void addProcessingStats(long startTime) {
+        long time = System.currentTimeMillis() - startTime ;
+        if(time < minProcessingTime)
+            minProcessingTime = time ;
+        if( time > maxProcessingTime)
+            maxProcessingTime = time ;
+        processingTime += time ;
+    }
+    
+    /**
+     * Add waitAck stats times
+     * @param startTime
+     */
+    protected void addWaitAckStats(long startTime) {
+        long time = System.currentTimeMillis() - startTime ;
+        if(time < minWaitAckTime)
+            minWaitAckTime = time ;
+        if( time > maxWaitAckTime)
+            maxWaitAckTime = time ;
+        waitAckTime += time ;
+    }
+    /**
+     * Push messages with only one socket at a time
+     * Wait for ack is needed and make auto retry when write message is failed.
+     * After sending error close and reopen socket again.
+     * 
+     * After successfull sending update stats
+     * 
+     * WARNING: Subclasses must be very carefull that only one thread call this pushMessage at once!!!
+     * 
+     * @see #closeSocket()
+     * @see #openSocket()
+     * @see #writeData(ClusterData)
+     * 
+     * @param data
+     *            data to send
+     * @throws java.io.IOException
+     * @since 5.5.10
+     */
+    protected void pushMessage( ClusterData data)
+            throws java.io.IOException {
+        long time = 0 ;
+        if(doProcessingStats) {
+            time = System.currentTimeMillis();
+        }
+        boolean messageTransfered = false ;
+        synchronized(this) {
+            checkKeepAlive();
+            if (!isConnected())
+                openSocket();
+            else if(keepAliveTimeout > -1)
+                this.keepAliveConnectTime = System.currentTimeMillis();
+        }
+        IOException exception = null;
+        try {
+             writeData(data);
+             messageTransfered = true ;
+        } catch (java.io.IOException x) {
+            if(data.getResend() == ClusterMessage.FLAG_ALLOWED || 
+                    (data.getResend() == ClusterMessage.FLAG_DEFAULT && isResend() )) {
+                // second try with fresh connection
+                dataResendCounter++;
+                if (log.isTraceEnabled())
+                    log.trace(sm.getString("IDataSender.send.again", address.getHostAddress(),
+                            new Integer(port)),x);
+                synchronized(this) {
+                    closeSocket();
+                    openSocket();
+                }
+                try {
+                    writeData(data);
+                    messageTransfered = true;
+                } catch (IOException xx) {
+                    exception = xx;
+                    throw xx ;
+                }
+            } else {
+                synchronized(this) {
+                    closeSocket();
+                }
+                exception = x;
+                throw x ;
+            }
+        } finally {
+            this.keepAliveCount++;
+            checkKeepAlive();
+            if(doProcessingStats) {
+                addProcessingStats(time);
+            }
+            if(messageTransfered) {
+                addStats(data.getMessage().length);
+                if (log.isTraceEnabled()) {
+                    log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),
+                        new Integer(port), data.getUniqueId(), new Long(data.getMessage().length)));
+                }
+            } else {
+                dataFailureCounter++;
+                throw exception;
+            }
+        }
+    }
+
+    /**
+     * Sent real cluster Message to socket stream
+     * FIXME send compress
+     * @param data
+     * @throws IOException
+     * @since 5.5.10
+     */
+    protected void writeData(ClusterData data) throws IOException { 
+        synchronized(this) {
+            isMessageTransferStarted = true ;
+        }
+        try {
+            OutputStream out = socket.getOutputStream();
+            out.write(XByteBuffer.createDataPackage(data));
+            out.flush();
+            if (isWaitForAck())
+                waitForAck(ackTimeout);
+        } finally {
+            synchronized(this) {
+                isMessageTransferStarted = false ;
+            }
+        }
+    }
+
+    /**
+     * Wait for Acknowledgement from other server
+     * FIXME Please, not wait only for three charcters, better control that the wait ack message is correct.
+     * @param timeout
+     * @throws java.io.IOException
+     * @throws java.net.SocketTimeoutException
+     */
+    protected void waitForAck(long timeout) throws java.io.IOException {
+        long time = 0 ;
+        if(doWaitAckStats) {
+            time = System.currentTimeMillis();
+        }
+        try {
+            int bytesRead = 0;
+            if ( log.isTraceEnabled() ) 
+                log.trace(sm.getString("IDataSender.ack.start",getAddress(), new Integer(socket.getLocalPort())));
+            int i = socket.getInputStream().read();
+            while ((i != -1) && (i != 3) && bytesRead < 10) {
+                if ( log.isTraceEnabled() ) 
+                    log.trace(sm.getString("IDataSender.ack.read",getAddress(), new Integer(socket.getLocalPort()),new Character((char) i)));
+                bytesRead++;
+                i = socket.getInputStream().read();
+            }
+            if (i != 3) {
+                if (i == -1) {
+                    throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort())));
+                } else {
+                    throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
+                }
+            } else {
+                if (log.isTraceEnabled()) {
+                    log.trace(sm.getString("IDataSender.ack.receive", getAddress(),new Integer(socket.getLocalPort())));
+                }
+            }
+        } catch (IOException x) {
+            missingAckCounter++;
+            String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),
+                                         new Integer(socket.getLocalPort()), 
+                                         new Long(this.ackTimeout));
+            if ( !this.isSuspect() ) {
+                this.setSuspect(true);
+                if ( log.isWarnEnabled() ) log.warn(errmsg, x);
+            } else {
+                if ( log.isDebugEnabled() )log.debug(errmsg, x);
+            }
+            throw x;
+        } finally {
+            if(doWaitAckStats) {
+                addWaitAckStats(time);
+            }
+        }
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSenders.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSenders.properties?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSenders.properties (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/DataSenders.properties Thu Feb 23 11:55:14 2006
@@ -0,0 +1,4 @@
+fastasyncqueue=org.apache.catalina.cluster.tcp.FastAsyncSocketSender
+asynchronous=org.apache.catalina.cluster.tcp.AsyncSocketSender
+synchronous=org.apache.catalina.cluster.tcp.SocketSender
+pooled=org.apache.catalina.cluster.tcp.PooledSocketSender

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,491 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.net.InetAddress;
+
+import org.apache.catalina.cluster.util.FastQueue;
+import org.apache.catalina.cluster.util.LinkObject;
+import org.apache.catalina.cluster.util.IQueue;
+
+/**
+ * Send cluster messages from a Message queue with only one socket. Ack and keep
+ * Alive Handling is supported. Fast Queue can limit queue size and consume all messages at queue at one block.<br/>
+ * Limit the queue lock contention under high load!<br/>
+ * <ul>
+ * <li>With autoConnect=false at ReplicationTransmitter, you can disconnect the
+ * sender and all messages are queued. Only use this for small maintaince
+ * isuses!</li>
+ * <li>waitForAck=true, means that receiver ack the transfer</li>
+ * <li>after one minute idle time, or number of request (100) the connection is
+ * reconnected with next request. Change this for production use!</li>
+ * <li>default ackTimeout is 15 sec: this is very low for big all session
+ * replication messages after restart a node</li>
+ * <li>disable keepAlive: keepAliveTimeout="-1" and
+ * keepAliveMaxRequestCount="-1"</li>
+ * <li>maxQueueLength: Limit the sender queue length (membership goes well, but transfer is failure!!)</li>
+ * </ul>
+ * FIXME: refactor code duplications with AsyncSocketSender => configurable or extract super class 
+ * @author Peter Rossbach ( idea comes form Rainer Jung)
+ * @version $Revision: 366253 $ $Date: 2006-01-05 13:30:42 -0600 (Thu, 05 Jan 2006) $
+ * @since 5.5.9
+ */
+public class FastAsyncSocketSender extends DataSender {
+
+    private static int threadCounter = 1;
+
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(FastAsyncSocketSender.class);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "FastAsyncSocketSender/3.0";
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * Message Queue
+     */
+    private FastQueue queue = new FastQueue();
+
+    /**
+     * Active thread to push messages asynchronous to the other replication node
+     */
+    private FastQueueThread queueThread = null;
+
+    /**
+     * Count number of queue message
+     */
+    private long inQueueCounter = 0;
+
+    /**
+     * Count all successfull push messages from queue
+     */
+    private long outQueueCounter = 0;
+
+    private int threadPriority = Thread.NORM_PRIORITY;;
+
+    // ------------------------------------------------------------- Constructor
+
+    /**
+     * start background thread to push incomming cluster messages to replication
+     * node
+     * 
+     * @param domain replication cluster domain (session domain)
+     * @param host replication node tcp address
+     * @param port replication node tcp port
+     */
+    public FastAsyncSocketSender(String domain,InetAddress host, int port) {
+        super(domain,host, port);
+        checkThread();
+    }
+  
+    // ------------------------------------------------------------- Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+ 
+
+    /**
+     * get current add wait timeout 
+     * @return current wait timeout
+     */
+    public long getQueueAddWaitTimeout() {
+        
+        return queue.getAddWaitTimeout();
+    }
+
+    /**
+     * Set add wait timeout (default 10000 msec)
+     * @param timeout
+     */
+    public void setQueueAddWaitTimeout(long timeout) {
+        queue.setAddWaitTimeout(timeout);
+    }
+
+    /**
+     * get current remove wait timeout
+     * @return The timeout
+     */
+    public long getQueueRemoveWaitTimeout() {
+        return queue.getRemoveWaitTimeout();
+    }
+
+    /**
+     * set remove wait timeout ( default 30000 msec)
+     * @param timeout
+     */
+    public void setRemoveWaitTimeout(long timeout) {
+        queue.setRemoveWaitTimeout(timeout);
+    }
+
+    /**
+     * @return Returns the checkLock.
+     */
+    public boolean isQueueCheckLock() {
+        return queue.isCheckLock();
+    }
+    /**
+     * @param checkLock The checkLock to set.
+     */
+    public void setQueueCheckLock(boolean checkLock) {
+        queue.setCheckLock(checkLock);
+    }
+    /**
+     * @return Returns the doStats.
+     */
+    public boolean isQueueDoStats() {
+        return queue.isDoStats();
+    }
+    /**
+     * @param doStats The doStats to set.
+     */
+    public void setQueueDoStats(boolean doStats) {
+        queue.setDoStats(doStats);
+    }
+    /**
+     * @return Returns the timeWait.
+     */
+    public boolean isQueueTimeWait() {
+        return queue.isTimeWait();
+    }
+    /**
+     * @param timeWait The timeWait to set.
+     */
+    public void setQueueTimeWait(boolean timeWait) {
+        queue.setTimeWait(timeWait);
+    }
+        
+    /**
+     * @return Returns the inQueueCounter.
+     */
+    public int getMaxQueueLength() {
+        return queue.getMaxQueueLength();
+    }
+
+    /**
+     * @param length max queue length
+     */
+    public void setMaxQueueLength(int length) {
+        queue.setMaxQueueLength(length);
+    }
+
+    /**
+     * @return Returns the add wait times.
+     */
+    public long getQueueAddWaitTime() {
+        return queue.getAddWait();
+    }
+
+    /**
+     * @return Returns the add wait times.
+     */
+    public long getQueueRemoveWaitTime() {
+        return queue.getRemoveWait();
+    }
+
+    /**
+     * @return Returns the inQueueCounter.
+     */
+    public long getInQueueCounter() {
+        return inQueueCounter;
+    }
+
+    /**
+     * @return Returns the outQueueCounter.
+     */
+    public long getOutQueueCounter() {
+        return outQueueCounter;
+    }
+
+    /**
+     * @return Returns the queueSize.
+     */
+    public int getQueueSize() {
+        return queue.getSize();
+    }
+
+    /**
+     * change active the queue Thread priority 
+     * @param threadPriority value must be between MIN and MAX Thread Priority
+     * @exception IllegalArgumentException
+     */
+    public void setThreadPriority(int threadPriority) {
+        if (log.isDebugEnabled())
+            log.debug(sm.getString("FastAsyncSocketSender.setThreadPriority",
+                    getAddress().getHostAddress(), new Integer(getPort()),
+                    new Integer(threadPriority)));
+        if (threadPriority < Thread.MIN_PRIORITY) {
+            throw new IllegalArgumentException(sm.getString(
+                    "FastAsyncSocketSender.min.exception", getAddress()
+                            .getHostAddress(), new Integer(getPort()),
+                    new Integer(threadPriority)));
+        } else if (threadPriority > Thread.MAX_PRIORITY) {
+            throw new IllegalArgumentException(sm.getString(
+                    "FastAsyncSocketSender.max.exception", getAddress()
+                            .getHostAddress(), new Integer(getPort()),
+                    new Integer(threadPriority)));
+        }
+        this.threadPriority = threadPriority;
+        if (queueThread != null)
+            queueThread.setPriority(threadPriority);
+    }
+
+    /**
+     * Get the current threadPriority
+     * @return The thread priority
+     */
+    public int getThreadPriority() {
+        return threadPriority;
+    }
+
+    /**
+     * @return Returns the queuedNrOfBytes.
+     */
+    public long getQueuedNrOfBytes() {
+        if(queueThread != null)
+            return queueThread.getQueuedNrOfBytes();
+        return 0l ;
+    }
+
+    // --------------------------------------------------------- Public Methods
+
+    /**
+     * Connect to socket and start background thread to push queued messages
+     * 
+     * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
+     */
+    public void connect() throws java.io.IOException {
+        super.connect();
+        checkThread();
+        if(!queue.isEnabled())
+            queue.start() ;
+    }
+
+    /**
+     * Disconnect socket ad stop queue thread
+     * 
+     * @see org.apache.catalina.cluster.tcp.IDataSender#disconnect()
+     */
+    public void disconnect() {
+        stopThread();
+        // delete "remove" lock at queue
+        queue.stop() ;
+        // enable that sendMessage can add new messages
+        queue.start() ;
+        // close socket
+        super.disconnect();
+    }
+
+    /**
+     * Send message to queue for later sending.
+     * 
+     * @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
+     */
+    public void sendMessage(ClusterData data)
+            throws java.io.IOException {
+        queue.add(data.getUniqueId(), data);
+        synchronized (this) {
+            inQueueCounter++;
+            if(queueThread != null)
+                queueThread.incQueuedNrOfBytes(data.getMessage().length);
+        }
+        if (log.isTraceEnabled())
+            log.trace(sm.getString("AsyncSocketSender.queue.message",
+                    getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
+                            data.getMessage().length)));
+    }
+
+    /**
+     * Reset sender statistics
+     */
+    public synchronized void resetStatistics() {
+        super.resetStatistics();
+        inQueueCounter = queue.getSize();
+        outQueueCounter = 0;
+        queue.resetStatistics();
+    }
+
+    /**
+     * Name of this SockerSender
+     */
+    public String toString() {
+        StringBuffer buf = new StringBuffer("FastAsyncSocketSender[");
+        buf.append(getAddress().getHostAddress()).append(":").append(getPort()).append("]");
+        return buf.toString();
+    }
+
+    // --------------------------------------------------------- Public Methods
+
+    /**
+     * Start Queue thread as daemon
+     */
+    protected void checkThread() {
+        if (queueThread == null) {
+            if (log.isInfoEnabled())
+                log.info(sm.getString("AsyncSocketSender.create.thread",
+                        getAddress(), new Integer(getPort())));
+            queueThread = new FastQueueThread(this, queue);
+            queueThread.setDaemon(true);
+            queueThread.setPriority(getThreadPriority());
+            queueThread.start();
+        }
+    }
+
+    /**
+     * stop queue worker thread
+     */
+    protected void stopThread() {
+        if (queueThread != null) {
+            queueThread.stopRunning();
+            queueThread = null;
+        }
+    }
+
+    // -------------------------------------------------------- Inner Class
+
+    private class FastQueueThread extends Thread {
+
+        
+        /**
+         * Sender queue
+         */
+        private IQueue queue = null;
+
+        /**
+         * Active sender
+         */
+        private FastAsyncSocketSender sender = null;
+
+        /**
+         * Thread is active
+         */
+        private boolean keepRunning = true;
+
+        /**
+         * Current number of bytes from all queued messages
+         */
+        private long queuedNrOfBytes = 0;
+
+       
+
+        /**
+         * Only use inside FastAsyncSocketSender
+         * @param sender
+         * @param queue
+         */
+        private FastQueueThread(FastAsyncSocketSender sender, IQueue queue) {
+            setName("Cluster-FastAsyncSocketSender-" + (threadCounter++));
+            this.queue = queue;
+            this.sender = sender;
+        }
+        
+        /**
+         * @return Returns the queuedNrOfBytes.
+         */
+        public long getQueuedNrOfBytes() {
+            return queuedNrOfBytes;
+        }
+        
+        protected synchronized void setQueuedNrOfBytes(long queuedNrOfBytes) {
+            this.queuedNrOfBytes = queuedNrOfBytes;
+        }
+
+        protected synchronized void incQueuedNrOfBytes(long size) {
+            queuedNrOfBytes += size;
+        }
+        
+        protected synchronized void decQueuedNrOfBytes(long size) {
+            queuedNrOfBytes -= size;
+        }
+
+
+        /**
+         * Stop backend thread!
+         */
+         public void stopRunning() {
+            keepRunning = false;
+        }
+        
+        
+        /**
+         * Get the objects from queue and send all mesages to the sender.
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            while (keepRunning) {
+                LinkObject entry = getQueuedMessage();
+                if (entry != null) {
+                    pushQueuedMessages(entry);
+                } else {
+                    if (keepRunning) {
+                        log.warn(sm.getString("AsyncSocketSender.queue.empty",
+                                sender.getAddress(), new Integer(sender
+                                        .getPort())));
+                    }
+                }
+            }
+        }
+
+        /**
+         * Get List of queue cluster messages
+         * @return list of cluster messages
+         */
+        protected LinkObject getQueuedMessage() {
+            // get a link list of all queued objects
+            if (log.isTraceEnabled())
+                log.trace("Queuesize before=" + ((FastQueue) queue).getSize());
+            LinkObject entry = queue.remove();
+            if (log.isTraceEnabled())
+                log.trace("Queuesize after=" + ((FastQueue) queue).getSize());
+            return entry;
+        }
+
+        /**
+         * @param entry
+         */
+        protected void pushQueuedMessages(LinkObject entry) {
+            do {
+                int messagesize = 0;
+                try {
+                    ClusterData data = (ClusterData) entry.data();
+                    messagesize = data.getMessage().length;
+                    sender.pushMessage(data);
+                } catch (Exception x) {
+                    log.warn(sm.getString(
+                            "AsyncSocketSender.send.error", entry
+                                    .getKey()), x);
+                } finally {
+                    outQueueCounter++;
+                    decQueuedNrOfBytes(messagesize);
+                }
+                entry = entry.next();
+            } while (entry != null);
+        }
+
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSender.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSender.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,45 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+/**
+ * @author Peter Rossbach
+ * @version $Revision: 303993 $ $Date: 2005-07-16 16:05:54 -0500 (Sat, 16 Jul 2005) $
+ * @since 5.5.7
+ */
+
+public interface IDataSender
+{
+    public void setAddress(java.net.InetAddress address);
+    public java.net.InetAddress getAddress();
+    public void setPort(int port);
+    public int getPort();
+    public void connect() throws java.io.IOException;
+    public void disconnect();
+    public void sendMessage(ClusterData data) throws java.io.IOException;
+    public boolean isConnected();
+    public void setSuspect(boolean suspect);
+    public boolean getSuspect();
+    public void setAckTimeout(long timeout);
+    public long getAckTimeout();
+    public boolean isWaitForAck();
+    public void setWaitForAck(boolean isWaitForAck);
+    public boolean checkKeepAlive();
+    public String getDomain() ;
+    public void setDomain(String domain) ;
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/IDataSenderFactory.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,185 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.util.StringManager;
+
+/**
+ * Create DataSender for different modes. DataSender factory load mode list from 
+ * <code>org/apache/catalina/cluster/tcp/DataSenders.properties</code> resource.
+ * 
+ * @author Peter Rossbach
+ * @version $Revision: 304032 $ $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ * @since 5.5.7
+ */
+public class IDataSenderFactory {
+
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(IDataSenderFactory.class);
+    
+    private static final String DATASENDERS_PROPERTIES = "org/apache/catalina/cluster/tcp/DataSenders.properties";
+    public static final String SYNC_MODE = "synchronous";
+    public static final String ASYNC_MODE = "asynchronous";
+    public static final String POOLED_SYNC_MODE = "pooled";
+    public static final String FAST_ASYNC_QUEUE_MODE = "fastasyncqueue";
+
+    /**
+     * The string manager for this package.
+     */
+    protected static StringManager sm = StringManager
+            .getManager(Constants.Package);
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "IDataSenderFactory/2.0";
+
+    private IDataSenderFactory() {
+    }
+
+    private Properties senderModes;
+
+    private static IDataSenderFactory factory ;
+
+    static {
+        factory = new IDataSenderFactory();
+        factory.loadSenderModes();
+    }
+
+    // ------------------------------------------------------------- Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public static String getInfo() {
+        return (info);
+    }
+    
+    // ------------------------------------------------------------- static
+
+    /**
+     * Create a new DataSender
+     * @param mode replicaton mode 
+     * @param mbr sender target
+     * @return new sender object
+     * @throws java.io.IOException
+     */
+    public synchronized static IDataSender getIDataSender(String mode,
+            Member mbr) throws java.io.IOException {
+       // Identify the class name of the DataSender we should configure
+       IDataSender sender = factory.getSender(mode,mbr);
+       if(sender == null)
+           throw new java.io.IOException("Invalid replication mode=" + mode);          
+       return sender ;    
+    }
+
+    /**
+     * Check that mode is valid
+     * @param mode
+     * @return The replication mode (may be null if sender mode)
+     */
+    public static String validateMode(String mode) {
+        if(factory.isSenderMode(mode))
+            return null ;
+        else {
+            StringBuffer buffer = new StringBuffer("Replication mode has to be '");
+            for (Iterator iter = factory.senderModes.keySet().iterator(); iter.hasNext();) {
+                String key = (String) iter.next();  
+                buffer.append(key);
+                if(iter.hasNext())
+                    buffer.append("', '");
+            }
+            return buffer.toString();        
+        }
+    }
+    
+    // ------------------------------------------------------------- private
+
+    private boolean isSenderMode(String mode){
+        return senderModes != null && senderModes.containsKey(mode) ;           
+    }
+
+    private IDataSender getSender(String mode,Member mbr) {
+        IDataSender sender = null;
+        String senderName = null;
+        senderName = senderModes.getProperty(mode);
+        if (senderName != null) {
+
+            // Instantiate and install a data replication sender of the requested class
+            try {
+                Class senderClass = Class.forName(senderName);
+                Class paramTypes[] = new Class[3];
+                paramTypes[0] = Class.forName("java.lang.String");
+                paramTypes[1] = Class.forName("java.net.InetAddress");
+                paramTypes[2] = Integer.TYPE ;
+                Constructor constructor = senderClass.getConstructor(paramTypes);
+                if (constructor != null) {
+                    Object paramValues[] = new Object[3];
+                    paramValues[0] = mbr.getDomain();
+                    paramValues[1] = InetAddress.getByName(mbr.getHost());
+                    paramValues[2] = new Integer(mbr.getPort());
+                    sender = (IDataSender) constructor.newInstance(paramValues);
+                } else {
+                    log.error(sm.getString("IDataSender.senderModes.Instantiate",
+                            senderName));
+                }
+            } catch (Throwable t) {
+                log.error(sm.getString("IDataSender.senderModes.Instantiate",
+                        senderName), t);
+            }
+        } else {
+            log.error(sm.getString("IDataSender.senderModes.Missing", mode));
+        }
+        return sender;
+    }
+
+    private synchronized void loadSenderModes() {
+        // Load our mapping properties if necessary
+        if (senderModes == null) {
+            try {
+                InputStream is = IDataSender.class
+                        .getClassLoader()
+                        .getResourceAsStream(
+                                DATASENDERS_PROPERTIES);
+                if (is != null) {
+                    senderModes = new Properties();
+                    senderModes.load(is);
+                } else {
+                    log.error(sm.getString("IDataSender.senderModes.Resources"));
+                    return;
+                }
+            } catch (IOException e) {
+                log.error(sm.getString("IDataSender.senderModes.Resources"), e);
+                return;
+            }
+        }
+
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message