tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r380209 [10/12] - in /tomcat/container/tc5.5.x/modules: groupcom/ groupcom/etc/ groupcom/src/ groupcom/src/share/ groupcom/src/share/org/ groupcom/src/share/org/apache/ groupcom/src/share/org/apache/catalina/ groupcom/src/share/org/apache/c...
Date Thu, 23 Feb 2006 19:55:25 GMT
Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,651 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster.session;
+
+import java.io.IOException;
+
+import org.apache.catalina.LifecycleException;
+import org.apache.catalina.Session;
+import org.apache.catalina.cluster.CatalinaCluster;
+import org.apache.catalina.cluster.ClusterManager;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.realm.GenericPrincipal;
+import org.apache.catalina.session.StandardManager;
+
+/**
+ * Title:        Tomcat Session Replication for Tomcat 4.0 <BR>
+ * Description:  A very simple straight forward implementation of
+ *               session replication of servers in a cluster.<BR>
+ *               This session replication is implemented "live". By live
+ *               I mean, when a session attribute is added into a session on Node A
+ *               a message is broadcast to the other nodes and setAttribute is called on the
+ *               replicated sessions.<BR>
+ *               A full description of this implementation can be found under
+ *               <a href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
+ *
+ * Copyright:    See apache license
+ * Company:      www.filip.net
+ * @author  <a href="mailto:mail@filip.net">Filip Hanik</a>
+ * @author Bela Ban (modifications for synchronous replication)
+ * @version 1.0 for TC 4.0
+ * Description: The InMemoryReplicationManager is a session manager that replicates
+ * session information in memory. It uses <a href="http://www.javagroups.com">JavaGroups</a> as
+ * a communication protocol to ensure guaranteed and ordered message delivery.
+ * JavaGroups also provides a very flexible protocol stack to ensure that the replication
+ * can be used in any environment.
+ * <BR><BR>
+ * The InMemoryReplicationManager extends the StandardManager hence it allows for us
+ * to inherit all the basic session management features like expiration, session listeners etc
+ * <BR><BR>
+ * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different types of multicast messages
+ * all defined in the SessionMessage class.<BR>
+ * When a session is replicated (not an attribute added/removed) the session is serialized into
+ * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
+ */
+public class SimpleTcpReplicationManager extends StandardManager
+implements ClusterManager
+{
+    public static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( SimpleTcpReplicationManager.class );
+
+    //the channel configuration
+    protected String mChannelConfig = null;
+
+    //the group name
+    protected String mGroupName = "TomcatReplication";
+
+    //somehow start() gets called more than once
+    protected boolean mChannelStarted = false;
+
+    //log to screen
+    protected boolean mPrintToScreen = true;
+
+    protected boolean defaultMode = false;
+
+    protected boolean mManagerRunning = false;
+
+    /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
+     * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
+     * all responses.
+     */
+    protected boolean synchronousReplication=true;
+
+    /** Set to false if we don't want the sessions to expire on shutdown */
+    protected boolean mExpireSessionsOnShutdown = true;
+
+    protected boolean useDirtyFlag = false;
+
+    protected String name;
+
+    protected boolean distributable = true;
+
+    protected CatalinaCluster cluster;
+
+    protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
+
+    /**
+     * Flag to keep track if the state has been transferred or not
+     * Assumes false.
+     */
+    protected boolean stateTransferred = false;
+    private boolean notifyListenersOnReplication;
+    private boolean sendClusterDomainOnly = true ;
+
+    /**
+     * Constructor, just calls super()
+     *
+     */
+    public SimpleTcpReplicationManager()
+    {
+        super();
+    }
+
+    public boolean isSendClusterDomainOnly() {
+        return sendClusterDomainOnly;
+    }
+    
+    /**
+     * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
+     */
+    public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) {
+        this.sendClusterDomainOnly = sendClusterDomainOnly;
+    }
+  
+    /**
+     * @return Returns the defaultMode.
+     */
+    public boolean isDefaultMode() {
+        return defaultMode;
+    }
+    /**
+     * @param defaultMode The defaultMode to set.
+     */
+    public void setDefaultMode(boolean defaultMode) {
+        this.defaultMode = defaultMode;
+    }
+    
+    public boolean isManagerRunning()
+    {
+        return mManagerRunning;
+    }
+
+    public void setUseDirtyFlag(boolean usedirtyflag)
+    {
+        this.useDirtyFlag = usedirtyflag;
+    }
+
+    public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown)
+    {
+        mExpireSessionsOnShutdown = expireSessionsOnShutdown;
+    }
+
+    public void setCluster(CatalinaCluster cluster) {
+        if(log.isDebugEnabled())
+            log.debug("Cluster associated with SimpleTcpReplicationManager");
+        this.cluster = cluster;
+    }
+
+    public boolean getExpireSessionsOnShutdown()
+    {
+        return mExpireSessionsOnShutdown;
+    }
+
+    public void setPrintToScreen(boolean printtoscreen)
+    {
+        if(log.isDebugEnabled())
+            log.debug("Setting screen debug to:"+printtoscreen);
+        mPrintToScreen = printtoscreen;
+    }
+
+    public void setSynchronousReplication(boolean flag)
+    {
+        synchronousReplication=flag;
+    }
+
+    /**
+     * Override persistence since it doesn't go hand in hand with replication for now.
+     */
+    public void unload() throws IOException {
+        if ( !getDistributable() ) {
+            super.unload();
+        }
+    }
+
+    /**
+     * Creates a HTTP session.
+     * Most of the code in here is copied from the StandardManager.
+     * This is not pretty, yeah I know, but it was necessary since the
+     * StandardManager had hard coded the session instantiation to a
+     * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
+     * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
+     * nodes in the cluster that this session has been created.
+     * @param notify - if set to true the other nodes in the cluster will be notified.
+     *                 This flag is needed so that we can create a session before we deserialize
+     *                 a replicated one
+     *
+     * @see ReplicatedSession
+     */
+    protected Session createSession(String sessionId, boolean notify, boolean setId)
+    {
+
+        //inherited from the basic manager
+        if ((getMaxActiveSessions() >= 0) &&
+           (sessions.size() >= getMaxActiveSessions()))
+            throw new IllegalStateException(sm.getString("standardManager.createSession.ise"));
+
+
+        Session session = new ReplicatedSession(this);
+
+        // Initialize the properties of the new session and return it
+        session.setNew(true);
+        session.setValid(true);
+        session.setCreationTime(System.currentTimeMillis());
+        session.setMaxInactiveInterval(this.maxInactiveInterval);
+        if(sessionId == null)
+            sessionId = generateSessionId();
+        if ( setId ) session.setId(sessionId);
+        if ( notify && (cluster!=null) ) {
+            ((ReplicatedSession)session).setIsDirty(true);
+        }
+        return (session);
+    }//createSession
+
+    //=========================================================================
+    // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
+    //=========================================================================
+
+    /**
+     * Construct and return a new session object, based on the default
+     * settings specified by this Manager's properties.  The session
+     * id will be assigned by this method, and available via the getId()
+     * method of the returned session.  If a new session cannot be created
+     * for any reason, return <code>null</code>.
+     *
+     * @exception IllegalStateException if a new session cannot be
+     *  instantiated for any reason
+     */
+    public Session createSession(String sessionId)
+    {
+        //create a session and notify the other nodes in the cluster
+        Session session =  createSession(sessionId,getDistributable(),true);
+        add(session);
+        return session;
+    }
+
+    public void sessionInvalidated(String sessionId) {
+        synchronized ( invalidatedSessions ) {
+            invalidatedSessions.put(sessionId, sessionId);
+        }
+    }
+
+    public String[] getInvalidatedSessions() {
+        synchronized ( invalidatedSessions ) {
+            String[] result = new String[invalidatedSessions.size()];
+            invalidatedSessions.values().toArray(result);
+            return result;
+        }
+
+    }
+
+    /**
+     * Builds the replication message for a request that has just completed on this node.
+     * @param sessionId - the id of the session the completed request belonged to
+     * @return an EVT_SESSION_EXPIRED message if the session was invalidated, an
+     *         EVT_SESSION_ACCESSED message if a clean session needs its last access
+     *         distributed, an EVT_SESSION_CREATED message with the serialized session
+     *         otherwise; null if there is nothing to send or an error occurred
+     */
+    public ClusterMessage requestCompleted(String sessionId)
+    {
+        if (  !getDistributable() ) {
+            log.warn("Received requestCompleted message, although this context["+
+                     getName()+"] is not distributable. Ignoring message");
+            return null;
+        }
+        try
+        {
+            //look up and remove under one lock, otherwise two concurrent requests
+            //could both find the entry and both send the expire message
+            boolean invalidated;
+            synchronized ( invalidatedSessions ) {
+                invalidated = invalidatedSessions.remove(sessionId) != null;
+            }
+            if ( invalidated ) {
+                return new SessionMessageImpl(name,
+                    SessionMessage.EVT_SESSION_EXPIRED, null, sessionId, sessionId);
+            }
+            ReplicatedSession session = (ReplicatedSession) findSession(sessionId);
+            if (session == null) return null;
+            //return immediately if the session is not dirty
+            if (useDirtyFlag && (!session.isDirty())) {
+                //but before we return doing nothing, see if we should send
+                //an updated last access message so that
+                //sessions across cluster dont expire
+                long interval = session.getMaxInactiveInterval();
+                long lastaccdist = System.currentTimeMillis() -
+                    session.getLastAccessWasDistributed();
+                //(interval*1000)/lastaccdist < 3, rewritten without the division: it threw
+                //ArithmeticException when both timestamps fell into the same millisecond
+                if ( (interval*1000) < (3*lastaccdist) ) {
+                    SessionMessage accmsg = new SessionMessageImpl(name,
+                        SessionMessage.EVT_SESSION_ACCESSED,
+                        null, sessionId, sessionId);
+                    session.setLastAccessWasDistributed(System.currentTimeMillis());
+                    return accmsg;
+                }
+                return null;
+            }
+            session.setIsDirty(false);
+            if (log.isDebugEnabled()) {
+                try {
+                    log.debug("Sending session to cluster=" + session);
+                }
+                catch (Exception ignore) {}
+            }
+            return new SessionMessageImpl(name,
+                SessionMessage.EVT_SESSION_CREATED,
+                writeSession(session),
+                session.getIdInternal(),
+                session.getIdInternal());
+        }
+        catch (Exception x )
+        {
+            log.error("Unable to replicate session",x);
+        }
+        return null;
+    }
+
+    /**
+     * Serialize a session into a byte array<BR>
+     * Writes a flag telling whether the session has a principal, the principal
+     * itself (if there is one) and then the session data through writeObjectData
+     * @param session - the session to be serialized
+     * @return a byte array containing the session data, null if the serialization failed
+     */
+    protected byte[] writeSession( Session session )
+    {
+        try
+        {
+            java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
+            java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(session_data);
+            session_out.flush();
+            boolean hasPrincipal = session.getPrincipal() != null;
+            session_out.writeBoolean(hasPrincipal);
+            if ( hasPrincipal )
+            {
+                session_out.writeObject(SerializablePrincipal.createPrincipal((GenericPrincipal)session.getPrincipal()));
+            }//end if
+            ((ReplicatedSession)session).writeObjectData(session_out);
+            session_out.flush(); //the object stream buffers, push all bytes into session_data first
+            return session_data.toByteArray();
+        }
+        catch ( Exception x )
+        {
+            log.error("Failed to serialize the session!",x);
+        }
+        return null;
+    }
+
+    /**
+     * Reinstantiates a serialized session from the data passed in.
+     * This will first call createSession() so that we get a fresh instance with all
+     * the managers set and all the transient fields validated.
+     * Then it calls ReplicatedSession.readObjectData() on that instance to deserialize the object
+     * @param data - a byte array containing session data
+     * @param sessionId - the id that is assigned to the deserialized session
+     * @return a valid Session object, null if an error occurs
+     */
+    protected Session readSession( byte[] data, String sessionId )
+    {
+        try
+        {
+            java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream(data);
+            ReplicationStream session_in = new ReplicationStream(session_data,container.getLoader().getClassLoader());
+
+            Session session = sessionId!=null?this.findSession(sessionId):null;
+            boolean isNew = (session==null);
+            //clear the old values from the existing session
+            if ( session!=null ) {
+                ReplicatedSession rs = (ReplicatedSession)session;
+                rs.expire(false);  //cleans up the previous values, since we are not doing removes
+                session = null;
+            }//end if
+
+            if (session==null) {
+                session = createSession(null,false, false);
+                sessions.remove(session.getIdInternal());
+            }
+            
+            
+            boolean hasPrincipal = session_in.readBoolean();
+            SerializablePrincipal p = null;
+            if ( hasPrincipal )
+                p = (SerializablePrincipal)session_in.readObject();
+            ((ReplicatedSession)session).readObjectData(session_in);
+            if ( hasPrincipal )
+                session.setPrincipal(p.getPrincipal(getContainer().getRealm()));
+            ((ReplicatedSession)session).setId(sessionId,isNew);
+            ReplicatedSession rsession = (ReplicatedSession)session; 
+            rsession.setAccessCount(1);
+            session.setManager(this);
+            session.setValid(true);
+            rsession.setLastAccessedTime(System.currentTimeMillis());
+            rsession.setThisAccessedTime(System.currentTimeMillis());
+            ((ReplicatedSession)session).setAccessCount(0);
+            session.setNew(false);
+            if(log.isTraceEnabled())
+                 log.trace("Session loaded id="+sessionId +
+                               " actualId="+session.getId()+ 
+                               " exists="+this.sessions.containsKey(sessionId)+
+                               " valid="+rsession.isValid());
+            return session;
+
+        }
+        catch ( Exception x )
+        {
+            log.error("Failed to deserialize the session!",x);
+        }
+        return null;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+    /**
+     * Prepare for the beginning of active use of the public methods of this
+     * component.  This method should be called after <code>configure()</code>,
+     * and before any of the public methods of the component are utilized.<BR>
+     * Starts the cluster communication channel, this will connect with the other nodes
+     * in the cluster, and request the current session state to be transferred to this node.
+     * @exception IllegalStateException if this component has already been
+     *  started
+     * @exception LifecycleException if this component detects a fatal error
+     *  that prevents this component from being used
+     */
+    public void start() throws LifecycleException {
+        mManagerRunning = true;
+        super.start();
+        //register with the cluster, then ask the first member for the session state
+        try {
+            if ( mChannelStarted ) return; //the channel is already running
+            if(log.isInfoEnabled())
+                log.info("Starting clustering manager...:"+getName());
+            if ( cluster == null ) {
+                log.error("Starting... no cluster associated with this context:"+getName());
+                return;
+            }
+            cluster.addManager(getName(),this);
+            Member[] members = cluster.getMembers(); //read once, the list may differ between two calls
+            if (members.length > 0) {
+                Member mbr = members[0];
+                SessionMessage msg = new SessionMessageImpl(this.getName(),
+                    SessionMessage.EVT_GET_ALL_SESSIONS, null,
+                    "GET-ALL", "GET-ALL-"+this.getName());
+                cluster.send(msg, mbr);
+                if(log.isWarnEnabled())
+                     log.warn("Manager["+getName()+"], requesting session state from "+mbr+
+                         ". This operation will timeout if no session state has been received within "+
+                         "60 seconds");
+                long reqStart = System.currentTimeMillis();
+                long reqNow = reqStart;
+                boolean isTimeout=false;
+                do {
+                    try {
+                        Thread.sleep(100);
+                    }catch ( InterruptedException interrupted ) {
+                        Thread.currentThread().interrupt();
+                        break; //stop waiting, the interrupt flag is restored for the caller
+                    }
+                    reqNow = System.currentTimeMillis();
+                    isTimeout=((reqNow-reqStart)>(1000*60));
+                } while ( (!isStateTransferred()) && (!isTimeout));
+                //the state can arrive during the last poll, so only the flag decides the outcome
+                if ( !isStateTransferred() ) {
+                    log.error("Manager["+getName()+"], No session state received, timing out.");
+                }else {
+                    if(log.isInfoEnabled())
+                        log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
+                }
+            } else {
+                if(log.isInfoEnabled())
+                    log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
+            }//end if
+            mChannelStarted = true;
+        }  catch ( Exception x ) {
+            log.error("Unable to start SimpleTcpReplicationManager",x);
+        }
+    }
+
+    /**
+     * Gracefully terminate the active use of the public methods of this
+     * component.  This method should be the last one called on a given
+     * instance of this component.<BR>
+     * This will disconnect the cluster communication channel and stop the listener thread.
+     * @exception IllegalStateException if this component has not been started
+     * @exception LifecycleException if this component detects a fatal error
+     *  that needs to be reported
+     */
+    public void stop() throws LifecycleException
+    {
+        mManagerRunning = false;
+        mChannelStarted = false;
+        super.stop();
+        //drop the local sessions and unregister this manager from the cluster
+        try
+        {
+            this.sessions.clear();
+            //start() accepts a missing cluster (it only logs an error), so stop()
+            //must not fail with a NullPointerException in that case
+            if ( cluster != null ) {
+                cluster.removeManager(getName(),this);
+            }
+        }
+        catch ( Exception x )
+        {
+            log.error("Unable to stop SimpleTcpReplicationManager",x);
+        }
+    }
+
+    public void setDistributable(boolean dist) {
+        this.distributable = dist;
+    }
+
+    public boolean getDistributable() {
+        return distributable;
+    }
+
+    /**
+     * This method is called by the receiver thread when a SessionMessage has
+     * been received from one of the other nodes in the cluster.
+     * @param msg - the message received
+     * @param sender - the sender of the message, this is used if we receive a
+     *                 EVT_GET_ALL_SESSIONS message, so that we only reply to
+     *                 the requesting node
+     */
+    protected void messageReceived( SessionMessage msg, Member sender ) {
+        try  {
+            if(log.isDebugEnabled()) {
+                log.debug("Received SessionMessage of type="+msg.getEventTypeString());
+                log.debug("Received SessionMessage sender="+sender);
+            }
+            switch ( msg.getEventType() ) {
+                case SessionMessage.EVT_GET_ALL_SESSIONS: {
+                    //get a list of all the session from this manager
+                    Object[] allSessions = findSessions();
+                    java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
+                    java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(bout);
+                    oout.writeInt(allSessions.length);
+                    for (int i=0; i<allSessions.length; i++){
+                        ReplicatedSession ses = (ReplicatedSession)allSessions[i];
+                        oout.writeUTF(ses.getIdInternal());
+                        byte[] data = writeSession(ses);
+                        oout.writeObject(data);
+                    }//for
+                    //push the buffered object data into bout before it is read
+                    oout.flush();
+                    oout.close();
+                    byte[] data = bout.toByteArray();
+                    SessionMessage newmsg = new SessionMessageImpl(name,
+                        SessionMessage.EVT_ALL_SESSION_DATA,
+                        data, "SESSION-STATE","SESSION-STATE-"+getName());
+                    cluster.send(newmsg, sender);
+                    break;
+                }
+                case SessionMessage.EVT_ALL_SESSION_DATA: {
+                    java.io.ByteArrayInputStream bin =
+                        new java.io.ByteArrayInputStream(msg.getSession());
+                    java.io.ObjectInputStream oin = new java.io.ObjectInputStream(bin);
+                    int size = oin.readInt();
+                    for ( int i=0; i<size; i++) {
+                        String id = oin.readUTF();
+                        byte[] data = (byte[])oin.readObject();
+                        readSession(data,id); //logs its own failures, the result is not needed here
+                    }//for
+                    stateTransferred=true;
+                    break;
+                }
+                case SessionMessage.EVT_SESSION_CREATED: {
+                    Session session = this.readSession(msg.getSession(),msg.getSessionID());
+                    if ( log.isDebugEnabled() ) { //readSession returns null on failure
+                        log.debug("Received replicated session=" + session +
+                            " isValid=" + ((session!=null) && session.isValid()));
+                    }
+                    break;
+                }
+                case SessionMessage.EVT_SESSION_EXPIRED: {
+                    Session session = findSession(msg.getSessionID());
+                    if ( session != null ) {
+                        session.expire();
+                        this.remove(session);
+                    }//end if
+                    break;
+                }
+                case SessionMessage.EVT_SESSION_ACCESSED :{
+                    Session session = findSession(msg.getSessionID());
+                    if ( session != null ) {
+                        session.access();
+                        session.endAccess();
+                    }
+                    break;
+                }
+                default:  {
+                    //we didn't recognize the message type, do nothing
+                    break;
+                }
+            }//switch
+        }
+        catch ( Exception x )
+        {
+            log.error("Unable to receive message through TCP channel",x);
+        }
+    }
+
+    public void messageDataReceived(ClusterMessage cmsg) {
+        try {
+            if ( cmsg instanceof SessionMessage ) {
+                SessionMessage msg = (SessionMessage)cmsg;
+                messageReceived(msg,
+                                msg.getAddress() != null ? (Member) msg.getAddress() : null);
+            }
+        } catch(Throwable ex){
+            log.error("InMemoryReplicationManager.messageDataReceived()", ex);
+        }//catch
+    }
+
+    public boolean isStateTransferred() {
+        return stateTransferred;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+    public boolean isNotifyListenersOnReplication() {
+        return notifyListenersOnReplication;
+    }
+    public void setNotifyListenersOnReplication(boolean notifyListenersOnReplication) {
+        this.notifyListenersOnReplication = notifyListenersOnReplication;
+    }
+
+
+    /* 
+     * @see org.apache.catalina.cluster.ClusterManager#getCluster()
+     */
+    public CatalinaCluster getCluster() {
+        return cluster;
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/mbeans-descriptors.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/mbeans-descriptors.xml?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/mbeans-descriptors.xml (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/mbeans-descriptors.xml Thu Feb 23 11:55:14 2006
@@ -0,0 +1,320 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mbeans-descriptors PUBLIC
+   "-//Apache Software Foundation//DTD Model MBeans Configuration File"
+   "http://jakarta.apache.org/commons/dtds/mbeans-descriptors.dtd">
+<mbeans-descriptors>
+    <mbean name="JvmRouteBinderValve" description="mod_jk jvmRoute jsessionid cookie backup correction" domain="Catalina"
+        group="Valve" type="org.apache.catalina.cluster.session.JvmRouteBinderValve">
+        <attribute name="className"
+               description="Fully qualified class name of the managed object"
+               type="java.lang.String"
+               writeable="false"/>        
+        <attribute name="info" 
+		           description="describe version" type="java.lang.String" writeable="false"/>
+        <attribute name="enabled" 
+		           description="enable a jvm Route check" type="boolean"/>
+        <attribute name="numberOfSessions"
+		           description="number of jvmRoute session corrections" type="long" writeable="false"/>
+        <attribute name="sessionIdAttribute" 
+		    description="Name of attribute with sessionid value before turnover a session" 
+		    type="java.lang.String" 
+		    />
+        <operation name="start" description="Starts the Cluster JvmRouteBinderValve" 
+		           impact="ACTION" returnType="void"/>
+        <operation name="stop" description="Stops the Cluster JvmRouteBinderValve" 
+		           impact="ACTION" returnType="void"/>
+    </mbean>
+	<mbean name="JvmRouteSessionIDBinderListener"
+		description="Monitors the jvmRoute activity"
+		domain="Catalina"
+        group="Listener"
+		type="org.apache.catalina.cluster.session.JvmRouteSessionIDBinderListener">
+        <attribute name="info" 
+		           description="describe version" type="java.lang.String" writeable="false"/>
+        <attribute name="numberOfSessions" 
+		    description="number of jvmRoute session corrections" 
+		    type="long" 
+		    writeable="false"/>
+    </mbean>
+    
+   <mbean        name="DeltaManager"
+          description="Cluster Manager implementation of the Manager interface"
+               domain="Catalina"
+                group="Manager"
+                 type="org.apache.catalina.cluster.session.DeltaManager">
+
+    <attribute   name="info" 
+		  description="describe version"
+		         type="java.lang.String"
+		    writeable="false"/>
+		    
+    <attribute   name="algorithm"
+          description="The message digest algorithm to be used when generating
+                       session identifiers"
+                 type="java.lang.String"/>
+                 
+    <attribute   name="randomFile"
+          description="File source of random - /dev/urandom or a pipe"
+                 type="java.lang.String"/>
+
+    <attribute   name="className"
+          description="Fully qualified class name of the managed object"
+                 type="java.lang.String"
+            writeable="false"/>
+
+    <attribute   name="distributable"
+          description="The distributable flag for Sessions created by this
+                       Manager"
+                 type="boolean"/>
+
+    <attribute   name="entropy"
+          description="A String initialization parameter used to increase the
+                       entropy of the initialization of our random number
+                       generator"
+                 type="java.lang.String"/>
+
+    <attribute   name="maxActiveSessions"
+          description="The maximum number of active Sessions allowed, or -1
+                       for no limit"
+                 type="int"/>
+
+    <attribute   name="maxInactiveInterval"
+          description="The default maximum inactive interval for Sessions
+                       created by this Manager"
+                 type="int"/>
+
+    <attribute name="processExpiresFrequency"
+               description="The frequency of the manager checks (expiration and passivation)"
+               type="int"/>
+               
+    <attribute   name="sessionIdLength"
+          description="The session id length (in bytes) of Sessions
+                       created by this Manager"
+                 type="int"/>
+
+    <attribute   name="name"
+          description="The descriptive name of this Manager implementation
+                       (for logging)"
+                 type="java.lang.String"
+            writeable="false"/>
+
+    <attribute   name="activeSessions"
+          description="Number of active sessions at this moment"
+                 type="int" 
+            writeable="false"/>
+
+    <attribute   name="sessionCounter"
+          description="Total number of sessions created by this manager"
+                 type="int" />
+
+    <attribute   name="sessionReplaceCounter"
+          description="Total number of replaced sessions that load from external nodes"
+                 type="long" />
+
+    <attribute   name="maxActive"
+          description="Maximum number of active sessions so far"
+                 type="int" />
+
+    <attribute   name="sessionMaxAliveTime"
+          description="Longest time an expired session had been alive"
+                 type="int" />
+
+    <attribute   name="sessionAverageAliveTime"
+          description="Average time an expired session had been alive"
+                 type="int" />
+
+    <attribute   name="sendClusterDomainOnly"
+                   is="true"
+          description="The sendClusterDomainOnly flag send sessions only to members as same cluster domain"
+                 type="boolean"/>
+
+    <attribute   name="rejectedSessions"
+          description="Number of sessions we rejected due to maxActive being reached"
+                 type="int" />
+
+    <attribute   name="expiredSessions"
+          description="Number of sessions that expired ( doesn't include explicit invalidations )"
+                 type="int" />
+
+    <attribute   name="stateTransferTimeout"
+          description="state transfer timeout in sec"
+                 type="int"/>
+
+    <attribute   name="processingTime"
+          description="Time spent doing housekeeping and expiration"
+                 type="long" />
+
+    <attribute   name="duplicates"
+          description="Number of duplicated session ids generated"
+                 type="int" />
+
+    <attribute   name="counterReceive_EVT_GET_ALL_SESSIONS"
+          description="Count receive EVT_GET_ALL_SESSIONS messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterReceive_EVT_ALL_SESSION_DATA"
+          description="Count receive EVT_ALL_SESSION_DATA messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterReceive_EVT_SESSION_CREATED"
+          description="Count receive EVT_SESSION_CREATED messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterReceive_EVT_SESSION_DELTA"
+          description="Count receive EVT_SESSION_DELTA messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterReceive_EVT_SESSION_ACCESSED"
+          description="Count receive EVT_SESSION_ACCESSED messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterReceive_EVT_SESSION_EXPIRED"
+          description="Count receive EVT_SESSION_EXPIRED messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE"
+          description="Count receive EVT_ALL_SESSION_TRANSFERCOMPLETE messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_GET_ALL_SESSIONS"
+          description="Count send EVT_GET_ALL_SESSIONS messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_ALL_SESSION_DATA"
+          description="Count send EVT_ALL_SESSION_DATA messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_SESSION_CREATED"
+          description="Count send EVT_SESSION_CREATED messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_SESSION_DELTA"
+          description="Count send EVT_SESSION_DELTA messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_SESSION_ACCESSED"
+          description="Count send EVT_SESSION_ACCESSED messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_SESSION_EXPIRED"
+          description="Count send EVT_SESSION_EXPIRED messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE"
+          description="Count send EVT_ALL_SESSION_TRANSFERCOMPLETE messages"
+                 type="long"
+            writeable="false" />
+
+    <attribute   name="counterNoStateTransfered"
+          description="Count the failed session transfers noStateTransfered"
+                 type="int"
+            writeable="false" />
+            
+    <attribute   name="receivedQueueSize"
+          description="length of receive queue size when session received from other node"
+                 type="int"
+            writeable="false" />            
+
+    <attribute   name="expireSessionsOnShutdown"
+                   is="true"
+          description="expire all sessions cluster wide as one node goes down"
+                 type="boolean" />
+
+    <attribute   name="notifyListenersOnReplication"
+                   is="true"
+          description="Send session attribute change events on backup nodes"
+                 type="boolean" />
+
+    <attribute   name="notifySessionListenersOnReplication"
+                   is="true"
+          description="Send session start/stop events on backup nodes"
+                 type="boolean" />
+
+    <attribute   name="sendAllSessions"
+                   is="true"
+          description="Send all sessions at one big block"
+                 type="boolean" />
+
+    <attribute   name="sendAllSessionsSize"
+          description="session block size when sendAllSessions=false (default=1000)"
+                 type="int" />
+
+    <attribute   name="sendAllSessionsWaitTime"
+          description="wait time between send session block (default 2 sec)"
+                 type="int" />
+
+    <operation   name="listSessionIds"
+          description="Return the list of active session ids"
+               impact="ACTION"
+           returnType="java.lang.String">
+    </operation>
+
+    <operation   name="getSessionAttribute"
+          description="Return a session attribute"
+               impact="ACTION"
+           returnType="java.lang.String">
+      <parameter name="sessionId"
+          description="Id of the session"
+                 type="java.lang.String"/>
+      <parameter name="key"
+          description="key of the attribute"
+                 type="java.lang.String"/>
+    </operation>
+
+    <operation   name="expireSession"
+          description="Expire a session"
+               impact="ACTION"
+           returnType="void">
+      <parameter name="sessionId"
+          description="Id of the session"
+                 type="java.lang.String"/>
+    </operation>
+
+    <operation   name="getLastAccessedTime"
+          description="Get the last access time"
+               impact="ACTION"
+           returnType="java.lang.String">
+      <parameter name="sessionId"
+          description="Id of the session"
+                 type="java.lang.String"/>
+    </operation>
+    
+	<operation name="expireAllLocalSessions"
+               description="Expire all active local sessions and replicate the invalid sessions"
+               impact="ACTION"
+               returnType="void">
+    </operation>
+
+	<operation name="processExpires"
+               description="force process to expire sessions"
+               impact="ACTION"
+               returnType="void">
+    </operation>
+    
+	<operation name="resetStatistics"
+               description="Reset all statistics"
+               impact="ACTION"
+               returnType="void">
+    </operation>
+	<operation name="getAllClusterSessions"
+               description="send to oldest cluster member that this node need all cluster sessions (resync member)"
+               impact="ACTION"
+               returnType="void">
+    </operation>
+
+  </mbean>
+
+</mbeans-descriptors>

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ClusterReceiverBase.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,346 @@
+/*
+ * Copyright 1999,2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.catalina.Container;
+import org.apache.catalina.cluster.CatalinaCluster;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.core.StandardHost;
+import org.apache.catalina.util.StringManager;
+
+/**
+* @author Filip Hanik
+* @author Peter Rossbach
+* @version $Revision: 379550 $ $Date: 2006-02-21 12:06:35 -0600 (Tue, 21 Feb 2006) $
+*/
+
+public class ClusterReceiverBase extends ReplicationListener {
+    
+    protected static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( ClusterReceiverBase.class );
+
+    /**
+     * The string manager for this package.
+     */
+    protected StringManager sm = StringManager.getManager(Constants.Package);
+
+    private CatalinaCluster cluster;
+    
+
+    /**
+     * total bytes received
+     */
+    protected long totalReceivedBytes = 0;
+    
+    /**
+     * doProcessingStats
+     */
+    protected boolean doReceivedProcessingStats = false;
+
+    /**
+     * processingTime
+     */
+    protected long receivedProcessingTime = 0;
+    
+    /**
+     * min processingTime
+     */
+    protected long minReceivedProcessingTime = Long.MAX_VALUE ;
+
+    /**
+     * max processingTime
+     */
+    protected long maxReceivedProcessingTime = 0;
+    
+    /**
+     * Receiving stats
+     */
+    private long nrOfMsgsReceived = 0;
+
+    private long receivedTime = 0;
+
+    private long lastChecked = System.currentTimeMillis();
+
+
+
+    /**
+     * Receiver MBean name
+     */
+    private ObjectName objectName;
+
+    /**
+     * @return Returns the doListen.
+     */
+    public boolean isDoListen() {
+        return doListen;
+    }
+
+    
+    /**
+     * Sets the cluster associated with this receiver. Received messages are
+     * passed to this cluster and it is consulted when the receiver MBean is
+     * registered or unregistered.
+     *
+     * @param cluster the cluster to associate with this receiver
+     */
+    public void setCatalinaCluster(CatalinaCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * @return the cluster associated with this receiver, or <code>null</code>
+     *         if none has been set
+     */
+    public CatalinaCluster getCatalinaCluster() {
+        return (CatalinaCluster) cluster;
+    }
+    
+    /**
+     *  set Receiver ObjectName
+     * 
+     * @param name
+     */
+    public void setObjectName(ObjectName name) {
+        objectName = name;
+    }
+
+    /**
+     * Receiver ObjectName
+     * 
+     */
+    public ObjectName getObjectName() {
+        return objectName;
+    }
+    
+    
+    // ------------------------------------------------------------- stats
+
+    /**
+     * @return Returns the doReceivedProcessingStats.
+     */
+    public boolean isDoReceivedProcessingStats() {
+        return doReceivedProcessingStats;
+    }
+    /**
+     * @param doReceiverProcessingStats The doReceivedProcessingStats to set.
+     */
+    public void setDoReceivedProcessingStats(boolean doReceiverProcessingStats) {
+        this.doReceivedProcessingStats = doReceiverProcessingStats;
+    }
+    /**
+     * @return Returns the maxReceivedProcessingTime.
+     */
+    public long getMaxReceivedProcessingTime() {
+        return maxReceivedProcessingTime;
+    }
+    /**
+     * @return Returns the minReceivedProcessingTime. The value is
+     *         <code>Long.MAX_VALUE</code> until a processing time has been
+     *         recorded after construction or after {@link #resetStatistics()}.
+     */
+    public long getMinReceivedProcessingTime() {
+        return minReceivedProcessingTime;
+    }
+    /**
+     * @return Returns the receivedProcessingTime.
+     */
+    public long getReceivedProcessingTime() {
+        return receivedProcessingTime;
+    }
+    /**
+     * @return Returns the totalReceivedBytes.
+     */
+    public long getTotalReceivedBytes() {
+        return totalReceivedBytes;
+    }
+    
+    /**
+     * @return Returns the avg receivedProcessingTime/nrOfMsgsReceived. The
+     *         division is done in floating point, so while nrOfMsgsReceived
+     *         is 0 the result is <code>NaN</code> (or infinite if processing
+     *         time has already been accumulated) rather than an exception.
+     */
+    public double getAvgReceivedProcessingTime() {
+        return ((double)receivedProcessingTime) / nrOfMsgsReceived;
+    }
+
+    /**
+     * @return Returns the avg totalReceivedBytes/nrOfMsgsReceived.
+     */
+    public long getAvgTotalReceivedBytes() {
+        return ((long)totalReceivedBytes) / nrOfMsgsReceived;
+    }
+
+    /**
+     * @return Returns the receivedTime. NOTE(review): nothing in this class
+     *         ever adds to receivedTime (it is only initialised and reset to
+     *         0), so this currently always returns 0.
+     */
+    public long getReceivedTime() {
+        return receivedTime;
+    }
+
+    /**
+     * @return Returns the lastChecked.
+     */
+    public long getLastChecked() {
+        return lastChecked;
+    }
+
+    /**
+     * @return Returns the nrOfMsgsReceived.
+     */
+    public long getNrOfMsgsReceived() {
+        return nrOfMsgsReceived;
+    }
+
+    /**
+     * start cluster receiver
+     * 
+     * @see org.apache.catalina.cluster.ClusterReceiver#start()
+     */
+    public void start() {
+        super.start();
+        registerReceiverMBean();
+    }
+
+ 
+    /**
+     * Stop accept
+     * 
+     * @see org.apache.catalina.cluster.ClusterReceiver#stop()
+     * @see #stopListening()
+     */
+    public void stop() {
+        super.stop();
+        unregisterRecevierMBean();
+     
+    }
+    
+    /**
+     * Registers this receiver as an MBean with the cluster's MBean server.
+     * The object name is <code>&lt;domain&gt;:type=ClusterReceiver</code>, with
+     * <code>,host=&lt;host&gt;</code> appended when the cluster container is a
+     * {@link StandardHost}. Nothing happens unless the cluster is a
+     * {@link SimpleTcpCluster}. A name that is already registered only
+     * produces a warning, and registration failures are logged, not thrown.
+     */
+    protected void registerReceiverMBean() {
+        // instanceof is false for null, so this also covers a missing cluster
+        if (!(cluster instanceof SimpleTcpCluster)) {
+            return;
+        }
+        SimpleTcpCluster tcpCluster = (SimpleTcpCluster) cluster;
+        ObjectName clusterName = tcpCluster.getObjectName();
+        try {
+            MBeanServer server = tcpCluster.getMBeanServer();
+            Container container = cluster.getContainer();
+            String prefix = clusterName.getDomain() + ":type=ClusterReceiver";
+            String suffix = (container instanceof StandardHost)
+                    ? ",host=" + clusterName.getKeyProperty("host")
+                    : "";
+            ObjectName receiverName = new ObjectName(prefix + suffix);
+            if (!server.isRegistered(receiverName)) {
+                // remember the name first; it is needed again to unregister
+                setObjectName(receiverName);
+                server.registerMBean(tcpCluster.getManagedBean(this), getObjectName());
+            } else if (log.isWarnEnabled()) {
+                log.warn(sm.getString("cluster.mbean.register.already", receiverName));
+            }
+        } catch (Exception e) {
+            log.warn("Unable to register JMX bean ClusterReceiverBase",e);
+        }
+    }
+   
+    /**
+     * Unregisters the receiver MBean
+     * (<code>&lt;domain&gt;:type=ClusterReceiver,host=&lt;host&gt;</code>) from the
+     * cluster's MBean server. Does nothing when no cluster is set, the
+     * cluster is not a {@link SimpleTcpCluster}, or no object name has been
+     * recorded. Failures are logged, not thrown.
+     */
+    protected void unregisterRecevierMBean() {
+        if (cluster != null && getObjectName() != null
+                && cluster instanceof SimpleTcpCluster) {
+            SimpleTcpCluster scluster = (SimpleTcpCluster) cluster;
+            try {
+                MBeanServer mserver = scluster.getMBeanServer();
+                mserver.unregisterMBean(getObjectName());
+            } catch (Exception e) {
+                // pass the exception as the throwable argument so the stack
+                // trace is logged instead of only e.toString()
+                log.error("Unable to unregister JMX bean ClusterReceiverBase", e);
+            }
+        }
+    }
+
+    
+    
+
+    // --------------------------------------------------------- receiver messages
+
+    /**
+     * receiver Message from other node.
+     * All SessionMessage forward to ClusterManager and other message dispatch to all accept MessageListener.
+     *
+     * @see ClusterSessionListener#messageReceived(ClusterMessage)
+     */
+    public void messageDataReceived(ClusterData data) {
+    //public void messageDataReceived(byte[] data) {
+        long timeSent = 0 ;
+        if (doReceivedProcessingStats) {
+            timeSent = System.currentTimeMillis();
+        }
+        try {
+            ClusterMessage message = deserialize(data);
+            // calc stats really received bytes
+            totalReceivedBytes += data.getMessage().length;
+            //totalReceivedBytes += data.length;
+            nrOfMsgsReceived++;
+            cluster.receive(message);
+        } catch (Exception x) {
+            log
+                    .error(
+                            "Unable to deserialize session message or unexpected exception from message listener.",
+                            x);
+        } finally {
+            if (doReceivedProcessingStats) {
+                addReceivedProcessingStats(timeSent);
+            }
+        }
+    }
+
+    
+    
+    // --------------------------------------------- Performance Stats
+
+    /**
+     * Reset receiver statistics: message and byte counters and the
+     * accumulated/min/max processing times go back to their initial values.
+     * lastChecked is not touched.
+     */
+    public synchronized void resetStatistics() {
+        nrOfMsgsReceived = 0;
+        totalReceivedBytes = 0;
+        // MAX_VALUE acts as "no minimum recorded yet"
+        minReceivedProcessingTime = Long.MAX_VALUE ;
+        maxReceivedProcessingTime = 0 ;
+        receivedProcessingTime = 0 ;
+        receivedTime = 0 ;
+    }
+
+    /**
+     * Add receiver processing stats times: updates the min, max and total
+     * processing time with the time elapsed since <code>startTime</code>.
+     * With debug logging enabled, a summary line is written at most once
+     * every 5 seconds.
+     *
+     * @param startTime time in milliseconds at which processing started
+     */
+    protected void addReceivedProcessingStats(long startTime) {
+        long current = System.currentTimeMillis() ;
+        long time = current - startTime ;
+        long totalTime ;
+        synchronized(this) {
+            if(time < minReceivedProcessingTime)
+                minReceivedProcessingTime = time ;
+            if( time > maxReceivedProcessingTime)
+                maxReceivedProcessingTime = time ;
+            receivedProcessingTime += time ;
+            // snapshot the accumulated total for the log line below
+            totalTime = receivedProcessingTime ;
+        }
+        if (log.isDebugEnabled()) {
+            if ((current - lastChecked) > 5000) {
+                long msgs = nrOfMsgsReceived ;
+                // no message may have been counted yet (e.g. the first one
+                // failed to deserialize); avoid an integer division by zero
+                long average = msgs > 0 ? totalTime / msgs : 0 ;
+                // report the accumulated processing time; receivedTime is
+                // never updated in this class and would always print 0
+                log.debug("Calc msg send time total=" + totalTime
+                        + "ms num request=" + msgs
+                        + " average per msg="
+                        + average + "ms.");
+                lastChecked=current ;
+            }
+        }
+    }
+    
+    
+
+}

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/Constants.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/Constants.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/Constants.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/Constants.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.cluster.tcp;
+
+/**
+ * Manifest constants for the <code>org.apache.catalina.cluster.tcp</code>
+ * package.
+ *
+ * @author Peter Rossbach
+ * @version $Revision: 303753 $ $Date: 2005-03-14 15:24:30 -0600 (Mon, 14 Mar 2005) $
+ */
+
+public class Constants {
+
+    /**
+     * Name of the <code>org.apache.catalina.cluster.tcp</code> package.
+     */
+    public static final String Package = "org.apache.catalina.cluster.tcp";
+
+}

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Thu Feb 23 11:55:14 2006
@@ -0,0 +1,72 @@
+AsyncSocketSender.create.thread=Create sender [{0}:{1,number,integer}] queue thread to tcp background replication
+AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}] id=[{2}] size={3}
+AsyncSocketSender.send.error=Unable to asynchronously send session with id=[{0}] - message will be ignored.
+AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}] returned null element!
+cluster.mbean.register.already=MBean {0} already registered!
+FastAsyncSocketSender.setThreadPriority=[{0}:{1,number,integer}] set priority to {2}
+FastAsyncSocketSender.min.exception=[{0}:{1,number,integer}] new priority {2} < MIN_PRIORITY
+FastAsyncSocketSender.max.exception=[{0}:{1,number,integer}] new priority {2} > MAX_PRIORITY
+IDataSender.ack.eof=EOF reached at local port [{0}:{1,number,integer}]
+IDataSender.ack.receive=Got ACK at local port [{0}:{1,number,integer}]
+IDataSender.ack.missing=Unable to read acknowledgement from [{0}:{1,number,integer}] in {2,number,integer} ms. Disconnecting socket, and trying again.
+IDataSender.ack.read=Read wait ack char '{2}' [{0}:{1,number,integer}]
+IDataSender.ack.start=Waiting for ACK message [{0}:{1,number,integer}]
+IDataSender.ack.wrong=Missing correct ACK after 10 bytes read at local port [{0}:{1,number,integer}]
+IDataSender.closeSocket=Sender close socket to [{0}:{1,number,integer}] (close count {2,number,integer})
+IDataSender.connect=Sender connect to [{0}:{1,number,integer}] (connect count {2,number,integer})
+IDataSender.create=Create sender [{0}:{1,number,integer}]
+IDataSender.disconnect=Sender disconnect from [{0}:{1,number,integer}] (disconnect count {2,number,integer})
+IDataSender.message.disconnect=Message transfered: Sender can't disconnect from [{0}:{1,number,integer}]
+IDataSender.message.create=Message transfered: Sender can't create current socket [{0}:{1,number,integer}]
+IDataSender.openSocket=Sender open socket to [{0}:{1,number,integer}] (open count {2,number,integer})
+IDataSender.openSocket.failure=Open sender socket [{0}:{1,number,integer}] failure! (open failure count {2,number,integer})
+IDataSender.send.again=Send data again to [{0}:{1,number,integer}]
+IDataSender.send.crash=Send message crashed [{0}:{1,number,integer}] type=[{2}], id=[{3}]
+IDataSender.send.message=Send message to [{0}:{1,number,integer}] id=[{2}] size={3,number,integer}
+IDataSender.send.lost=Message lost: [{0}:{1,number,integer}] type=[{2}], id=[{3}]
+IDataSender.senderModes.Configured=Configured a data replication sender for mode {0}
+IDataSender.senderModes.Instantiate=Can't instantiate a data replication sender of class {0}
+IDataSender.senderModes.Missing=Can't configure a data replication sender for mode {0}
+IDataSender.senderModes.Resources=Can't load data replication sender mapping list
+IDataSender.stats=Send stats from [{0}:{1,number,integer}], Nr of bytes sent={2,number,integer} over {3} = {4,number,integer} bytes/request, processing time {5,number,integer} msec, avg processing time {6,number,integer} msec
+PoolSocketSender.senderQueue.sender.failed=PoolSocketSender create new sender to [{0}:{1,number,integer}] failed
+PoolSocketSender.noMoreSender=No socket sender available for client [{0}:{1,number,integer}] did it disappeared?
+ReplicationTransmitter.getProperty=get property {0}
+ReplicationTransmitter.setProperty=set property {0}: {1} old value {2}
+ReplicationTransmitter.started=Start ClusterSender at cluster {0} with name {1}
+ReplicationTransmitter.stopped=Stopped ClusterSender at cluster {0} with name {1}
+ReplicationValve.crossContext.add=add Cross Context session replication container to replicationValve threadlocal
+ReplicationValve.crossContext.registerSession=register Cross context session id={0} from context {1}
+ReplicationValve.crossContext.remove=remove Cross Context session replication container from replicationValve threadlocal
+ReplicationValve.crossContext.sendDelta=send Cross Context session delta from context {0}.
+ReplicationValve.filter.loading=Loading request filters={0}
+ReplicationValve.filter.token=Request filter={0}
+ReplicationValve.filter.token.failure=Unable to compile filter={0}
+ReplicationValve.invoke.uri=Invoking replication request on {0}
+ReplicationValve.nocluster=No cluster configured for this request.
+ReplicationValve.resetDeltaRequest=Cluster is standalone: reset Session Request Delta at context {0}
+ReplicationValve.send.failure=Unable to perform replication request.
+ReplicationValve.send.invalid.failure=Unable to send session [id={0}] invalid message over cluster.
+ReplicationValve.session.found=Context {0}: Found session {1} but it isn't a ClusterSession.
+ReplicationValve.session.indicator=Context {0}: Primarity of session {0} in request attribute {1} is {2}.
+ReplicationValve.session.invalid=Context {0}: Requested session {1} is invalid, removed or not replicated at this node.
+ReplicationValve.stats=Average request time= {0} ms for Cluster overhead time={1} ms for {2} requests {3} filter requests {4} send requests {5} cross context requests (Request={6} ms Cluster={7} ms).
+SimpleTcpCluster.event.log=Cluster receive listener event {0} with data {1}
+SimpleTcpCluster.getProperty=get property {0}
+SimpleTcpCluster.setProperty=set property {0}: {1} old value {2}
+SimpleTcpCluster.default.addClusterListener=Add Default ClusterListener at cluster {0}
+SimpleTcpCluster.default.addClusterValves=Add Default ClusterValves at cluster {0}
+SimpleTcpCluster.default.addClusterReceiver=Add Default ClusterReceiver at cluster {0}
+SimpleTcpCluster.default.addClusterSender=Add Default ClusterSender at cluster {0}
+SimpleTcpCluster.default.addMembershipService=Add Default Membership Service at cluster {0}
+SimpleTcpCluster.log.receive=RECEIVE {0,date}:{0,time} {1,number} {2}:{3,number,integer} {4} {5}
+SimpleTcpCluster.log.send=SEND {0,date}:{0,time} {1,number} {2}:{3,number,integer} {4} 
+SimpleTcpCluster.log.send.all=SEND {0,date}:{0,time} {1,number} - {2}
+SocketReplictionListener.allreadyExists=ServerSocket [{0}:{1}] already started!
+SocketReplictionListener.accept.failure=ServerSocket [{0}:{1}] - Exception to start thread or accept server socket
+SocketReplictionListener.open=Open Socket at [{0}:{1}]
+SocketReplictionListener.openclose.failure=ServerSocket [{0}:{1}] - Exception to open or close server socket
+SocketReplictionListener.portbusy=Port busy at [{0}:{1}] - reason [{2}]
+SocketReplictionListener.serverSocket.notExists=Fatal error: Receiver socket not bound address={0} port={1} maxport={2}
+SocketReplictionListener.timeout=Receiver ServerSocket not started [{0}:{1}] - reason: timeout={2} or listen={3}
+SocketReplictionListener.unlockSocket.failure=UnLocksocket failure at ServerSocket [{0}:{1}]

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,654 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+import java.util.regex.Pattern;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import javax.servlet.ServletException;
+
+import org.apache.catalina.Manager;
+import org.apache.catalina.Session;
+import org.apache.catalina.Context;
+import org.apache.catalina.core.StandardContext;
+import org.apache.catalina.cluster.CatalinaCluster;
+import org.apache.catalina.cluster.ClusterManager;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ClusterSession;
+import org.apache.catalina.cluster.ClusterValve;
+import org.apache.catalina.cluster.session.DeltaManager;
+import org.apache.catalina.cluster.session.DeltaSession;
+import org.apache.catalina.connector.Request;
+import org.apache.catalina.connector.Response;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.valves.ValveBase;
+
+/**
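+ * <p>Implementation of a Valve that triggers the session replication:
+ * after the request has been processed by the following valves, the session
+ * changes of the request session (and of all registered cross context
+ * sessions) are sent to the other cluster members. Requests whose URI matches
+ * one of the configured <code>filter</code> expressions are handled as
+ * requests without session change and are not replicated.</p>
+ *
+ * <p>This Valve uses the cluster of the Container it is attached to; when the
+ * Container has no cluster, only a warning is logged.</p>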
+ *
+ * <p>primaryIndicator=true, then the request attribute <i>org.apache.catalina.cluster.tcp.isPrimarySession.</i>
+ * is set true, when request processing is at sessions primary node.
+ * </p>
+ *
+ * @author Craig R. McClanahan
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 375709 $ $Date: 2006-02-07 15:13:25 -0600 (Tue, 07 Feb 2006) $
+ */
+
+public class ReplicationValve
+    extends ValveBase implements ClusterValve {
+    
+    private static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( ReplicationValve.class );
+
+    // ----------------------------------------------------- Instance Variables
+
+    /**
+     * The descriptive information related to this implementation.
+     */
+    private static final String info =
+        "org.apache.catalina.cluster.tcp.ReplicationValve/2.0";
+
+
+    /**
+     * The StringManager for this package.
+     */
+    protected static StringManager sm =
+        StringManager.getManager(Constants.Package);
+
+    private CatalinaCluster cluster = null ;
+
+    /**
+     * holds file endings to not call for like images and others
+     */
+    protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0];
+    
+    /**
+     * Orginal filter 
+     */
+    protected String filter ;
+    
+    /**
+     * crossContext session container 
+     */
+    protected ThreadLocal crossContextSessions = new ThreadLocal() ;
+    
+    /**
+     * doProcessingStats (default = off)
+     */
+    protected boolean doProcessingStats = false;
+    
+    protected long totalRequestTime = 0;
+    protected long totalSendTime = 0;
+    protected long nrOfRequests = 0;
+    protected long lastSendTime = 0;
+    protected long nrOfFilterRequests = 0;
+    protected long nrOfSendRequests = 0;
+    protected long nrOfCrossContextSendRequests = 0;
+    
+    /**
+     * must primary change indicator set 
+     */
+    protected boolean primaryIndicator = false ;
+    
+    /**
+     * Name of primary change indicator as request attribute
+     */
+    protected String primaryIndicatorName = "org.apache.catalina.cluster.tcp.isPrimarySession";
+   
+    // ------------------------------------------------------------- Properties
+
+    public ReplicationValve() {
+    }
+    
+    /**
+     * Return descriptive information about this Valve implementation.
+     */
+    public String getInfo() {
+
+        return (info);
+
+    }
+    
+    /**
+     * @return Returns the cluster.
+     */
+    public CatalinaCluster getCluster() {
+        return cluster;
+    }
+    
+    /**
+     * @param cluster The cluster to set.
+     */
+    public void setCluster(CatalinaCluster cluster) {
+        this.cluster = cluster;
+    }
+ 
+    /**
+     * @return Returns the filter
+     */
+    public String getFilter() {
+       return filter ;
+    }
+
+    /**
+     * Set the request filter and compile it to regular expressions.
+     * The filter is a list of regular expressions separated by <code>;</code>.
+     * A token that cannot be compiled is logged and skipped, so the compiled
+     * filter array never contains <code>null</code> entries. A
+     * <code>null</code> filter removes all request filters.
+     *
+     * @see Pattern#compile(java.lang.String)
+     * @param filter
+     *            The filter to set.
+     */
+    public void setFilter(String filter) {
+        if (log.isDebugEnabled())
+            log.debug(sm.getString("ReplicationValve.filter.loading", filter));
+        this.filter = filter;
+        if (filter == null) {
+            // StringTokenizer can not handle null - no filter means no patterns
+            this.reqFilters = new Pattern[0];
+            return;
+        }
+        StringTokenizer t = new StringTokenizer(filter, ";");
+        // Collect only the successfully compiled patterns. Writing directly
+        // to reqFilters[i++] would advance the index before a failing
+        // compile and leave a null slot behind.
+        List compiled = new ArrayList(t.countTokens());
+        while (t.hasMoreTokens()) {
+            String s = t.nextToken();
+            if (log.isTraceEnabled())
+                log.trace(sm.getString("ReplicationValve.filter.token", s));
+            try {
+                compiled.add(Pattern.compile(s));
+            } catch (Exception x) {
+                log.error(sm.getString("ReplicationValve.filter.token.failure",
+                        s), x);
+            }
+        }
+        // publish the complete array with a single assignment
+        this.reqFilters = (Pattern[]) compiled.toArray(new Pattern[compiled.size()]);
+    }
+
+    /**
+     * @return Returns the primaryIndicator.
+     */
+    public boolean isPrimaryIndicator() {
+        return primaryIndicator;
+    }
+
+    /**
+     * @param primaryIndicator The primaryIndicator to set.
+     */
+    public void setPrimaryIndicator(boolean primaryIndicator) {
+        this.primaryIndicator = primaryIndicator;
+    }
+    
+    /**
+     * @return Returns the primaryIndicatorName.
+     */
+    public String getPrimaryIndicatorName() {
+        return primaryIndicatorName;
+    }
+    
+    /**
+     * @param primaryIndicatorName The primaryIndicatorName to set.
+     */
+    public void setPrimaryIndicatorName(String primaryIndicatorName) {
+        this.primaryIndicatorName = primaryIndicatorName;
+    }
+    
+    /**
+     * Calc processing stats
+     */
+    public boolean isDoProcessingStats() {
+        return doProcessingStats;
+    }
+
+    /**
+     * Set Calc processing stats
+     * @see #resetStatistics()
+     */
+    public void setDoProcessingStats(boolean doProcessingStats) {
+        this.doProcessingStats = doProcessingStats;
+    }
+
+    /**
+     * @return Returns the lastSendTime.
+     */
+    public long getLastSendTime() {
+        return lastSendTime;
+    }
+    
+    /**
+     * @return Returns the nrOfRequests.
+     */
+    public long getNrOfRequests() {
+        return nrOfRequests;
+    }
+    
+    /**
+     * @return Returns the nrOfFilterRequests.
+     */
+    public long getNrOfFilterRequests() {
+        return nrOfFilterRequests;
+    }
+
+    /**
+     * @return Returns the nrOfCrossContextSendRequests.
+     */
+    public long getNrOfCrossContextSendRequests() {
+        return nrOfCrossContextSendRequests;
+    }
+
+    /**
+     * @return Returns the nrOfSendRequests.
+     */
+    public long getNrOfSendRequests() {
+        return nrOfSendRequests;
+    }
+
+    /**
+     * @return Returns the totalRequestTime.
+     */
+    public long getTotalRequestTime() {
+        return totalRequestTime;
+    }
+    
+    /**
+     * @return Returns the totalSendTime.
+     */
+    public long getTotalSendTime() {
+        return totalSendTime;
+    }
+
+    /**
+     * @return Returns the reqFilters.
+     */
+    protected java.util.regex.Pattern[] getReqFilters() {
+        return reqFilters;
+    }
+    
+    /**
+     * @param reqFilters The reqFilters to set.
+     */
+    protected void setReqFilters(java.util.regex.Pattern[] reqFilters) {
+        this.reqFilters = reqFilters;
+    }
+    
+    
+    // --------------------------------------------------------- Public Methods
+    
+    /**
+     * Register a cross context session for replication at the end of the
+     * current request. The session is only stored when the current thread
+     * has a cross context session list and the session is not already part
+     * of it (the Portlet API can include a lot of fragments from the same or
+     * from different applications with session changes).
+     *
+     * @param session cross context session
+     */
+    public void registerReplicationSession(DeltaSession session) {
+        List registered = (List) crossContextSessions.get();
+        // no list for this thread or session already known: nothing to do
+        if (registered == null || registered.contains(session)) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("ReplicationValve.crossContext.registerSession",
+                session.getIdInternal(),
+                session.getManager().getContainer().getName()));
+        }
+        registered.add(session);
+    }
+
+    /**
+     * Invoke the next Valve in the sequence and, after the request has been
+     * processed, replicate the session changes of this request to the other
+     * cluster members. When the cluster has no other members, the recorded
+     * session changes are only reset. A request without context or without
+     * a {@link ClusterManager} is passed through without replication.
+     *
+     * @param request The servlet request to be processed
+     * @param response The servlet response to be created
+     *
+     * @exception IOException if an input/output error occurs
+     * @exception ServletException if a servlet error occurs
+     */
+    public void invoke(Request request, Response response)
+        throws IOException, ServletException
+    {
+        long totalstart = 0;
+
+        //this happens before the request
+        if(isDoProcessingStats()) {
+            totalstart = System.currentTimeMillis();
+        }
+        if (primaryIndicator) {
+            createPrimaryIndicator(request) ;
+        }
+        Context context = request.getContext();
+        // instanceof is false for null, no separate null check required
+        boolean isCrossContext = context instanceof StandardContext
+                && ((StandardContext) context).getCrossContext();
+        try {
+            if(isCrossContext) {
+                if(log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.crossContext.add"));
+                //FIXME add Pool of Arraylists
+                crossContextSessions.set(new ArrayList());
+            }
+            getNext().invoke(request, response);
+            // the request may have no context: then there is no manager
+            // and nothing to replicate
+            Context requestContext = request.getContext();
+            Manager manager = (requestContext != null) ? requestContext.getManager() : null;
+            if (manager instanceof ClusterManager) {
+                ClusterManager clusterManager = (ClusterManager) manager;
+                CatalinaCluster containerCluster = (CatalinaCluster) getContainer().getCluster();
+                if (containerCluster == null) {
+                    if (log.isWarnEnabled())
+                        log.warn(sm.getString("ReplicationValve.nocluster"));
+                    return;
+                }
+                // valve cluster can access manager - other cluster handle replication 
+                // at host level - hopefully!
+                if(containerCluster.getManager(clusterManager.getName()) == null)
+                    return ;
+                if(containerCluster.hasMembers()) {
+                    sendReplicationMessage(request, totalstart, isCrossContext, clusterManager, containerCluster);
+                } else {
+                    resetReplicationRequest(request,isCrossContext);
+                }        
+            }
+        } finally {
+            // Array must be remove: Current master request send endAccess at recycle. 
+            // Don't register this request session again!
+            if(isCrossContext) {
+                if(log.isDebugEnabled())
+                    log.debug(sm.getString("ReplicationValve.crossContext.remove"));
+                // crossContextSessions.remove() only exist at Java 5
+                // register ArrayList at a pool
+                crossContextSessions.set(null);
+            }
+        }
+    }
+
+    
+    /**
+     * reset the active statitics 
+     */
+    public void resetStatistics() {
+        totalRequestTime = 0 ;
+        totalSendTime = 0 ;
+        lastSendTime = 0 ;
+        nrOfFilterRequests = 0 ;
+        nrOfRequests = 0 ;
+        nrOfSendRequests = 0;
+        nrOfCrossContextSendRequests = 0;
+    }
+    
+    /**
+     * Return a String rendering of this object.
+     */
+    public String toString() {
+
+        StringBuffer sb = new StringBuffer("ReplicationValve[");
+        if (container != null)
+            sb.append(container.getName());
+        sb.append("]");
+        return (sb.toString());
+
+    }
+
+    // --------------------------------------------------------- Protected Methods
+
+    /**
+     * Send all replication messages of the finished request: the invalidated
+     * sessions (not for a {@link DeltaManager}), the request session and,
+     * for a cross context request, the registered cross context sessions.
+     * A failure is logged and not propagated to the caller.
+     *
+     * @param request current request
+     * @param totalstart start time of the request processing (for the stats)
+     * @param isCrossContext send also the registered cross context sessions
+     * @param clusterManager session manager of the request context
+     * @param containerCluster replication cluster
+     */
+    protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, ClusterManager clusterManager, CatalinaCluster containerCluster) {
+        //this happens after the request
+        long sendStart = isDoProcessingStats() ? System.currentTimeMillis() : 0;
+        try {
+            // send invalid sessions
+            // DeltaManager returns String[0]
+            boolean isDeltaManager = clusterManager instanceof DeltaManager;
+            if (!isDeltaManager) {
+                sendInvalidSessions(clusterManager, containerCluster);
+            }
+            // send replication
+            sendSessionReplicationMessage(request, clusterManager, containerCluster);
+            if (isCrossContext) {
+                sendCrossContextSession(containerCluster);
+            }
+        } catch (Exception x) {
+            // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
+            log.error(sm.getString("ReplicationValve.send.failure"), x);
+        } finally {
+            // FIXME this stats update are not cheap!!
+            if (isDoProcessingStats()) {
+                updateStats(totalstart, sendStart);
+            }
+        }
+    }
+
+    /**
+     * Send the changes of every cross context session that was registered
+     * for the current thread to the backups.
+     *
+     * @param containerCluster replication cluster
+     */
+    protected void sendCrossContextSession(CatalinaCluster containerCluster) {
+        Object registered = crossContextSessions.get();
+        // instanceof also covers the "nothing registered" (null) case
+        if (!(registered instanceof List)) {
+            return;
+        }
+        Iterator iter = ((List) registered).iterator();
+        while (iter.hasNext()) {
+            Session session = (Session) iter.next();
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("ReplicationValve.crossContext.sendDelta",  
+                        session.getManager().getContainer().getName() ));
+            }
+            sendMessage(session, (ClusterManager) session.getManager(), containerCluster);
+            if (isDoProcessingStats()) {
+                nrOfCrossContextSendRequests++;
+            }
+        }
+    }
+  
+    /**
+     * Fix memory leak for long sessions with many changes, when no backup
+     * member exists: reset the recorded changes of the request session and,
+     * for a cross context request, of all registered cross context sessions.
+     *
+     * @param request current request after response is generated
+     * @param isCrossContext check crosscontext threadlocal
+     */
+    protected void resetReplicationRequest(Request request, boolean isCrossContext) {
+        Session contextSession = request.getSessionInternal(false);
+        // instanceof is false for null; the former non short-circuit '&'
+        // combination with a null check is not required
+        if (contextSession instanceof DeltaSession) {
+            resetDeltaRequest(contextSession);
+        }
+        if (isCrossContext) {
+            Object sessions = crossContextSessions.get();
+            if (sessions instanceof List) {
+                for (Iterator iter = ((List) sessions).iterator(); iter.hasNext();) {
+                    Session session = (Session) iter.next();
+                    resetDeltaRequest(session);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reset DeltaRequest from session
+     * @param session HttpSession from current request or cross context session
+     */
+    protected void resetDeltaRequest(Session session) {
+        if(log.isDebugEnabled()) {
+            log.debug(sm.getString("ReplicationValve.resetDeltaRequest" , 
+                session.getManager().getContainer().getName() ));
+        }
+        ((DeltaSession)session).resetDeltaRequest();
+    }
+
+    /**
+     * Send the replication message for the session of the current request.
+     * Nothing is sent when the request has no session. A request whose URI
+     * matches a filter is only counted (when the stats are enabled).
+     *
+     * @param request current request
+     * @param manager session manager
+     * @param cluster replication cluster
+     */
+    protected void sendSessionReplicationMessage(Request request,
+            ClusterManager manager, CatalinaCluster cluster) {
+        Session session = request.getSessionInternal(false);
+        if (session == null) {
+            return;
+        }
+        String uri = request.getDecodedRequestURI();
+        if (isRequestWithoutSessionChange(uri)) {
+            // request without session change
+            if (isDoProcessingStats()) {
+                nrOfFilterRequests++;
+            }
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
+        }
+        sendMessage(session, manager, cluster);
+    }
+
+   /**
+    * Send the replication message of a session. A session without internal
+    * id is ignored.
+    *
+    * @param session session to replicate
+    * @param manager session manager
+    * @param cluster replication cluster
+    */
+    protected void sendMessage(Session session,
+             ClusterManager manager, CatalinaCluster cluster) {
+        String sessionId = session.getIdInternal();
+        if (sessionId == null) {
+            return;
+        }
+        send(manager, cluster, sessionId);
+    }
+
+    /**
+     * Ask the manager for the requestCompleted message of the session and
+     * send it to the cluster. The message goes only to the cluster domain
+     * when the manager is configured this way. Nothing is sent (and counted)
+     * when the manager returns no message.
+     *
+     * @param manager SessionManager
+     * @param cluster replication cluster
+     * @param sessionId sessionid from the manager
+     * @see DeltaManager#requestCompleted(String)
+     * @see SimpleTcpCluster#send(ClusterMessage)
+     */
+    protected void send(ClusterManager manager, CatalinaCluster cluster, String sessionId) {
+        ClusterMessage replicationMessage = manager.requestCompleted(sessionId);
+        if (replicationMessage == null) {
+            return;
+        }
+        if (manager.isSendClusterDomainOnly()) {
+            cluster.sendClusterDomain(replicationMessage);
+        } else {
+            cluster.send(replicationMessage);
+        }
+        if (isDoProcessingStats()) {
+            nrOfSendRequests++;
+        }
+    }
+    
+    /**
+     * Send a message for every session the manager reports as invalidated.
+     * A failure for one session is logged and does not stop the handling of
+     * the remaining sessions.
+     *
+     * @param manager session manager
+     * @param cluster replication cluster
+     */
+    protected void sendInvalidSessions(ClusterManager manager, CatalinaCluster cluster) {
+        String[] invalidIds = manager.getInvalidatedSessions();
+        // an empty array simply skips the loop
+        for (int idx = 0; idx < invalidIds.length; idx++) {
+            String invalidId = invalidIds[idx];
+            try {
+                send(manager, cluster, invalidId);
+            } catch (Exception x) {
+                log.error(sm.getString("ReplicationValve.send.invalid.failure",invalidId),x);
+            }
+        }
+    }
+    
+    /**
+     * Is the request a request without possible session change? This is the
+     * case when the complete uri matches at least one of the compiled request
+     * filters. Missing (<code>null</code>) filter entries are skipped and a
+     * <code>null</code> uri never matches.
+     *
+     * @param uri The request uri
+     * @return True if no session change
+     */
+    protected boolean isRequestWithoutSessionChange(String uri) {
+        // work on one array reference: the field can be replaced by
+        // setFilter() or setReqFilters() while this loop is running
+        Pattern[] filters = reqFilters;
+        if (filters == null || uri == null) {
+            return false;
+        }
+        for (int i = 0; i < filters.length; i++) {
+            // a null entry must not break the request with an NPE
+            if (filters[i] != null && filters[i].matcher(uri).matches()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Protocol the cluster replication stats and log a summary at every
+     * 100th request. The values for the log message are copied inside the
+     * synchronized block, so that a concurrent update or
+     * {@link #resetStatistics()} can not produce a division by zero or a
+     * mixed up summary.
+     *
+     * @param requestTime start time (millis) of the request processing
+     * @param clusterTime start time (millis) of the cluster send
+     */
+    protected  void updateStats(long requestTime, long clusterTime) {
+        long requests;
+        long requestTimeSum;
+        long sendTimeSum;
+        synchronized(this) {
+            lastSendTime=System.currentTimeMillis();
+            totalSendTime+=lastSendTime - clusterTime;
+            totalRequestTime+=lastSendTime - requestTime;
+            nrOfRequests++;
+            // snapshot for the log output below
+            requests = nrOfRequests;
+            requestTimeSum = totalRequestTime;
+            sendTimeSum = totalSendTime;
+        }
+        // requests can be 0 when resetStatistics() runs at the same time
+        if (requests > 0 && (requests % 100) == 0 && log.isInfoEnabled()) {
+            log.info(sm.getString("ReplicationValve.stats",
+                new Object[]{
+                    new Long(requestTimeSum/requests),
+                    new Long(sendTimeSum/requests),
+                    new Long(requests),
+                    new Long(nrOfSendRequests),
+                    new Long(nrOfCrossContextSendRequests),
+                    new Long(nrOfFilterRequests),
+                    new Long(requestTimeSum),
+                    new Long(sendTimeSum)}));
+        }
+    }
+
+
+    /**
+     * Mark Request that processed at primary node with attribute
+     * primaryIndicatorName. The attribute is only set when the requested
+     * session is a {@link ClusterSession}. A request without requested
+     * session id, without context or without session manager is left
+     * untouched.
+     * 
+     * @param request current request
+     * @throws IOException if the manager fails to find the session
+     */
+    protected void createPrimaryIndicator(Request request) throws IOException {
+        String id = request.getRequestedSessionId();
+        if ((id == null) || (id.length() == 0)) {
+            return;
+        }
+        Context context = request.getContext();
+        Manager manager = (context != null) ? context.getManager() : null;
+        if (manager == null) {
+            // no context or no session manager: no session to look up
+            return;
+        }
+        Session session = manager.findSession(id);
+        if (session instanceof ClusterSession) {
+            ClusterSession cses = (ClusterSession) session;
+            Boolean isPrimary = Boolean.valueOf(cses.isPrimarySession());
+            if (log.isDebugEnabled())
+                log.debug(sm.getString(
+                        "ReplicationValve.session.indicator", context.getName(),id,
+                        primaryIndicatorName, isPrimary));
+            request.setAttribute(primaryIndicatorName, isPrimary);
+        } else {
+            if (log.isDebugEnabled()) {
+                if (session != null) {
+                    log.debug(sm.getString(
+                            "ReplicationValve.session.found", context.getName(),id));
+                } else {
+                    log.debug(sm.getString(
+                            "ReplicationValve.session.invalid", context.getName(),id));
+                }
+            }
+        }
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java (added)
+++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/tcp/SendMessageData.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,81 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.catalina.cluster.tcp;
+
+import org.apache.catalina.cluster.Member;
+
+/**
+ * Simple holder that groups a message, the member it is addressed to and
+ * an exception belonging to that send. All three values can be read and
+ * replaced after construction.
+ *
+ * @author Peter Rossbach
+ * @version $Revision: 303987 $ $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 2005) $
+ */
+public class SendMessageData {
+
+    /** message of this send */
+    private Object message ;
+    /** member the message is addressed to */
+    private Member destination ;
+    /** exception belonging to this send */
+    private Exception exception ;
+
+    /**
+     * Create the holder with all three values.
+     *
+     * @param message the message
+     * @param destination the destination member
+     * @param exception the exception
+     */
+    public SendMessageData(Object message, Member destination,
+            Exception exception) {
+        this.message = message;
+        this.destination = destination;
+        this.exception = exception;
+    }
+
+    /**
+     * @return Returns the message.
+     */
+    public Object getMessage() {
+        return this.message;
+    }
+
+    /**
+     * @param message The message to set.
+     */
+    public void setMessage(Object message) {
+        this.message = message;
+    }
+
+    /**
+     * @return Returns the destination.
+     */
+    public Member getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @param destination The destination to set.
+     */
+    public void setDestination(Member destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @return Returns the exception.
+     */
+    public Exception getException() {
+        return this.exception;
+    }
+
+    /**
+     * @param exception The exception to set.
+     */
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message