tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r380209 [4/12] - in /tomcat/container/tc5.5.x/modules: groupcom/ groupcom/etc/ groupcom/src/ groupcom/src/share/ groupcom/src/share/org/ groupcom/src/share/org/apache/ groupcom/src/share/org/apache/catalina/ groupcom/src/share/org/apache/ca...
Date Thu, 23 Feb 2006 19:55:25 GMT
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Thu Feb 23 11:55:14 2006
@@ -0,0 +1,72 @@
+AsyncSocketSender.create.thread=Create sender [{0}:{1,number,integer}] queue thread to tcp background replication
+AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}] id=[{2}] size={3}
+AsyncSocketSender.send.error=Unable to asynchronously send session with id=[{0}] - message will be ignored.
+AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}] returned null element!
+cluster.mbean.register.already=MBean {0} already registered!
+FastAsyncSocketSender.setThreadPriority=[{0}:{1,number,integer}] set priority to {2}
+FastAsyncSocketSender.min.exception=[{0}:{1,number,integer}] new priority {2} < MIN_PRIORITY
+FastAsyncSocketSender.max.exception=[{0}:{1,number,integer}] new priority {2} > MAX_PRIORITY
+IDataSender.ack.eof=EOF reached at local port [{0}:{1,number,integer}]
+IDataSender.ack.receive=Got ACK at local port [{0}:{1,number,integer}]
+IDataSender.ack.missing=Unable to read acknowledgement from [{0}:{1,number,integer}] in {2,number,integer} ms. Disconnecting socket, and trying again.
+IDataSender.ack.read=Read wait ack char ''{2}'' [{0}:{1,number,integer}]
+IDataSender.ack.start=Waiting for ACK message [{0}:{1,number,integer}]
+IDataSender.ack.wrong=Missing correct ACK after 10 bytes read at local port [{0}:{1,number,integer}]
+IDataSender.closeSocket=Sender close socket to [{0}:{1,number,integer}] (close count {2,number,integer})
+IDataSender.connect=Sender connect to [{0}:{1,number,integer}] (connect count {2,number,integer})
+IDataSender.create=Create sender [{0}:{1,number,integer}]
+IDataSender.disconnect=Sender disconnect from [{0}:{1,number,integer}] (disconnect count {2,number,integer})
+IDataSender.message.disconnect=Message transferred: Sender can''t disconnect from [{0}:{1,number,integer}]
+IDataSender.message.create=Message transferred: Sender can''t create current socket [{0}:{1,number,integer}]
+IDataSender.openSocket=Sender open socket to [{0}:{1,number,integer}] (open count {2,number,integer})
+IDataSender.openSocket.failure=Open sender socket [{0}:{1,number,integer}] failure! (open failure count {2,number,integer})
+IDataSender.send.again=Send data again to [{0}:{1,number,integer}]
+IDataSender.send.crash=Send message crashed [{0}:{1,number,integer}] type=[{2}], id=[{3}]
+IDataSender.send.message=Send message to [{0}:{1,number,integer}] id=[{2}] size={3,number,integer}
+IDataSender.send.lost=Message lost: [{0}:{1,number,integer}] type=[{2}], id=[{3}]
+IDataSender.senderModes.Configured=Configured a data replication sender for mode {0}
+IDataSender.senderModes.Instantiate=Can''t instantiate a data replication sender of class {0}
+IDataSender.senderModes.Missing=Can''t configure a data replication sender for mode {0}
+IDataSender.senderModes.Resources=Can't load data replication sender mapping list
+IDataSender.stats=Send stats from [{0}:{1,number,integer}], Nr of bytes sent={2,number,integer} over {3} = {4,number,integer} bytes/request, processing time {5,number,integer} msec, avg processing time {6,number,integer} msec
+PoolSocketSender.senderQueue.sender.failed=PoolSocketSender create new sender to [{0}:{1,number,integer}] failed
+PoolSocketSender.noMoreSender=No socket sender available for client [{0}:{1,number,integer}] did it disappear?
+ReplicationTransmitter.getProperty=get property {0}
+ReplicationTransmitter.setProperty=set property {0}: {1} old value {2}
+ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name {1}
+ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with name {1}
+ReplicationValve.crossContext.add=add Cross Context session replication container to replicationValve threadlocal
+ReplicationValve.crossContext.registerSession=register Cross context session id={0} from context {1}
+ReplicationValve.crossContext.remove=remove Cross Context session replication container from replicationValve threadlocal
+ReplicationValve.crossContext.sendDelta=send Cross Context session delta from context {0}.
+ReplicationValve.filter.loading=Loading request filters={0}
+ReplicationValve.filter.token=Request filter={0}
+ReplicationValve.filter.token.failure=Unable to compile filter={0}
+ReplicationValve.invoke.uri=Invoking replication request on {0}
+ReplicationValve.nocluster=No cluster configured for this request.
+ReplicationValve.resetDeltaRequest=Cluster is standalone: reset Session Request Delta at context {0}
+ReplicationValve.send.failure=Unable to perform replication request.
+ReplicationValve.send.invalid.failure=Unable to send session [id={0}] invalid message over cluster.
+ReplicationValve.session.found=Context {0}: Found session {1} but it isn''t a ClusterSession.
+ReplicationValve.session.indicator=Context {0}: Primarity of session {1} in request attribute {2} is {3}.
+ReplicationValve.session.invalid=Context {0}: Requested session {1} is invalid, removed or not replicated at this node.
+ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests {4} send requests {5} cross context requests (Request={6} ms Cluster={7} ms).
+SimpleTcpCluster.event.log=Cluster receive listener event {0} with data {1}
+SimpleTcpCluster.getProperty=get property {0}
+SimpleTcpCluster.setProperty=set property {0}: {1} old value {2}
+SimpleTcpCluster.default.addClusterListener=Add Default ClusterListener at cluster {0}
+SimpleTcpCluster.default.addClusterValves=Add Default ClusterValves at cluster {0}
+SimpleTcpCluster.default.addClusterReceiver=Add Default ClusterReceiver at cluster {0}
+SimpleTcpCluster.default.addClusterSender=Add Default ClusterSender at cluster {0}
+SimpleTcpCluster.default.addMembershipService=Add Default Membership Service at cluster {0}
+SimpleTcpCluster.log.receive=RECEIVE {0,date}:{0,time} {1,number} {2}:{3,number,integer} {4} {5}
+SimpleTcpCluster.log.send=SEND {0,date}:{0,time} {1,number} {2}:{3,number,integer} {4} 
+SimpleTcpCluster.log.send.all=SEND {0,date}:{0,time} {1,number} - {2}
+SocketReplictionListener.allreadyExists=ServerSocket [{0}:{1}] already started!
+SocketReplictionListener.accept.failure=ServerSocket [{0}:{1}] - Exception to start thread or accept server socket
+SocketReplictionListener.open=Open Socket at [{0}:{1}]
+SocketReplictionListener.openclose.failure=ServerSocket [{0}:{1}] - Exception to open or close server socket
+SocketReplictionListener.portbusy=Port busy at [{0}:{1}] - reason [{2}]
+SocketReplictionListener.serverSocket.notExists=Fatal error: Receiver socket not bound address={0} port={1} maxport={2}
+SocketReplictionListener.timeout=Receiver ServerSocket not started [{0}:{1}] - reason: timeout={2} or listen={3}
+SocketReplictionListener.unlockSocket.failure=UnLocksocket failure at ServerSocket [{0}:{1}]

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,268 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.LinkedList;
+
+/**
+ * Send cluster messages with a pool of sockets (25).
+ * 
+ * FIXME support processing stats
+ * 
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version 1.2
+ */
+
+public class PooledSocketSender extends DataSender {
+
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "PooledSocketSender/2.0";
+
+    // ----------------------------------------------------- Instance Variables
+
+    private int maxPoolSocketLimit = 25;
+
+    private SenderQueue senderQueue = null;
+    
+    //  ----------------------------------------------------- Constructor
+
+   /**
+    * @param domain replication cluster domain (session domain)
+    * @param host replication node tcp address
+    * @param port replication node tcp port
+    */
+    public PooledSocketSender(String domain,InetAddress host, int port) {
+        super(domain,host, port);
+        senderQueue = new SenderQueue(this, maxPoolSocketLimit);
+    }
+   
+    //  ----------------------------------------------------- Public Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+
+    public void setMaxPoolSocketLimit(int limit) {
+        maxPoolSocketLimit = limit;
+        senderQueue.setLimit(limit);
+    }
+
+    public int getMaxPoolSocketLimit() {
+        return maxPoolSocketLimit;
+    }
+
+    public int getInPoolSize() {
+        return senderQueue.getInPoolSize();
+    }
+
+    public int getInUsePoolSize() {
+        return senderQueue.getInUsePoolSize();
+    }
+
+    //  ----------------------------------------------------- Public Methode
+
+    public synchronized void connect() throws java.io.IOException {
+        //do nothing, happens in the socket sender itself
+        senderQueue.open();
+        setSocketConnected(true);
+        connectCounter++;
+    }
+
+    public synchronized void disconnect() {
+        senderQueue.close();
+        setSocketConnected(false);
+        disconnectCounter++;
+    }
+
+    /**
+     * Send a cluster message through one of the pooled SocketSenders.
+     * The pool is connected first when this sender is not connected yet.
+     * When the pool hands back no sender, a warning is logged and the
+     * message is dropped. A borrowed sender is always given back to the
+     * pool; the statistics are only updated when the send did not throw.
+     * 
+     * @param data Message data
+     * @throws java.io.IOException propagated from connect() or from the
+     *             pooled sender
+     */
+    public void sendMessage(ClusterData data) throws IOException {
+        // connect on demand; re-check under the lock before connecting
+        if (!isConnected()) {
+            synchronized (this) {
+                if (!isConnected()) {
+                    connect();
+                }
+            }
+        }
+        // borrow a socket sender from the pool
+        SocketSender pooled = senderQueue.getSender(0);
+        if (pooled != null) {
+            try {
+                pooled.sendMessage(data);
+            } finally {
+                // hand the connection back, whether or not the send worked
+                senderQueue.returnSender(pooled);
+            }
+            addStats(data.getMessage().length);
+            return;
+        }
+        log.warn(sm.getString("PoolSocketSender.noMoreSender", this.getAddress(), new Integer(this.getPort())));
+    }
+
+    public String toString() {
+        StringBuffer buf = new StringBuffer("PooledSocketSender[");
+        buf.append(getAddress()).append(":").append(getPort()).append("]");
+        return buf.toString();
+    }
+
+    //  ----------------------------------------------------- Inner Class
+
+    /**
+     * Bounded pool of SocketSenders for the enclosing sender. Idle senders
+     * wait in <code>queue</code>, borrowed ones are tracked in
+     * <code>inuse</code>; both lists are only modified while holding
+     * <code>mutex</code>.
+     */
+    private class SenderQueue {
+        private int limit = 25;
+
+        PooledSocketSender parent = null;
+
+        private LinkedList queue = new LinkedList();
+
+        private LinkedList inuse = new LinkedList();
+
+        private Object mutex = new Object();
+
+        private boolean isOpen = true;
+
+        /**
+         * @param parent sender whose address, port and settings are used
+         *            for newly created socket senders
+         * @param limit maximum number of senders handed out at the same time
+         */
+        public SenderQueue(PooledSocketSender parent, int limit) {
+            this.limit = limit;
+            this.parent = parent;
+        }
+
+        /**
+         * @return Returns the limit.
+         */
+        public int getLimit() {
+            return limit;
+        }
+        /**
+         * @param limit The limit to set.
+         */
+        public void setLimit(int limit) {
+            this.limit = limit;
+        }
+        /**
+         * @return number of senders currently borrowed from the pool
+         */
+        public int getInUsePoolSize() {
+            return inuse.size();
+        }
+
+        /**
+         * @return number of idle senders waiting in the pool
+         */
+        public int getInPoolSize() {
+            return queue.size();
+        }
+
+        /**
+         * Borrow a sender: an idle one if available, otherwise a new one
+         * while fewer than <code>limit</code> are in use, otherwise wait
+         * until one is returned.
+         * 
+         * @param timeout maximum wait in milliseconds; 0 waits without a
+         *            time limit
+         * @return a sender registered as in use, or <code>null</code> when
+         *         the wait timed out, the pool was closed while waiting,
+         *         or the waiting thread was interrupted
+         * @throws IllegalStateException if the pool is already closed when
+         *             this method is called
+         */
+        public SocketSender getSender(long timeout) {
+            SocketSender sender = null;
+            long start = System.currentTimeMillis();
+            synchronized (mutex) {
+                if (!isOpen)
+                    throw new IllegalStateException(
+                            "Socket pool is closed.");
+                while (isOpen && sender == null) {
+                    if (queue.size() > 0) {
+                        sender = (SocketSender) queue.removeFirst();
+                    } else if (inuse.size() < limit) {
+                        sender = getNewSocketSender();
+                    } else {
+                        long remaining = 0;
+                        if (timeout != 0) {
+                            // only wait for what is left of the timeout,
+                            // so an early wake-up cannot extend the wait
+                            remaining = timeout
+                                    - (System.currentTimeMillis() - start);
+                            if (remaining <= 0)
+                                break;
+                        }
+                        try {
+                            mutex.wait(remaining);
+                        } catch (InterruptedException x) {
+                            PooledSocketSender.log.warn(sm.getString("PoolSocketSender.senderQueue.sender.failed",parent.getAddress(),new Integer(parent.getPort())),x);
+                            // keep the interrupt visible to the caller and
+                            // stop waiting; waiting again would throw at once
+                            Thread.currentThread().interrupt();
+                            break;
+                        }//catch
+                    }//end if
+                }//while
+                if (sender != null) {
+                    inuse.add(sender);
+                }
+            }//synchronized
+            return sender;
+        }
+
+        /**
+         * Give a borrowed sender back to the pool and wake up one thread
+         * waiting in {@link #getSender(long)}.
+         * 
+         * @param sender sender obtained from {@link #getSender(long)}
+         */
+        public void returnSender(SocketSender sender) {
+            synchronized (mutex) {
+                queue.add(sender);
+                inuse.remove(sender);
+                mutex.notify();
+            }
+        }
+
+        /**
+         * Create a socket sender for the parent's domain, address and port
+         * and copy the parent's keep-alive, ack and resend settings to it.
+         */
+        private SocketSender getNewSocketSender() {
+            SocketSender sender = new SocketSender(getDomain(),
+                                                   parent.getAddress(), 
+                                                   parent.getPort(),
+                                                   parent.getSenderState() );
+            sender.setKeepAliveMaxRequestCount(parent
+                    .getKeepAliveMaxRequestCount());
+            sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
+            sender.setAckTimeout(parent.getAckTimeout());
+            sender.setWaitForAck(parent.isWaitForAck());
+            sender.setResend(parent.isResend());
+            return sender;
+
+        }
+
+        /**
+         * Disconnect every sender in the given list.
+         */
+        private void disconnectAll(LinkedList senders) {
+            // iterate instead of get(i): indexed access walks a LinkedList
+            for (java.util.Iterator it = senders.iterator(); it.hasNext();) {
+                ((SocketSender) it.next()).disconnect();
+            }//for
+        }
+
+        /**
+         * Disconnect all idle and borrowed senders, empty both lists, mark
+         * the pool closed and wake up every waiting thread.
+         */
+        public void close() {
+            synchronized (mutex) {
+                disconnectAll(queue);
+                disconnectAll(inuse);
+                queue.clear();
+                inuse.clear();
+                isOpen = false;
+                mutex.notifyAll();
+            }
+        }
+
+        /**
+         * Mark the pool open and wake up every waiting thread.
+         */
+        public void open() {
+            synchronized (mutex) {
+                isOpen = true;
+                mutex.notifyAll();
+            }
+        }
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,430 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.group.ChannelInterceptorBase;
+import org.apache.catalina.cluster.io.ListenCallback;
+import org.apache.catalina.cluster.io.ObjectReader;
+import org.apache.catalina.cluster.io.XByteBuffer;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.cluster.MessageListener;
+
+/**
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $
+ */
+public class ReplicationListener
+    implements Runnable, ClusterReceiver, ListenCallback {
+    protected static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class);
+
+    /**
+     * The string manager for this package.
+     */
+    protected StringManager sm = StringManager.getManager(Constants.Package);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "ReplicationListener/1.3";
+
+    private ThreadPool pool = null;
+    private int tcpThreadCount;
+    private long tcpSelectorTimeout;
+    private Selector selector = null;
+
+    private java.net.InetAddress bind;
+    private String tcpListenAddress;
+    private int tcpListenPort;
+    private boolean sendAck;
+    protected boolean doListen = false;
+    /**
+     * Compress message data bytes
+     */
+    private boolean compress = true;
+
+
+    private Object interestOpsMutex = new Object();
+    private MessageListener listener = null;
+    public ReplicationListener() {
+    }
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+
+    public long getTcpSelectorTimeout() {
+        return tcpSelectorTimeout;
+    }
+
+    public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
+        this.tcpSelectorTimeout = tcpSelectorTimeout;
+    }
+
+    public int getTcpThreadCount() {
+        return tcpThreadCount;
+    }
+
+    public void setTcpThreadCount(int tcpThreadCount) {
+        this.tcpThreadCount = tcpThreadCount;
+    }
+
+    public Object getInterestOpsMutex() {
+        return interestOpsMutex;
+    }
+
+    public void stop() {
+        this.stopListening();
+    }
+
+    /**
+     * start cluster receiver
+     * @throws Exception
+     * @see org.apache.catalina.cluster.ClusterReceiver#start()
+     */
+    public void start() {
+        try {
+            pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
+        } catch (Exception e) {
+            log.error("ThreadPool can initilzed. Listener not started", e);
+            return;
+        }
+        try {
+            getBind();
+            Thread t = new Thread(this, "ReplicationListener");
+            t.setDaemon(true);
+            t.start();
+        } catch (Exception x) {
+            log.fatal("Unable to start cluster receiver", x);
+        }
+    }
+
+    /**
+     * Open a server socket channel, bind it to {@link #getBind()} and
+     * {@link #getTcpListenPort()}, and run the selector loop until
+     * <code>doListen</code> is cleared or the selector reference is dropped.
+     * Accepted connections are registered for reading with an ObjectReader
+     * attachment; readable keys are passed to
+     * {@link #readDataFromSocket(SelectionKey)}.
+     * <p>
+     * The channel and selector are closed and <code>doListen</code> is reset
+     * on every exit path, including a failed open/bind/register, so that a
+     * later call is not rejected as "already started".
+     * 
+     * @throws Exception if the channel or selector cannot be opened, bound,
+     *             registered or closed
+     */
+    protected void listen() throws Exception {
+        if (doListen) {
+            log.warn("ServerSocketChannel allready started");
+            return;
+        }
+        doListen = true;
+        ServerSocketChannel serverChannel = null;
+        try {
+            // allocate an unbound server socket channel
+            serverChannel = ServerSocketChannel.open();
+            // Get the associated ServerSocket to bind it with
+            ServerSocket serverSocket = serverChannel.socket();
+            // create a new Selector for use below
+            selector = Selector.open();
+            // set the port the server channel will listen to
+            serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
+            // set non-blocking mode for the listening socket
+            serverChannel.configureBlocking(false);
+            // register the ServerSocketChannel with the Selector
+            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+            while (doListen) {
+                // work on a local reference: stopListening() sets the field
+                // to null, which must not surface as a NullPointerException
+                Selector current = selector;
+                if (current == null) {
+                    break;
+                }
+                // this may block for a long time, upon return the
+                // selected set contains keys of the ready channels
+                try {
+
+                    int n = current.select(tcpSelectorTimeout);
+                    if (n == 0) {
+                        //there is a good chance that we got here
+                        //because the TcpReplicationThread called
+                        //selector wakeup().
+                        //if that happens, we must ensure that that
+                        //thread has enough time to call interestOps
+                        synchronized (interestOpsMutex) {
+                            //if we got the lock, means there are no
+                            //keys trying to register for the
+                            //interestOps method
+                        }
+                        continue; // nothing to do
+                    }
+                    // get an iterator over the set of selected keys
+                    Iterator it = current.selectedKeys().iterator();
+                    // look at each key in the selected set
+                    while (it.hasNext()) {
+                        SelectionKey key = (SelectionKey) it.next();
+                        // Is a new connection coming in?
+                        if (key.isAcceptable()) {
+                            ServerSocketChannel server =
+                                (ServerSocketChannel) key.channel();
+                            SocketChannel channel = server.accept();
+                            Object attach = new ObjectReader(channel, current,
+                                this);
+                            registerChannel(current,
+                                            channel,
+                                            SelectionKey.OP_READ,
+                                            attach);
+                        }
+                        // is there data to read on this channel?
+                        if (key.isReadable()) {
+                            readDataFromSocket(key);
+                        } else {
+                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        }
+
+                        // remove key from selected set, it's been handled
+                        it.remove();
+                    }
+                } catch (java.nio.channels.ClosedSelectorException cse) {
+                    // ignore is normal at shutdown or stop listen socket
+                } catch (java.nio.channels.CancelledKeyException nx) {
+                    log.warn(
+                        "Replication client disconnected, error when polling key. Ignoring client.");
+                } catch (Exception x) {
+                    log.error("Unable to process request in ReplicationListener", x);
+                }
+
+            }
+        } finally {
+            // allow a later listen() call even when setup failed
+            doListen = false;
+            try {
+                if (serverChannel != null)
+                    serverChannel.close();
+            } finally {
+                Selector current = selector;
+                if (current != null)
+                    current.close();
+            }
+        }
+    }
+
+    /**
+     * Stop the listen loop and close the selector: clears
+     * <code>doListen</code>, calls <code>wakeup()</code> on the selector once
+     * per configured tcp thread, closes it and finally drops the reference.
+     * A failure while closing is logged and not propagated.
+     *
+     * @see org.apache.catalina.cluster.tcp.ClusterReceiverBase#stopListening()
+     */
+    protected void stopListening() {
+        // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
+        doListen = false;
+        if (selector != null) {
+            try {
+                for (int i = 0; i < getTcpThreadCount(); i++) {
+                    selector.wakeup();
+                }
+                selector.close();
+            } catch (Exception x) {
+                log.error("Unable to close cluster receiver selector.", x);
+            } finally {
+                selector = null;
+            }
+        }
+    }
+
+    /**
+     * deserialize the received cluster message. The data is treated as
+     * compressed when this listener has compression enabled or when the
+     * data's compress flag equals ClusterMessage.FLAG_ALLOWED.
+     * @param data cluster data to deserialize, may be <code>null</code>
+     * @return The message, or <code>null</code> when data is <code>null</code>
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+    protected ClusterMessage deserialize(ClusterData data) throws IOException, ClassNotFoundException {
+        // check for null before touching data; reading the compress flag
+        // first would throw a NullPointerException
+        if (data == null) {
+            return null;
+        }
+        boolean compress = isCompress() || data.getCompress() == ClusterMessage.FLAG_ALLOWED;
+        return XByteBuffer.deserialize(data, compress);
+    }
+
+    // ----------------------------------------------------------
+
+    /**
+     * Put the given channel into non-blocking mode and register it with
+     * the given selector for the given operations, using the given
+     * attachment. A <code>null</code> channel is silently ignored.
+     */
+    protected void registerChannel(Selector selector,
+                                   SelectableChannel channel,
+                                   int ops,
+                                   Object attach) throws Exception {
+        if (channel != null) { // null could happen
+            // the new channel must be non-blocking before it is registered
+            channel.configureBlocking(false);
+            channel.register(selector, ops, attach);
+        }
+    }
+
+    /**
+     * Start thread and listen
+     */
+    public void run() {
+        try {
+            listen();
+        } catch (Exception x) {
+            log.error("Unable to start replication listener.", x);
+        }
+    }
+
+    // ----------------------------------------------------------
+
+    /**
+     * Hand a readable key over to a worker thread taken from the pool.
+     * When the pool has no worker available nothing is done (only a debug
+     * message is written).
+     * @param key A SelectionKey object associated with a channel
+     *  determined by the selector to be ready for reading.
+     */
+    protected void readDataFromSocket(SelectionKey key) throws Exception {
+        TcpReplicationThread worker = (TcpReplicationThread) pool.getWorker();
+        if (worker != null) {
+            // invoking this wakes up the worker thread then returns
+            worker.serviceChannel(key, isSendAck());
+            return;
+        }
+        // No threads available, do nothing, the selection
+        // loop will keep calling this method until a
+        // thread becomes available.
+        // FIXME: This design could be improved.
+        if (log.isDebugEnabled()) {
+            log.debug("No TcpReplicationThread available");
+        }
+    }
+
+    /**
+     * Callback for received cluster data: deserialize it and pass the
+     * message to the registered message listener. Without a listener the
+     * data is ignored. Deserialization failures are logged, not propagated.
+     * @param data the received cluster data
+     */
+    public void messageDataReceived(ClusterData data) {
+        if (this.listener == null) {
+            return;
+        }
+        try {
+            listener.messageReceived(deserialize(data));
+        } catch (java.io.IOException x) {
+            if (log.isErrorEnabled()) {
+                log.error("Unable to receive and deserialize cluster data. IOException.",x);
+            }
+        } catch (java.lang.ClassNotFoundException cx) {
+            if (log.isErrorEnabled()) {
+                log.error("Unable to receive and deserialize cluster data. ClassNotFoundException.",cx);
+            }
+        }
+    }
+
+    /**
+     * @return Returns the bind.
+     */
+    public java.net.InetAddress getBind() {
+        if (bind == null) {
+            try {
+                if ("auto".equals(tcpListenAddress)) {
+                    tcpListenAddress = java.net.InetAddress.getLocalHost()
+                                       .getHostAddress();
+                }
+                if (log.isDebugEnabled())
+                    log.debug("Starting replication listener on address:"
+                              + tcpListenAddress);
+                bind = java.net.InetAddress.getByName(tcpListenAddress);
+            } catch (IOException ioe) {
+                log.error("Failed bind replication listener on address:"
+                          + tcpListenAddress, ioe);
+            }
+        }
+        return bind;
+    }
+
+    /**
+     * @param bind The bind to set.
+     */
+    public void setBind(java.net.InetAddress bind) {
+        this.bind = bind;
+    }
+
+    /**
+     * @return Returns the compress.
+     */
+    public boolean isCompress() {
+        return compress;
+    }
+
+    /**
+     * @param compressMessageData The compress to set.
+     */
+    public void setCompress(boolean compressMessageData) {
+        this.compress = compressMessageData;
+    }
+
+    /**
+     * Send ACK to sender
+     *
+     * @return True if sending ACK
+     */
+    public boolean isSendAck() {
+        return sendAck;
+    }
+
+    /**
+     * set ack mode or not!
+     *
+     * @param sendAck
+     */
+    public void setSendAck(boolean sendAck) {
+        this.sendAck = sendAck;
+    }
+
+    public String getTcpListenAddress() {
+        return tcpListenAddress;
+    }
+
+    public void setTcpListenAddress(String tcpListenAddress) {
+        this.tcpListenAddress = tcpListenAddress;
+    }
+
+    public int getTcpListenPort() {
+        return tcpListenPort;
+    }
+
+    public MessageListener getMessageListener() {
+        return listener;
+    }
+
+    public void setTcpListenPort(int tcpListenPort) {
+        this.tcpListenPort = tcpListenPort;
+    }
+
+    public void setMessageListener(MessageListener listener) {
+        this.listener = listener;
+    }
+
+    public String getHost() {
+        return getTcpListenAddress();
+    }
+
+    public int getPort() {
+        return getTcpListenPort();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.catalina.cluster.io.ListenCallback#sendAck()
+     */
+    public void sendAck() throws IOException {
+        // do nothing
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.jbx
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.jbx?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.jbx (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.jbx Thu Feb 23 11:55:14 2006
@@ -0,0 +1,24 @@
+[PropertyInfo]
+bind,java.net.InetAddress,false,false, , ,true,<default>
+compress,boolean,false,false, , ,true,<default>
+coordinator,org.apache.catalina.cluster.MessageListener,false,false, , ,true,<default>
+coordinator,org.apache.catalina.cluster.group.ChannelInterceptorBase,false,false,coordinator,coordinator,true,<default>
+doListen,boolean,false,false, , ,false,<default>
+host,String,false,false, , ,true,<default>
+info,String,false,false, , ,true,<default>
+interestOpsMutex,Object,false,false, , ,true,<default>
+log,org.apache.commons.logging.Log,false,false, , ,false,<default>
+pool,org.apache.catalina.cluster.tcp.ThreadPool,false,false, , ,false,<default>
+port,int,false,false, , ,true,<default>
+selector,java.nio.channels.Selector,false,false, , ,false,<default>
+sendAck,boolean,false,false, , ,true,<default>
+sm,org.apache.catalina.util.StringManager,false,false, , ,false,<default>
+tcpListenAddress,String,false,false, , ,true,<default>
+tcpListenPort,int,false,false, , ,true,<default>
+tcpSelectorTimeout,long,false,false, , ,true,<default>
+tcpThreadCount,int,false,false, , ,true,<default>
+[IconNames]
+
+
+
+

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,703 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.zip.GZIPOutputStream;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.catalina.Container;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ClusterSender;
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.cluster.util.IDynamicProperty;
+import org.apache.catalina.core.StandardHost;
+import org.apache.catalina.util.StringManager;
+import org.apache.tomcat.util.IntrospectionUtils;
+import org.apache.catalina.cluster.io.XByteBuffer;
+
+/**
+ * Transmit message to other cluster members
+ * Actual senders are created based on the replicationMode
+ * type 
+ * FIXME i18n log messages
+ * FIXME compress data depends on message type and size 
+ * TODO pause and resume senders
+ * 
+ * @author Peter Rossbach
+ * @author Filip Hanik
+ * @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb 2006) $
+ */
+public class ReplicationTransmitter implements ClusterSender,IDynamicProperty {
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
+            .getLog(ReplicationTransmitter.class);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "ReplicationTransmitter/3.0";
+
+    /**
+     * The string manager for this package.
+     */
+    protected StringManager sm = StringManager.getManager(Constants.Package);
+
+    private Map map = new HashMap();
+
+    /**
+     * Create a transmitter. The constructor itself does nothing; all state
+     * comes from the field initializers.
+     */
+    public ReplicationTransmitter() {
+    }
+
+    /**
+     * number of transmitted messages
+     */
+    private long nrOfRequests = 0;
+
+    /**
+     * number of transmitted bytes
+     */
+    private long totalBytes = 0;
+
+    /**
+     * number of failure
+     */
+    private long failureCounter = 0;
+
+    /**
+     * Iteration count for background processing.
+     */
+    private int count = 0;
+
+    /**
+     * Frequency of the check sender keepAlive Socket Status.
+     */
+    protected int processSenderFrequency = 2;
+
+    /**
+     * current sender replication mode
+     */
+    private String replicationMode;
+
+    /**
+     * sender default ackTimeout
+     */
+    private long ackTimeout = 15000; //15 seconds by default
+
+    /**
+     * enabled wait for ack
+     */
+    private boolean waitForAck = true;
+
+    /**
+     * autoConnect sender when next message send
+     */
+    private boolean autoConnect = false;
+
+    /**
+     * Compress message data bytes
+     */
+    private boolean compress = false;
+
+    /**
+     * doTransmitterProcessingStats
+     */
+    protected boolean doTransmitterProcessingStats = false;
+
+    /**
+     * processingTime
+     */
+    protected long processingTime = 0;
+    
+    /**
+     * min processingTime
+     */
+    protected long minProcessingTime = Long.MAX_VALUE ;
+
+    /**
+     * max processingTime
+     */
+    protected long maxProcessingTime = 0;
+   
+    /**
+     * dynamic sender <code>properties</code>
+     */
+    private Map properties = new HashMap();
+
+
+    /**
+     * Transmitter Mbean name
+     */
+    private ObjectName objectName;
+
+    // ------------------------------------------------------------- Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     *
+     * @return the constant <code>info</code> string of this class
+     */
+    public String getInfo() {
+        return (info);
+    }
+
+    /**
+     * @return the number of messages counted by {@link #addStats(int)},
+     *         i.e. sends that completed without an exception, since creation
+     *         or the last {@link #resetStatistics()}
+     */
+    public long getNrOfRequests() {
+        return nrOfRequests;
+    }
+
+    /**
+     * @return the sum of the message lengths passed to
+     *         {@link #addStats(int)} since creation or the last
+     *         {@link #resetStatistics()}
+     */
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    /**
+     * @return the number of send attempts that ended in an exception since
+     *         creation or the last {@link #resetStatistics()}
+     */
+    public long getFailureCounter() {
+        return failureCounter;
+    }
+
+    /**
+     * Current replication mode.
+     * 
+     * @return the mode last accepted by {@link #setReplicationMode(String)},
+     *         or <code>null</code> if none has been set yet
+     */
+    public String getReplicationMode() {
+        return replicationMode;
+    }
+
+    /**
+     * Set the replication mode (pooled, synchronous, asynchronous,
+     * fastasyncqueue). The mode is checked first; an invalid mode leaves the
+     * current setting untouched.
+     * 
+     * @see IDataSenderFactory#validateMode(String)
+     * @param mode name of the replication mode
+     * @throws IllegalArgumentException with the validation message when
+     *         <code>validateMode</code> rejects the mode
+     */
+    public void setReplicationMode(String mode) {
+        String validationError = IDataSenderFactory.validateMode(mode);
+        if (validationError != null) {
+            throw new IllegalArgumentException(validationError);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Setting replication mode to " + mode);
+        }
+        this.replicationMode = mode;
+    }
+
+    /**
+     * @return Returns the avg processingTime/nrOfRequests.
+     */
+    public double getAvgProcessingTime() {
+        return ((double)processingTime) / nrOfRequests;
+    }
+ 
+    /**
+     * @return the longest single duration recorded by
+     *         {@link #addProcessingStats(long)}; <code>0</code> if nothing
+     *         has been recorded since creation or the last reset
+     */
+    public long getMaxProcessingTime() {
+        return maxProcessingTime;
+    }
+    
+    /**
+     * @return the shortest single duration recorded by
+     *         {@link #addProcessingStats(long)};
+     *         <code>Long.MAX_VALUE</code> if nothing has been recorded since
+     *         creation or the last reset
+     */
+    public long getMinProcessingTime() {
+        return minProcessingTime;
+    }
+    
+    /**
+     * @return the accumulated duration, in milliseconds, of all calls
+     *         recorded by {@link #addProcessingStats(long)}
+     */
+    public long getProcessingTime() {
+        return processingTime;
+    }
+    
+    /**
+     * @return <code>true</code> if the <code>sendMessage</code> methods
+     *         measure their duration and record it through
+     *         {@link #addProcessingStats(long)}
+     */
+    public boolean isDoTransmitterProcessingStats() {
+        return doTransmitterProcessingStats;
+    }
+    
+    /**
+     * Switch the timing of the <code>sendMessage</code> methods on or off.
+     *
+     * @param doProcessingStats <code>true</code> to record processing times
+     */
+    public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
+        this.doTransmitterProcessingStats = doProcessingStats;
+    }
+ 
+
+    /**
+     * Store the transmitter ObjectName. This method only keeps the
+     * reference; it does not register anything itself.
+     * 
+     * @param name the MBean name of this transmitter
+     */
+    public void setObjectName(ObjectName name) {
+        objectName = name;
+    }
+
+    /**
+     * @return the ObjectName stored by {@link #setObjectName(ObjectName)},
+     *         or <code>null</code> if none was set
+     */
+    public ObjectName getObjectName() {
+        return objectName;
+    }
+
+    /**
+     * @return the transmitter-wide compress flag that
+     *         {@link #serialize(ClusterMessage)} combines with the message's
+     *         own compress setting
+     */
+    public boolean isCompress() {
+        return compress;
+    }
+
+    /**
+     * Set the transmitter-wide compress flag. Unlike the autoConnect,
+     * ackTimeout and waitForAck setters, this value is not stored as a
+     * dynamic sender property.
+     *
+     * @param compressMessageData
+     *            The compress to set.
+     */
+    public void setCompress(boolean compressMessageData) {
+        this.compress = compressMessageData;
+    }
+
+    /**
+     * @return <code>true</code> if
+     *         {@link #sendMessageData(ClusterData, IDataSender)} connects a
+     *         disconnected sender before sending
+     */
+    public boolean isAutoConnect() {
+        return autoConnect;
+    }
+
+    /**
+     * Set the autoConnect flag and record it, as a String, under the dynamic
+     * property <code>"autoConnect"</code> so that
+     * {@link #transferSenderProperty(IDataSender)} passes it on to senders.
+     *
+     * @param autoConnect
+     *            The autoConnect to set.
+     */
+    public void setAutoConnect(boolean autoConnect) {
+        this.autoConnect = autoConnect;
+        setProperty("autoConnect", String.valueOf(autoConnect));
+
+    }
+
+    /**
+     * @return the ack timeout in milliseconds (the default of 15000 is
+     *         15 seconds)
+     */
+    public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    /**
+     * Set the ack timeout and record it, as a String, under the dynamic
+     * property <code>"ackTimeout"</code> so that
+     * {@link #transferSenderProperty(IDataSender)} passes it on to senders.
+     *
+     * @param ackTimeout the timeout in milliseconds
+     */
+    public void setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+        setProperty("ackTimeout", String.valueOf(ackTimeout));
+    }
+
+    /**
+     * @return the waitForAck flag (<code>true</code> by default)
+     */
+    public boolean isWaitForAck() {
+        return waitForAck;
+    }
+
+    /**
+     * Set the waitForAck flag and record it, as a String, under the dynamic
+     * property <code>"waitForAck"</code> so that
+     * {@link #transferSenderProperty(IDataSender)} passes it on to senders.
+     *
+     * @param waitForAck
+     *            The waitForAck to set.
+     */
+    public void setWaitForAck(boolean waitForAck) {
+        this.waitForAck = waitForAck;
+        setProperty("waitForAck", String.valueOf(waitForAck));
+    }
+
+    
+    /**
+     * @return how many {@link #backgroundProcess()} calls make up one
+     *         keep-alive check cycle (2 by default)
+     */
+    public int getProcessSenderFrequency() {
+        return processSenderFrequency;
+    }
+    
+    /**
+     * Set how many {@link #backgroundProcess()} calls make up one keep-alive
+     * check cycle. The value is stored without validation; callers should
+     * pass a positive number.
+     *
+     * @param processSenderFrequency The processSenderFrequency to set.
+     */
+    public void setProcessSenderFrequency(int processSenderFrequency) {
+        this.processSenderFrequency = processSenderFrequency;
+    }
+    
+
+    /**
+     * Tell whether the configured replication mode is one of the two
+     * synchronous modes known to {@link IDataSenderFactory}.
+     *
+     * @return True if synchronized sender
+     * @deprecated since version 5.5.7
+     */
+    public boolean getIsSenderSynchronized() {
+        if (IDataSenderFactory.SYNC_MODE.equals(replicationMode)) {
+            return true;
+        }
+        return IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
+    }
+
+    // ------------------------------------------------------------- dynamic
+    // sender property handling
+
+    /**
+     * set config attributes with reflect
+     * 
+     * @param name
+     * @param value
+     */
+    public void setProperty(String name, Object value) {
+        if (log.isTraceEnabled())
+            log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
+                    value, properties.get(name)));
+
+        properties.put(name, value);
+    }
+
+    /**
+     * get current config
+     * 
+     * @param key
+     * @return The property
+     */
+    public Object getProperty(String key) {
+        if (log.isTraceEnabled())
+            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
+        return properties.get(key);
+    }
+
+    /**
+     * Get all property keys.
+     * 
+     * @return An iterator over the property name set. It is obtained
+     *         directly from the key set of the internal map, not from a copy.
+     */
+    public Iterator getPropertyNames() {
+        return properties.keySet().iterator();
+    }
+
+    /**
+     * Remove a configured property from the transmitter's property map.
+     * Nothing happens if the key is not present.
+     * 
+     * @param key name of the property to remove
+     */
+    public void removeProperty(String key) {
+        properties.remove(key);
+    }
+
+    // ------------------------------------------------------------- public
+    
+    /**
+     * Send a message to a single member. The message is serialized, the
+     * sender registered for the member's key is looked up and the data is
+     * handed to {@link #sendMessageData(ClusterData, IDataSender)}. When
+     * processing stats are enabled the call duration is recorded, even if
+     * the send fails.
+     *
+     * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage, org.apache.catalina.cluster.Member)
+     */
+    public void sendMessage(ClusterMessage message, Member member) throws IOException {
+        long startTime = doTransmitterProcessingStats ? System.currentTimeMillis() : 0;
+        try {
+            ClusterData payload = serialize(message);
+            IDataSender target = (IDataSender) map.get(getKey(member));
+            sendMessageData(payload, target);
+        } finally {
+            if (doTransmitterProcessingStats) {
+                addProcessingStats(startTime);
+            }
+        }
+    }
+    
+    /**
+     * Send to all senders at same cluster domain as message from address.
+     * Delegates to {@link #sendMessage(ClusterMessage, boolean)} with the
+     * domain filter switched on.
+     *
+     * @param message Cluster message to send
+     * @throws IOException if the delegate call fails
+     * @since 5.5.10
+     */
+    public void sendMessageClusterDomain(ClusterMessage message) throws IOException {
+        sendMessage(message,true);
+    
+    }
+
+    /**
+     * Send a message to all senders (broadcast). Delegates to
+     * {@link #sendMessage(ClusterMessage, boolean)} with the domain filter
+     * switched off.
+     *
+     * @param message Cluster message to send
+     * @throws IOException if the delegate call fails
+     * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage)
+     */
+    public void sendMessage(ClusterMessage message) throws IOException {
+        sendMessage(message,false);
+    }
+
+    /**
+     * Send a message to all senders (broadcast), optionally restricted to
+     * the senders whose domain equals the domain of the message address.
+     * The message is serialized once and the same data is handed to every
+     * selected sender. When processing stats are enabled the call duration
+     * is recorded, even if the send fails.
+     *
+     * @param message Cluster message to send
+     * @param domainOnly <code>true</code> to send only to senders in the
+     *        domain of <code>message.getAddress()</code>; <code>false</code>
+     *        to send to every sender
+     * @throws IOException if the message cannot be serialized
+     */
+    public void sendMessage(ClusterMessage message, boolean domainOnly) throws IOException {
+        long time = 0;
+        if (doTransmitterProcessingStats) {
+            time = System.currentTimeMillis();
+        }
+        try {
+            ClusterData data = serialize(message);
+            IDataSender[] senders = getSenders();
+            // Resolve the domain once, and only when the filter is active, so
+            // a plain broadcast never touches the message address.
+            String domain = null;
+            if (domainOnly && senders.length > 0) {
+                domain = message.getAddress().getDomain();
+            }
+            for (int i = 0; i < senders.length; i++) {
+                IDataSender sender = senders[i];
+                // an empty slot must not abort the whole broadcast
+                if (sender == null) continue;
+                //domain filter
+                if ( domainOnly && !(domain.equals(sender.getDomain())) ) continue;
+                sendMessageData(data, sender);
+            }
+        } finally {
+            if (doTransmitterProcessingStats) {
+                addProcessingStats(time);
+            }
+        }
+    }
+        
+    
+
+    /**
+     * Start the transmitter. This implementation is empty: it performs no
+     * work and never actually throws.
+     * 
+     * @see org.apache.catalina.cluster.ClusterSender#start()
+     */
+    public void start() throws java.io.IOException {
+    }
+
+    /**
+     * Stop the transmitter: disconnect every registered sender and remove
+     * it from the sender map. Disconnecting is best effort; a failure is
+     * logged at debug level and the sender is removed anyway.
+     * 
+     * @see org.apache.catalina.cluster.ClusterSender#stop()
+     */
+    public synchronized void stop() {
+        Iterator i = map.values().iterator();
+        while (i.hasNext()) {
+            IDataSender sender = (IDataSender) i.next();
+            try {
+                sender.disconnect();
+            } catch (Exception x) {
+                // best effort: keep shutting down the remaining senders
+                if (log.isDebugEnabled())
+                    log.debug("Unable to disconnect sender [" + sender
+                            + "] while stopping the transmitter", x);
+            }
+            i.remove();
+        }
+    }
+
+    /**
+     * Call transmitter to check for sender socket status. The keep-alive
+     * check runs each time the internal counter wraps around
+     * <code>processSenderFrequency</code>. A frequency of <code>0</code>
+     * switches the check off instead of failing with an
+     * ArithmeticException.
+     * 
+     * @see SimpleTcpCluster#backgroundProcess()
+     */
+    public void backgroundProcess() {
+        if (processSenderFrequency == 0) {
+            // "% 0" would throw; treat a zero frequency as "never check"
+            return;
+        }
+        count = (count + 1) % processSenderFrequency;
+        if (count == 0) {
+            checkKeepAlive();
+        }
+    }
+
+    /**
+     * Ask every registered sender to run its own keep-alive check. Empty
+     * (null) entries are skipped.
+     *
+     * @see DataSender#checkKeepAlive()
+     */
+    public void checkKeepAlive() {
+        for (Iterator senders = map.values().iterator(); senders.hasNext();) {
+            IDataSender sender = (IDataSender) senders.next();
+            if (sender == null) {
+                continue;
+            }
+            sender.checkKeepAlive();
+        }
+    }
+
+    /**
+     * Get all current senders as a snapshot array. The copy is taken while
+     * holding this transmitter's monitor, the same lock used by
+     * {@link #add(Member)}, {@link #remove(Member)} and {@link #stop()}, so
+     * a membership change cannot modify the map while it is being copied.
+     * 
+     * @return The senders; never <code>null</code>, empty if no sender is
+     *         registered
+     */
+    public IDataSender[] getSenders() {
+        synchronized (this) {
+            return (IDataSender[]) map.values().toArray(
+                    new IDataSender[map.size()]);
+        }
+    }
+
+    /**
+     * Reset all transmitter statistics to their initial values: the request,
+     * byte and failure counters and the processing times.
+     */
+    public synchronized void resetStatistics() {
+        // timing
+        minProcessingTime = Long.MAX_VALUE;
+        maxProcessingTime = 0;
+        processingTime = 0;
+        // counters
+        failureCounter = 0;
+        totalBytes = 0;
+        nrOfRequests = 0;
+    }
+
+    /**
+     * Add a new cluster member. Unless a sender is already registered under
+     * the member's key, a sender for the current replicationMode is created,
+     * the transmitter's dynamic properties are transferred to it and it is
+     * stored in the sender map. A creation failure is logged, not thrown.
+     * 
+     * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
+     */
+    public synchronized void add(Member member) {
+        String key = getKey(member);
+        if (map.containsKey(key)) {
+            return;
+        }
+        try {
+            IDataSender created = IDataSenderFactory.getIDataSender(
+                    replicationMode, member);
+            transferSenderProperty(created);
+            map.put(key, created);
+        } catch (IOException x) {
+            log.error("Unable to create and add a IDataSender object.", x);
+        }
+    }
+
+    /**
+     * Remove the sender of a member from the transmitter: the sender is
+     * disconnected first and then dropped from the sender map. Unknown
+     * members are ignored.
+     * 
+     * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
+     */
+    public synchronized void remove(Member member) {
+        String key = getKey(member);
+        IDataSender sender = (IDataSender) map.get(key);
+        if (sender != null) {
+            sender.disconnect();
+            map.remove(key);
+        }
+    }
+
+    // ------------------------------------------------------------- protected
+
+    /**
+     * Count one request and its transferred bytes. With debug logging
+     * enabled, a summary line is written on every 100th request.
+     * 
+     * @param length number of bytes of the message just sent
+     */
+    protected synchronized void addStats(int length) {
+        nrOfRequests += 1;
+        totalBytes += length;
+        if (!log.isDebugEnabled() || (nrOfRequests % 100) != 0) {
+            return;
+        }
+        long average = totalBytes / nrOfRequests;
+        log.debug("Nr of bytes sent=" + totalBytes + " over "
+                + nrOfRequests + "; avg=" + average
+                + " bytes/request; failures=" + failureCounter);
+    }
+
+    /**
+     * Transfer all properties from transmitter to concrete sender. Every
+     * dynamic property is applied through
+     * <code>IntrospectionUtils.setProperty</code> using the String form of
+     * its value. Properties whose value is <code>null</code> are skipped.
+     * 
+     * @param sender the sender that receives the properties
+     */
+    protected void transferSenderProperty(IDataSender sender) {
+        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
+            String pkey = (String) iter.next();
+            Object value = getProperty(pkey);
+            if (value == null) {
+                // nothing to transfer; value.toString() would throw
+                continue;
+            }
+            IntrospectionUtils.setProperty(sender, pkey, value.toString());
+        }
+    }
+
+    /**
+     * Build the key under which the sender of a member is stored in the
+     * sender map.
+     * 
+     * @param member the cluster member
+     * @return concat member.host:member.port
+     */
+    protected String getKey(Member member) {
+        return member.getHost() + ":" + member.getPort();
+    }
+
+    
+    
+
+    /**
+     * Serialize a message with <code>XByteBuffer.serialize</code>.
+     * Compression is used when the transmitter's compress flag is set and
+     * the message does not forbid it
+     * (<code>ClusterMessage.FLAG_FORBIDDEN</code>), or when the message's
+     * own flag is <code>ClusterMessage.FLAG_ALLOWED</code>.
+     *
+     * @see GZIPOutputStream
+     * @param msg cluster message
+     * @return the serialized cluster data
+     * @throws IOException if serialization fails
+     * @since 5.5.10
+     */
+    protected ClusterData serialize(ClusterMessage msg) throws IOException {
+        boolean transmitterAllows = isCompress()
+                && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN;
+        boolean useCompression = transmitterAllows
+                || msg.getCompress() == ClusterMessage.FLAG_ALLOWED;
+        return XByteBuffer.serialize(msg, useCompression);
+    }
+ 
+
+    /**
+     * Send message to concrete sender. If autoConnect is true, a sender
+     * that is not connected is connected first.
+     * <ul>
+     * <li>On failure the sender's suspect flag is set to true and the
+     * failure counter is incremented. After successfully sending, the
+     * suspect flag is set to false.</li>
+     * <li>Stats are only updated after successful sending.</li>
+     * <li>A failure is logged at error level the first time and at debug
+     * level while the sender is already suspect.</li>
+     * </ul>
+     * Exceptions raised while sending are not propagated; they are reported
+     * through the return value.
+     * 
+     * @param data message Data
+     * @param sender concrete message sender
+     * @return true if the message got sent, false otherwise
+     * @throws RuntimeException if <code>sender</code> is <code>null</code>
+     */
+    protected boolean sendMessageData(ClusterData data,
+                                      IDataSender sender) {
+        if (sender == null)
+            throw new RuntimeException("Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
+        try {
+            // deprecated not needed DataSender#pushMessage can handle connection
+            if (autoConnect) {
+                synchronized(sender) {
+                    if(!sender.isConnected())
+                        sender.connect();
+                }
+            }
+            sender.sendMessage(data);
+            sender.setSuspect(false);
+            addStats(data.getMessage().length);
+            return true;
+        } catch (Exception x) {
+            if (!sender.getSuspect()) {
+                if (log.isErrorEnabled() ) log.error("Unable to send replicated message, is member ["+sender.toString()+"] down?",x);
+            } else if (log.isDebugEnabled() ) {
+                log.debug("Unable to send replicated message, is member ["+sender.toString()+"] down?",x);
+            }
+            sender.setSuspect(true);
+            // same monitor as addStats()/resetStatistics(), so concurrent
+            // failures are not lost
+            synchronized (this) {
+                failureCounter++;
+            }
+            return false;
+        }
+
+    }
+    /**
+     * Add processing stats times: the time elapsed since
+     * <code>startTime</code> is added to the total and the minimum and
+     * maximum are updated. Synchronized on the transmitter, like
+     * {@link #addStats(int)} and {@link #resetStatistics()}, so concurrent
+     * senders do not lose updates.
+     *
+     * @param startTime start of the measured call, as returned by
+     *        <code>System.currentTimeMillis()</code>
+     */
+    protected synchronized void addProcessingStats(long startTime) {
+        long time = System.currentTimeMillis() - startTime ;
+        if(time < minProcessingTime)
+            minProcessingTime = time ;
+        if( time > maxProcessingTime)
+            maxProcessingTime = time ;
+        processingTime += time ;
+    }
+ 
+ 
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,81 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import org.apache.catalina.cluster.Member;
+
+/**
+ * Simple mutable holder that groups a message, its destination member and
+ * an exception. The class has no behavior of its own.
+ * NOTE(review): the exception is presumably the failure raised while sending
+ * the message to the destination - confirm against the callers.
+ *
+ * @author Peter Rossbach
+ * @version $Revision: 303987 $ $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 2005) $
+ */
+public class SendMessageData {
+
+    // the three values held by this object; none is validated or copied
+    private Object message ;
+    private Member destination ;
+    private Exception exception ;
+    
+    
+    /**
+     * Create a holder with all three values set.
+     *
+     * @param message the message object
+     * @param destination the member the message is addressed to
+     * @param exception the exception associated with the message
+     */
+    public SendMessageData(Object message, Member destination,
+            Exception exception) {
+        super();
+        this.message = message;
+        this.destination = destination;
+        this.exception = exception;
+    }
+    
+    /**
+     * @return Returns the destination.
+     */
+    public Member getDestination() {
+        return destination;
+    }
+    /**
+     * @param destination The destination to set.
+     */
+    public void setDestination(Member destination) {
+        this.destination = destination;
+    }
+    /**
+     * @return Returns the exception.
+     */
+    public Exception getException() {
+        return exception;
+    }
+    /**
+     * @param exception The exception to set.
+     */
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+    /**
+     * @return Returns the message.
+     */
+    public Object getMessage() {
+        return message;
+    }
+    /**
+     * @param message The message to set.
+     */
+    public void setMessage(Object message) {
+        this.message = message;
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SenderState.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SenderState.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SenderState.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SenderState.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,82 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+
+/**
+ * Holds the state of a sender as one of three mutually exclusive values:
+ * {@link #READY}, {@link #SUSPECT} or {@link #FAILING}. A new instance is
+ * READY unless another state is passed to the constructor.
+ * 
+ * @author Filip Hanik
+ * @version 1.0
+ * @since 5.5.16
+ */
+
+public class SenderState {
+    
+    public static final int READY = 0;
+    public static final int SUSPECT = 1;
+    public static final int FAILING = 2;
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "SenderState/1.0";
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * Current state, one of the constants above.
+     */
+    private int state = READY;
+
+    //  ----------------------------------------------------- Constructor
+
+    /**
+     * Create a state object in the READY state.
+     */
+    public SenderState() {
+        this(READY);
+    }
+
+    /**
+     * Create a state object with the given initial state.
+     *
+     * @param state initial state; stored as given, without validation
+     */
+    public SenderState(int state) {
+        this.state = state;
+    }
+
+    //  ----------------------------------------------------- Public Properties
+
+    /**
+     * @return <code>true</code> if the current state is READY
+     */
+    public boolean isReady() {
+        return READY == state;
+    }
+
+    /**
+     * Switch to the READY state.
+     */
+    public void setReady() {
+        this.state = READY;
+    }
+
+    /**
+     * @return <code>true</code> if the current state is SUSPECT
+     */
+    public boolean isSuspect() {
+        return SUSPECT == state;
+    }
+
+    /**
+     * Switch to the SUSPECT state.
+     */
+    public void setSuspect() {
+        this.state = SUSPECT;
+    }
+
+    /**
+     * @return <code>true</code> if the current state is FAILING
+     */
+    public boolean isFailing() {
+        return FAILING == state;
+    }
+
+    /**
+     * Switch to the FAILING state.
+     */
+    public void setFailing() {
+        this.state = FAILING;
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SocketSender.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SocketSender.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/SocketSender.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,71 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.net.InetAddress;
+
+/**
+ * Send cluster messages sync to request with only one socket. All sending
+ * behavior is inherited from {@link DataSender}; this class only supplies
+ * its own info string and text representation.
+ * 
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version 1.2
+ */
+
+public class SocketSender extends DataSender {
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "SocketSender/1.2";
+
+    // ------------------------------------------------------------- Constructor
+
+    /**
+     * Create a sender for the given target.
+     *
+     * @param domain replication cluster domain (session domain)
+     * @param host replication node tcp address
+     * @param port replication node tcp port
+     */
+    public SocketSender(String domain,InetAddress host, int port) {
+        super(domain,host, port);
+    }
+
+    /**
+     * Create a sender for the given target with an explicit sender state.
+     *
+     * @param domain replication cluster domain (session domain)
+     * @param host replication node tcp address
+     * @param port replication node tcp port
+     * @param state sender state object handed to the superclass
+     */
+    public SocketSender(String domain,InetAddress host, int port, SenderState state) {
+        super(domain,host, port, state);
+    }
+
+    // ------------------------------------------------------------- Properties
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+        return info;
+    }
+
+    /**
+     * @return <code>SocketSender[address:port]</code>
+     */
+    public String toString() {
+        return "SocketSender[" + getAddress() + ":" + getPort() + "]";
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,186 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.cluster.io.ObjectReader;
+
+/**
+ * A worker thread class which drains channels and acknowledges the data read. Each
+ * instance is constructed with a reference to the owning thread pool object.
+ * When started, the thread loops forever waiting to be awakened to service the
+ * channel associated with a SelectionKey object. The worker is tasked by
+ * calling its serviceChannel() method with a SelectionKey object. The
+ * serviceChannel() method stores the key reference in the thread object then
+ * calls notify() to wake it up. When the channel has been drained, the worker
+ * thread returns itself to its parent pool.
+ * 
+ * @author Filip Hanik
+ * 
+ * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $
+ */
+public class TcpReplicationThread extends WorkerThread {
+    /** Bytes written back through the channel as one acknowledgement. */
+    public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
+    private static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
+    /** Scratch buffer for channel reads; cleared and reused on every drainChannel() call. */
+    private ByteBuffer buffer = ByteBuffer.allocate (1024);
+    /** Key currently assigned to this worker; null while the worker is idle. */
+    private SelectionKey key;
+    /** Whether acknowledgements are written for the key currently being serviced. */
+    private boolean sendAck=true;
+
+
+    TcpReplicationThread ()
+    {
+    }
+
+    /**
+     * Worker loop. Sleeps on this object's monitor until serviceChannel()
+     * hands over a key, drains that key's channel, then returns this worker
+     * to its pool. The loop ends once doRun is false.
+     */
+    public synchronized void run()
+    {
+        while (doRun) {
+            // Guarded wait: only sleep while no key is pending. If
+            // serviceChannel() ran before this thread first got here, its
+            // notify() has already happened and an unconditional wait()
+            // would park the worker forever with a key assigned.
+            while (doRun && key == null) {
+                try {
+                    // sleep and release object lock
+                    this.wait();
+                } catch (InterruptedException e) {
+                    if(log.isInfoEnabled())
+                        log.info("TCP worker thread interrupted in cluster",e);
+                    // clear interrupt status
+                    Thread.interrupted();
+                }
+            }
+            if (key == null) {
+                continue;   // woken by close(); the outer loop re-checks doRun
+            }
+            try {
+                drainChannel (key);
+            } catch (Exception e) {
+                //this is common, since the sockets on the other
+                //end expire after a certain time.
+                if ( e instanceof IOException ) {
+                    //dont spew out stack traces for IO exceptions unless debug is enabled.
+                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e);
+                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.");
+                } else if ( log.isErrorEnabled() ) {
+                    //this is a real error, log it.
+                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+                }
+
+                // close channel and nudge selector
+                try {
+                    key.channel().close();
+                } catch (IOException ex) {
+                    log.error("Unable to close channel.",ex);
+                }
+                key.selector().wakeup();
+            }
+            key = null;
+            // done, ready for more, return to pool
+            this.pool.returnWorker (this);
+        }
+    }
+
+    /**
+     * Called to initiate a unit of work by this worker thread
+     * on the provided SelectionKey object.  This method is
+     * synchronized, as is the run() method, so only one key
+     * can be serviced at a given time.
+     * Before waking the worker thread, and before returning
+     * to the caller, this key's interest set is updated to
+     * remove both OP_READ and OP_WRITE.
+     *
+     * @param key the selection key whose channel should be drained
+     * @param sendAck whether an ACK is written for each package counted
+     *                while draining
+     */
+    synchronized void serviceChannel (SelectionKey key, boolean sendAck)
+    {
+        this.key = key;
+        this.sendAck=sendAck;
+        key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
+        key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
+        this.notify();      // awaken the thread
+    }
+
+    /**
+     * The actual code which drains the channel associated with
+     * the given key.  Everything read is appended to the ObjectReader
+     * attached to the key; afterwards the reader is executed and, if
+     * acknowledgements are enabled, one ACK is written per package it
+     * reports.  On end-of-stream the channel is closed and the key is
+     * not re-armed.  Otherwise OP_READ (and only OP_READ) is added
+     * back to the key's interest set under the pool's interestOps
+     * mutex, after calling wakeup() on the selector.
+     *
+     * @param key the selection key to service; its attachment must be
+     *            an ObjectReader
+     * @throws Exception any failure while reading or processing, handled
+     *                   by run()
+     */
+    protected void drainChannel (SelectionKey key)
+        throws Exception
+    {
+        SocketChannel channel = (SocketChannel) key.channel();
+        int count;
+        buffer.clear();         // make buffer empty
+        ObjectReader reader = (ObjectReader)key.attachment();
+        // loop while data available, channel is non-blocking
+        while ((count = channel.read (buffer)) > 0) {
+            buffer.flip();      // make buffer readable
+            reader.append(buffer.array(),0,count);
+            buffer.clear();     // make buffer empty
+        }
+        // NOTE(review): presumably the number of complete packages the
+        // reader processed -- confirm against ObjectReader.execute()
+        int pkgcnt = reader.execute();
+        if (log.isTraceEnabled()) {
+            log.trace("sending " + pkgcnt + " ack packages to " + channel.socket().getLocalPort() );
+        }
+
+        if (sendAck) {
+            while ( pkgcnt > 0 ) {
+                sendAck(key,channel);
+                pkgcnt--;
+            }
+        }
+
+        if (count < 0) {
+            // close channel on EOF, invalidates the key
+            channel.close();
+            return;
+        }
+
+        //acquire the interestOps mutex
+        Object mutex = this.getPool().getInterestOpsMutex();
+        synchronized (mutex) {
+            // cycle the selector so this key is active again
+            key.selector().wakeup();
+            // resume interest in OP_READ only; OP_WRITE stays cleared
+            int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+            key.interestOps(resumeOps);
+        }
+
+    }
+
+    /**
+     * send a reply-acknowledgement (6,2,3)
+     * An IOException while writing is logged as a warning and not
+     * propagated.
+     *
+     * @param key the selection key being serviced (not used here)
+     * @param channel the channel the acknowledgement is written to
+     */
+    protected void sendAck(SelectionKey key, SocketChannel channel) {
+
+        try {
+            ByteBuffer ack = ByteBuffer.wrap(ACK_COMMAND);
+            // A single write() on a non-blocking channel may accept fewer
+            // bytes than offered; keep writing until the whole ACK is out
+            // so the peer never sees a truncated acknowledgement.
+            while (ack.hasRemaining()) {
+                channel.write(ack);
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("ACK sent to " + channel.socket().getPort());
+            }
+        } catch ( java.io.IOException x ) {
+            log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
+        }
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/ThreadPool.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,95 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * @author not attributable
+ * @version 1.0
+ */
+
+public class ThreadPool
+{
+    /*
+     * A very simple thread pool class.  The pool size is set at
+     * construction time and remains fixed.  Threads are cycled
+     * through a FIFO idle queue.
+     */
+
+    /** Idle workers; taken from the head, returned at the tail. */
+    List idle = new LinkedList();
+    /** Guards the idle list; getWorker() waits on it, returnWorker() notifies it. */
+    Object mutex = new Object();
+    /** Lock object handed to workers through getInterestOpsMutex(). */
+    Object interestOpsMutex = null;
+
+    /**
+     * Creates and starts the worker threads and places them all in the
+     * idle list.
+     *
+     * @param poolSize number of worker threads to create
+     * @param threadClass class instantiated via newInstance(); the result
+     *                    is cast to WorkerThread
+     * @param interestOpsMutex lock object returned by getInterestOpsMutex()
+     * @throws Exception if a worker cannot be instantiated
+     */
+    ThreadPool (int poolSize, Class threadClass, Object interestOpsMutex) throws Exception {
+        // fill up the pool with worker threads
+        this.interestOpsMutex = interestOpsMutex;
+        for (int i = 0; i < poolSize; i++) {
+            WorkerThread thread = (WorkerThread)threadClass.newInstance();
+            thread.setPool(this);
+
+            // set thread name for debugging, start it
+            thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
+            thread.setDaemon(true);
+            thread.setPriority(Thread.MAX_PRIORITY);
+            thread.start();
+
+            idle.add (thread);
+        }
+    }
+
+    /**
+     * Takes the next idle worker thread, blocking until one has been
+     * returned to the pool.  Never returns null.
+     *
+     * @return the worker removed from the head of the idle list
+     */
+    WorkerThread getWorker()
+    {
+        WorkerThread worker = null;
+
+        synchronized (mutex) {
+            while ( worker == null ) {
+                if (idle.isEmpty()) {
+                    try {
+                        mutex.wait();
+                    } catch ( java.lang.InterruptedException ignored ) {
+                        // Interrupts are swallowed and the wait is resumed.
+                        // NOTE(review): the interrupt status is not restored
+                        // here; restoring it could affect interruptible
+                        // channel operations on the calling thread -- confirm
+                        // before changing.
+                    }
+                } else {
+                    // size was checked under the mutex, so the head exists
+                    worker = (WorkerThread) idle.remove(0);
+                }
+            }
+        }
+
+        return (worker);
+    }
+
+    /**
+     * Called by the worker thread to return itself to the
+     * idle pool.  Wakes one thread blocked in getWorker().
+     *
+     * @param worker the worker to append to the idle list
+     */
+    void returnWorker (WorkerThread worker)
+    {
+        synchronized (mutex) {
+            idle.add (worker);
+            mutex.notify();
+        }
+    }
+
+    /**
+     * @return the lock object passed to the constructor
+     */
+    public Object getInterestOpsMutex() {
+        return interestOpsMutex;
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/tcp/WorkerThread.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,44 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+
+/**
+ * @author Filip Hanik
+ * @version $Revision: 366253 $ $Date: 2006-01-05 13:30:42 -0600 (Thu, 05 Jan 2006) $
+ */
+public class WorkerThread extends Thread
+{
+    /** Pool this worker belongs to; assigned through setPool(). */
+    protected ThreadPool pool;
+    /** Run flag; set to false by close(). */
+    protected boolean doRun = true;
+
+
+    /**
+     * @param pool the pool that owns this worker
+     */
+    public void setPool(ThreadPool pool) {
+        this.pool = pool;
+    }
+
+    /**
+     * @return the pool set through setPool(), or null if none was set
+     */
+    public ThreadPool getPool() {
+        return pool;
+    }
+
+    /**
+     * Clears the run flag and wakes a thread waiting on this object's
+     * monitor.  The method is synchronized because Object.notify() throws
+     * IllegalMonitorStateException unless the caller owns the monitor;
+     * holding the monitor also makes the doRun change visible to code
+     * that reads it while synchronized on this object.  As a consequence
+     * the call blocks while another thread holds this object's monitor.
+     */
+    public synchronized void close()
+    {
+        doRun = false;
+        notify();
+
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message