Return-Path: Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: (qmail 59884 invoked from network); 23 Feb 2006 20:01:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 23 Feb 2006 20:01:20 -0000 Received: (qmail 61328 invoked by uid 500); 23 Feb 2006 19:56:42 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 60785 invoked by uid 500); 23 Feb 2006 19:56:38 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 60233 invoked by uid 500); 23 Feb 2006 19:56:33 -0000 Delivered-To: apmail-jakarta-tomcat-dev@jakarta.apache.org Received: (qmail 60015 invoked by uid 99); 23 Feb 2006 19:56:32 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Feb 2006 11:56:32 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 23 Feb 2006 11:56:16 -0800 Received: (qmail 57377 invoked by uid 65534); 23 Feb 2006 19:55:55 -0000 Message-ID: <20060223195555.57376.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r380209 [7/12] - in /tomcat/container/tc5.5.x/modules: groupcom/ groupcom/etc/ groupcom/src/ groupcom/src/share/ groupcom/src/share/org/ groupcom/src/share/org/apache/ groupcom/src/share/org/apache/catalina/ groupcom/src/share/org/apache/ca... Date: Thu, 23 Feb 2006 19:55:25 -0000 To: tomcat-dev@jakarta.apache.org From: fhanik@apache.org X-Mailer: svnmailer-1.0.7 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/deploy/WarWatcher.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/deploy/WarWatcher.java?rev=380209&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/deploy/WarWatcher.java (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/deploy/WarWatcher.java Thu Feb 23 11:55:14 2006 @@ -0,0 +1,238 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.cluster.deploy; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Iterator; + +/** + *

+ * The WarWatcher watches the deployDir for changes made to the + * directory (adding new WAR files->deploy or remove WAR files->undeploy) And + * notifies a listener of the changes made + *

+ * + * @author Filip Hanik + * @author Peter Rossbach + * @version 1.1 + */ + +public class WarWatcher { + + /*--Static Variables----------------------------------------*/ + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(WarWatcher.class); + + /*--Instance Variables--------------------------------------*/ + /** + * Directory to watch for war files + */ + protected File watchDir = null; + + /** + * Parent to be notified of changes + */ + protected FileChangeListener listener = null; + + /** + * Currently deployed files + */ + protected Map currentStatus = new HashMap(); + + /*--Constructor---------------------------------------------*/ + + public WarWatcher() { + } + + public WarWatcher(FileChangeListener listener, File watchDir) { + this.listener = listener; + this.watchDir = watchDir; + } + + /*--Logic---------------------------------------------------*/ + + /** + * check for modification and send notifcation to listener + */ + public void check() { + if (log.isInfoEnabled()) + log.info("check cluster wars at " + watchDir); + File[] list = watchDir.listFiles(new WarFilter()); + if (list == null) + list = new File[0]; + //first make sure all the files are listed in our current status + for (int i = 0; i < list.length; i++) { + addWarInfo(list[i]); + }//for + + //check all the status codes and update the FarmDeployer + for (Iterator i = currentStatus.entrySet().iterator(); i.hasNext();) { + Map.Entry entry = (Map.Entry) i.next(); + WarInfo info = (WarInfo) entry.getValue(); + int check = info.check(); + if (check == 1) { + listener.fileModified(info.getWar()); + } else if (check == -1) { + listener.fileRemoved(info.getWar()); + //no need to keep in memory + currentStatus.remove(info.getWar()); + }//end if + }//for + + } + + /** + * add cluster war to the watcher state + * @param warfile + */ + protected void addWarInfo(File warfile) { + WarInfo info = (WarInfo) currentStatus.get(warfile.getAbsolutePath()); + if (info == null) { + info = new WarInfo(warfile); + info.setLastState(-1); //assume file is non existent + currentStatus.put(warfile.getAbsolutePath(), info); + } + } + + /** + * clear watcher state + */ + public void clear() { + currentStatus.clear(); + } + + /** + * @return Returns the watchDir. + */ + public File getWatchDir() { + return watchDir; + } + + /** + * @param watchDir + * The watchDir to set. + */ + public void setWatchDir(File watchDir) { + this.watchDir = watchDir; + } + + /** + * @return Returns the listener. + */ + public FileChangeListener getListener() { + return listener; + } + + /** + * @param listener + * The listener to set. + */ + public void setListener(FileChangeListener listener) { + this.listener = listener; + } + + /*--Inner classes-------------------------------------------*/ + + /** + * File name filter for war files + */ + protected class WarFilter implements java.io.FilenameFilter { + public boolean accept(File path, String name) { + if (name == null) + return false; + return name.endsWith(".war"); + } + } + + /** + * File information on existing WAR files + */ + protected class WarInfo { + protected File war = null; + + protected long lastChecked = 0; + + protected long lastState = 0; + + public WarInfo(File war) { + this.war = war; + this.lastChecked = war.lastModified(); + if (!war.exists()) + lastState = -1; + } + + public boolean modified() { + return war.exists() && war.lastModified() > lastChecked; + } + + public boolean exists() { + return war.exists(); + } + + /** + * Returns 1 if the file has been added/modified, 0 if the file is + * unchanged and -1 if the file has been removed + * + * @return int 1=file added; 0=unchanged; -1=file removed + */ + public int check() { + //file unchanged by default + int result = 0; + + if (modified()) { + //file has changed - timestamp + result = 1; + lastState = result; + } else if ((!exists()) && (!(lastState == -1))) { + //file was removed + result = -1; + lastState = result; + } else if ((lastState == -1) && exists()) { + //file was added + result = 1; + lastState = result; + } + this.lastChecked = System.currentTimeMillis(); + return result; + } + + public File getWar() { + return war; + } + + public int hashCode() { + return war.getAbsolutePath().hashCode(); + } + + public boolean equals(Object other) { + if (other instanceof WarInfo) { + WarInfo wo = (WarInfo) other; + return wo.getWar().equals(getWar()); + } else { + return false; + } + } + + protected void setLastState(int lastState) { + this.lastState = lastState; + } + + } + +} \ No newline at end of file Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml?rev=380209&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml Thu Feb 23 11:55:14 2006 @@ -0,0 +1,94 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/package.html URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/package.html?rev=380209&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/package.html (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/package.html Thu Feb 23 11:55:14 2006 @@ -0,0 +1,11 @@ + + +

This package contains code for Clustering, the base class +of a Cluster is org.apache.catalina.Cluster implementations +of this class is done when implementing a new Cluster protocol

+ +

The only Cluster protocol currently implemented is a JavaGroups based
+    JGCluster.java +

+ + Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/ClusterSessionListener.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/ClusterSessionListener.java?rev=380209&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/ClusterSessionListener.java (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/ClusterSessionListener.java Thu Feb 23 11:55:14 2006 @@ -0,0 +1,106 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.cluster.session; + +import java.util.Map; + +import org.apache.catalina.cluster.ClusterManager; +import org.apache.catalina.cluster.ClusterMessage; +import org.apache.catalina.cluster.*; + +/** + * Receive replicated SessionMessage form other cluster node. + * @author Filip Hanik + * @author Peter Rossbach + * @version $Revision: 378258 $ $Date: 2006-02-16 08:42:35 -0600 (Thu, 16 Feb 2006) $ + */ +public class ClusterSessionListener extends ClusterListener { + + /** + * The descriptive information about this implementation. + */ + protected static final String info = "org.apache.catalina.session.ClusterSessionListener/1.1"; + + //--Constructor--------------------------------------------- + + public ClusterSessionListener() { + } + + //--Logic--------------------------------------------------- + + /** + * Return descriptive information about this implementation. + */ + public String getInfo() { + + return (info); + + } + + /** + * Callback from the cluster, when a message is received, The cluster will + * broadcast it invoking the messageReceived on the receiver. + * + * @param myobj + * ClusterMessage - the message received from the cluster + */ + public void messageReceived(ClusterMessage myobj) { + if (myobj != null && myobj instanceof SessionMessage) { + SessionMessage msg = (SessionMessage) myobj; + String ctxname = msg.getContextName(); + //check if the message is a EVT_GET_ALL_SESSIONS, + //if so, wait until we are fully started up + Map managers = cluster.getManagers() ; + if (ctxname == null) { + java.util.Iterator i = managers.keySet().iterator(); + while (i.hasNext()) { + String key = (String) i.next(); + ClusterManager mgr = (ClusterManager) managers.get(key); + if (mgr != null) + mgr.messageDataReceived(msg); + else { + //this happens a lot before the system has started + // up + if (log.isDebugEnabled()) + log.debug("Context manager doesn't exist:" + + key); + } + } + } else { + ClusterManager mgr = (ClusterManager) managers.get(ctxname); + if (mgr != null) + mgr.messageDataReceived(msg); + else if (log.isWarnEnabled()) + log.warn("Context manager doesn't exist:" + ctxname); + } + } + } + + /** + * Accept only SessionMessage + * + * @param msg + * ClusterMessage + * @return boolean - returns true to indicate that messageReceived should be + * invoked. If false is returned, the messageReceived method will + * not be invoked. + */ + public boolean accept(ClusterMessage msg) { + return (msg instanceof SessionMessage); + } +} + Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/Constants.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/Constants.java?rev=380209&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/Constants.java (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/Constants.java Thu Feb 23 11:55:14 2006 @@ -0,0 +1,31 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.catalina.cluster.session; + +/** + * Manifest constants for the org.apache.catalina.cluster.session + * package. + * + * @author Peter Rossbach Pero + */ + +public class Constants { + + public static final String Package = "org.apache.catalina.cluster.session"; + +} Added: tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/DeltaManager.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/DeltaManager.java?rev=380209&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/DeltaManager.java (added) +++ tomcat/container/tc5.5.x/modules/ha/src/share/org/apache/catalina/cluster/session/DeltaManager.java Thu Feb 23 11:55:14 2006 @@ -0,0 +1,1743 @@ +/* + * Copyright 1999,2004-2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.cluster.session; + +import java.beans.PropertyChangeEvent; +import java.beans.PropertyChangeListener; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; + +import org.apache.catalina.Cluster; +import org.apache.catalina.Container; +import org.apache.catalina.Context; +import org.apache.catalina.Engine; +import org.apache.catalina.Host; +import org.apache.catalina.Valve; +import org.apache.catalina.Lifecycle; +import org.apache.catalina.LifecycleException; +import org.apache.catalina.LifecycleListener; +import org.apache.catalina.Loader; +import org.apache.catalina.Session; +import org.apache.catalina.cluster.CatalinaCluster; +import org.apache.catalina.cluster.ClusterManager; +import org.apache.catalina.cluster.ClusterMessage; +import org.apache.catalina.cluster.Member; +import org.apache.catalina.session.ManagerBase; +import org.apache.catalina.cluster.tcp.ReplicationValve; +import org.apache.catalina.util.CustomObjectInputStream; +import org.apache.catalina.util.LifecycleSupport; +import org.apache.catalina.util.StringManager; +import org.apache.catalina.core.StandardContext; + +/** + * The DeltaManager manages replicated sessions by only replicating the deltas + * in data. For applications written to handle this, the DeltaManager is the + * optimal way of replicating data. + * + * This code is almost identical to StandardManager with a difference in how it + * persists sessions and some modifications to it. + * + * IMPLEMENTATION NOTE : Correct behavior of session storing and + * reloading depends upon external calls to the start() and + * stop() methods of this class at the correct times. + * + * @author Filip Hanik + * @author Craig R. McClanahan + * @author Jean-Francois Arcand + * @author Peter Rossbach + * @version $Revision: 380100 $ $Date: 2006-02-23 06:08:14 -0600 (Thu, 23 Feb 2006) $ + */ + +public class DeltaManager extends ManagerBase implements Lifecycle, + PropertyChangeListener, ClusterManager { + + // ---------------------------------------------------- Security Classes + + public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(DeltaManager.class); + + /** + * The string manager for this package. + */ + protected static StringManager sm = StringManager + .getManager(Constants.Package); + + // ----------------------------------------------------- Instance Variables + + /** + * The descriptive information about this implementation. + */ + private static final String info = "DeltaManager/2.1"; + + /** + * Has this component been started yet? + */ + private boolean started = false; + + /** + * The descriptive name of this Manager implementation (for logging). + */ + protected static String managerName = "DeltaManager"; + + protected String name = null; + + protected boolean defaultMode = false; + + private CatalinaCluster cluster = null; + + /** + * cached replication valve cluster container! + */ + private ReplicationValve replicationValve = null ; + + /** + * The lifecycle event support for this component. + */ + protected LifecycleSupport lifecycle = new LifecycleSupport(this); + + /** + * The maximum number of active Sessions allowed, or -1 for no limit. + */ + private int maxActiveSessions = -1; + + private boolean expireSessionsOnShutdown = false; + + private boolean notifyListenersOnReplication = true; + + private boolean notifySessionListenersOnReplication = true; + + private boolean stateTransfered = false ; + + private int stateTransferTimeout = 60; + + private boolean sendAllSessions = true; + + private boolean sendClusterDomainOnly = true ; + + private int sendAllSessionsSize = 1000 ; + + /** + * wait time between send session block (default 2 sec) + */ + private int sendAllSessionsWaitTime = 2 * 1000 ; + + private ArrayList receivedMessageQueue = new ArrayList() ; + + private boolean receiverQueue = false ; + + private boolean stateTimestampDrop = true ; + + private long stateTransferCreateSendTime; + + // ------------------------------------------------------------------ stats attributes + + int rejectedSessions = 0; + + private long sessionReplaceCounter = 0 ; + + long processingTime = 0; + + private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ; + + private long counterSend_EVT_ALL_SESSION_DATA = 0 ; + + private long counterReceive_EVT_ALL_SESSION_DATA = 0 ; + + private long counterReceive_EVT_SESSION_CREATED = 0 ; + + private long counterReceive_EVT_SESSION_EXPIRED = 0; + + private long counterReceive_EVT_SESSION_ACCESSED = 0 ; + + private long counterReceive_EVT_SESSION_DELTA = 0; + + private long counterSend_EVT_GET_ALL_SESSIONS = 0 ; + + private long counterSend_EVT_SESSION_CREATED = 0; + + private long counterSend_EVT_SESSION_DELTA = 0 ; + + private long counterSend_EVT_SESSION_ACCESSED = 0; + + private long counterSend_EVT_SESSION_EXPIRED = 0; + + private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ; + + private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ; + + private int counterNoStateTransfered = 0 ; + + + // ------------------------------------------------------------- Constructor + + public DeltaManager() { + super(); + } + + // ------------------------------------------------------------- Properties + + /** + * Return descriptive information about this Manager implementation and the + * corresponding version number, in the format + * <description>/<version>. + */ + public String getInfo() { + + return (info); + + } + + public void setName(String name) { + this.name = name; + } + + /** + * Return the descriptive short name of this Manager implementation. + */ + public String getName() { + + return (name); + + } + + /** + * @return Returns the counterSend_EVT_GET_ALL_SESSIONS. + */ + public long getCounterSend_EVT_GET_ALL_SESSIONS() { + return counterSend_EVT_GET_ALL_SESSIONS; + } + + /** + * @return Returns the counterSend_EVT_SESSION_ACCESSED. + */ + public long getCounterSend_EVT_SESSION_ACCESSED() { + return counterSend_EVT_SESSION_ACCESSED; + } + + /** + * @return Returns the counterSend_EVT_SESSION_CREATED. + */ + public long getCounterSend_EVT_SESSION_CREATED() { + return counterSend_EVT_SESSION_CREATED; + } + + /** + * @return Returns the counterSend_EVT_SESSION_DELTA. + */ + public long getCounterSend_EVT_SESSION_DELTA() { + return counterSend_EVT_SESSION_DELTA; + } + + /** + * @return Returns the counterSend_EVT_SESSION_EXPIRED. + */ + public long getCounterSend_EVT_SESSION_EXPIRED() { + return counterSend_EVT_SESSION_EXPIRED; + } + + /** + * @return Returns the counterSend_EVT_ALL_SESSION_DATA. + */ + public long getCounterSend_EVT_ALL_SESSION_DATA() { + return counterSend_EVT_ALL_SESSION_DATA; + } + + /** + * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE. + */ + public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() { + return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE; + } + + /** + * @return Returns the counterReceive_EVT_ALL_SESSION_DATA. + */ + public long getCounterReceive_EVT_ALL_SESSION_DATA() { + return counterReceive_EVT_ALL_SESSION_DATA; + } + + /** + * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS. + */ + public long getCounterReceive_EVT_GET_ALL_SESSIONS() { + return counterReceive_EVT_GET_ALL_SESSIONS; + } + + /** + * @return Returns the counterReceive_EVT_SESSION_ACCESSED. + */ + public long getCounterReceive_EVT_SESSION_ACCESSED() { + return counterReceive_EVT_SESSION_ACCESSED; + } + + /** + * @return Returns the counterReceive_EVT_SESSION_CREATED. + */ + public long getCounterReceive_EVT_SESSION_CREATED() { + return counterReceive_EVT_SESSION_CREATED; + } + + /** + * @return Returns the counterReceive_EVT_SESSION_DELTA. + */ + public long getCounterReceive_EVT_SESSION_DELTA() { + return counterReceive_EVT_SESSION_DELTA; + } + + /** + * @return Returns the counterReceive_EVT_SESSION_EXPIRED. + */ + public long getCounterReceive_EVT_SESSION_EXPIRED() { + return counterReceive_EVT_SESSION_EXPIRED; + } + + + /** + * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE. + */ + public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() { + return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE; + } + + /** + * @return Returns the processingTime. + */ + public long getProcessingTime() { + return processingTime; + } + + /** + * @return Returns the sessionReplaceCounter. + */ + public long getSessionReplaceCounter() { + return sessionReplaceCounter; + } + + /** + * Number of session creations that failed due to maxActiveSessions + * + * @return The count + */ + public int getRejectedSessions() { + return rejectedSessions; + } + + public void setRejectedSessions(int rejectedSessions) { + this.rejectedSessions = rejectedSessions; + } + + /** + * @return Returns the counterNoStateTransfered. + */ + public int getCounterNoStateTransfered() { + return counterNoStateTransfered; + } + + public int getReceivedQueueSize() { + return receivedMessageQueue.size() ; + } + + /** + * @return Returns the stateTransferTimeout. + */ + public int getStateTransferTimeout() { + return stateTransferTimeout; + } + /** + * @param timeoutAllSession The timeout + */ + public void setStateTransferTimeout(int timeoutAllSession) { + this.stateTransferTimeout = timeoutAllSession; + } + + /** + * is session state transfered complete? + * + */ + public boolean getStateTransfered() { + return stateTransfered; + } + + /** + * set that state ist complete transfered + * @param stateTransfered + */ + public void setStateTransfered(boolean stateTransfered) { + this.stateTransfered = stateTransfered; + } + + /** + * @return Returns the sendAllSessionsWaitTime in msec + */ + public int getSendAllSessionsWaitTime() { + return sendAllSessionsWaitTime; + } + + /** + * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec. + */ + public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) { + this.sendAllSessionsWaitTime = sendAllSessionsWaitTime; + } + + /** + * @return Returns the sendClusterDomainOnly. + */ + public boolean isSendClusterDomainOnly() { + return sendClusterDomainOnly; + } + + /** + * @param sendClusterDomainOnly The sendClusterDomainOnly to set. + */ + public void setSendClusterDomainOnly(boolean sendClusterDomainOnly) { + this.sendClusterDomainOnly = sendClusterDomainOnly; + } + + /** + * @return Returns the stateTimestampDrop. + */ + public boolean isStateTimestampDrop() { + return stateTimestampDrop; + } + + /** + * @param isTimestampDrop The new flag value + */ + public void setStateTimestampDrop(boolean isTimestampDrop) { + this.stateTimestampDrop = isTimestampDrop; + } + + /** + * Return the maximum number of active Sessions allowed, or -1 for no limit. + */ + public int getMaxActiveSessions() { + + return (this.maxActiveSessions); + + } + + /** + * Set the maximum number of actives Sessions allowed, or -1 for no limit. + * + * @param max + * The new maximum number of sessions + */ + public void setMaxActiveSessions(int max) { + + int oldMaxActiveSessions = this.maxActiveSessions; + this.maxActiveSessions = max; + support.firePropertyChange("maxActiveSessions", new Integer( + oldMaxActiveSessions), new Integer(this.maxActiveSessions)); + + } + + /** + * + * @return Returns the sendAllSessions. + */ + public boolean isSendAllSessions() { + return sendAllSessions; + } + + /** + * @param sendAllSessions The sendAllSessions to set. + */ + public void setSendAllSessions(boolean sendAllSessions) { + this.sendAllSessions = sendAllSessions; + } + + /** + * @return Returns the sendAllSessionsSize. + */ + public int getSendAllSessionsSize() { + return sendAllSessionsSize; + } + + /** + * @param sendAllSessionsSize The sendAllSessionsSize to set. + */ + public void setSendAllSessionsSize(int sendAllSessionsSize) { + this.sendAllSessionsSize = sendAllSessionsSize; + } + + /** + * @return Returns the notifySessionListenersOnReplication. + */ + public boolean isNotifySessionListenersOnReplication() { + return notifySessionListenersOnReplication; + } + + /** + * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set. + */ + public void setNotifySessionListenersOnReplication( + boolean notifyListenersCreateSessionOnReplication) { + this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication; + } + + + public boolean isExpireSessionsOnShutdown() { + return expireSessionsOnShutdown; + } + + public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) { + this.expireSessionsOnShutdown = expireSessionsOnShutdown; + } + + public boolean isNotifyListenersOnReplication() { + return notifyListenersOnReplication; + } + + public void setNotifyListenersOnReplication( + boolean notifyListenersOnReplication) { + this.notifyListenersOnReplication = notifyListenersOnReplication; + } + + + /** + * @return Returns the defaultMode. + */ + public boolean isDefaultMode() { + return defaultMode; + } + /** + * @param defaultMode The defaultMode to set. + */ + public void setDefaultMode(boolean defaultMode) { + this.defaultMode = defaultMode; + } + + public CatalinaCluster getCluster() { + return cluster; + } + + public void setCluster(CatalinaCluster cluster) { + this.cluster = cluster; + } + + /** + * Set the Container with which this Manager has been associated. If it is a + * Context (the usual case), listen for changes to the session timeout + * property. + * + * @param container + * The associated Container + */ + public void setContainer(Container container) { + + // De-register from the old Container (if any) + if ((this.container != null) && (this.container instanceof Context)) + ((Context) this.container).removePropertyChangeListener(this); + + // Default processing provided by our superclass + super.setContainer(container); + + // Register with the new Container (if any) + if ((this.container != null) && (this.container instanceof Context)) { + setMaxInactiveInterval(((Context) this.container) + .getSessionTimeout() * 60); + ((Context) this.container).addPropertyChangeListener(this); + } + + } + + // --------------------------------------------------------- Public Methods + + /** + * Construct and return a new session object, based on the default settings + * specified by this Manager's properties. The session id will be assigned + * by this method, and available via the getId() method of the returned + * session. If a new session cannot be created for any reason, return + * null. + * + * @exception IllegalStateException + * if a new session cannot be instantiated for any reason + * + * Construct and return a new session object, based on the default settings + * specified by this Manager's properties. The session id will be assigned + * by this method, and available via the getId() method of the returned + * session. If a new session cannot be created for any reason, return + * null. + * + * @exception IllegalStateException + * if a new session cannot be instantiated for any reason + */ + public Session createSession(String sessionId) { + return createSession(sessionId, true); + } + + /** + * create new session with check maxActiveSessions and send session creation + * to other cluster nodes. + * + * @param distribute + * @return The session + */ + public Session createSession(String sessionId, boolean distribute) { + + if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) { + rejectedSessions++; + throw new IllegalStateException(sm + .getString("deltaManager.createSession.ise")); + } + + DeltaSession session = (DeltaSession) super.createSession(sessionId) ; + session.resetDeltaRequest(); + if (distribute) { + sendCreateSession(session.getId(), session); + } + if (log.isDebugEnabled()) + log.debug(sm.getString("deltaManager.createSession.newSession", + session.getId(), new Integer(sessions.size()))); + + return (session); + + } + + /** + * Send create session evt to all backup node + * @param sessionId + * @param session + */ + protected void sendCreateSession(String sessionId, DeltaSession session) { + if(cluster.getMembers().length > 0 ) { + SessionMessage msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_CREATED, null, sessionId, + sessionId + "-" + System.currentTimeMillis()); + if (log.isDebugEnabled()) + log.debug(sm.getString("deltaManager.sendMessage.newSession", + name, sessionId)); + counterSend_EVT_SESSION_CREATED++; + send(msg); + } + session.resetDeltaRequest(); + } + + /** + * Send messages to other backup member (domain or all) + * @param msg Session message + */ + protected void send(SessionMessage msg) { + if(cluster != null) { + if(isSendClusterDomainOnly()) + cluster.sendClusterDomain(msg); + else + cluster.send(msg); + } + } + + /** + * Create DeltaSession + * @see org.apache.catalina.Manager#createEmptySession() + */ + public Session createEmptySession() { + return getNewDeltaSession() ; + } + + /** + * Get new session class to be used in the doLoad() method. + */ + protected DeltaSession getNewDeltaSession() { + return new DeltaSession(this); + } + + /** + * Load Deltarequest from external node + * Load the Class at container classloader + * @see DeltaRequest#readExternal(java.io.ObjectInput) + * @param session + * @param data message data + * @return The request + * @throws ClassNotFoundException + * @throws IOException + */ + protected DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) + throws ClassNotFoundException, IOException { + ByteArrayInputStream fis = null; + ReplicationStream ois = null; + Loader loader = null; + ClassLoader classLoader = null; + //fix to be able to run the DeltaManager + //stand alone without a container. + //use the Threads context class loader + if (container != null) + loader = container.getLoader(); + if (loader != null) + classLoader = loader.getClassLoader(); + else + classLoader = Thread.currentThread().getContextClassLoader(); + //end fix + fis = new ByteArrayInputStream(data); + ois = new ReplicationStream(fis, classLoader); + session.getDeltaRequest().readExternal(ois); + ois.close(); + return session.getDeltaRequest(); + } + + /** + * serialize DeltaRequest + * @see DeltaRequest#writeExternal(java.io.ObjectOutput) + * + * @param deltaRequest + * @return serialized delta request + * @throws IOException + */ + protected byte[] unloadDeltaRequest(DeltaRequest deltaRequest) + throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + deltaRequest.writeExternal(oos); + oos.flush(); + oos.close(); + return bos.toByteArray(); + } + + /** + * Load sessions from other cluster node. + * FIXME replace currently sessions with same id without notifcation. + * FIXME SSO handling is not really correct with the session replacement! + * @exception ClassNotFoundException + * if a serialized class cannot be found during the reload + * @exception IOException + * if an input/output error occurs + */ + protected void deserializeSessions(byte[] data) throws ClassNotFoundException, + IOException { + + // Initialize our internal data structures + //sessions.clear(); //should not do this + // Open an input stream to the specified pathname, if any + ClassLoader originalLoader = Thread.currentThread() + .getContextClassLoader(); + ObjectInputStream ois = null; + // Load the previously unloaded active sessions + try { + ois = openDeserializeObjectStream(data); + Integer count = (Integer) ois.readObject(); + int n = count.intValue(); + for (int i = 0; i < n; i++) { + DeltaSession session = (DeltaSession) createEmptySession(); + session.readObjectData(ois); + session.setManager(this); + session.setValid(true); + session.setPrimarySession(false); + //in case the nodes in the cluster are out of + //time synch, this will make sure that we have the + //correct timestamp, isValid returns true, cause + // accessCount=1 + session.access(); + //make sure that the session gets ready to expire if + // needed + session.setAccessCount(0); + session.resetDeltaRequest(); + // FIXME How inform other session id cache like SingleSignOn + // increment sessionCounter to correct stats report + if (findSession(session.getIdInternal()) == null ) { + sessionCounter++; + } else { + sessionReplaceCounter++; + // FIXME better is to grap this sessions again ! + if (log.isWarnEnabled()) + log.warn(sm.getString( + "deltaManager.loading.existing.session", + session.getIdInternal())); + } + add(session); + } + } catch (ClassNotFoundException e) { + log.error(sm.getString("deltaManager.loading.cnfe", e), e); + throw e; + } catch (IOException e) { + log.error(sm.getString("deltaManager.loading.ioe", e), e); + throw e; + } finally { + // Close the input stream + try { + if (ois != null) + ois.close(); + } catch (IOException f) { + // ignored + } + ois = null; + if (originalLoader != null) + Thread.currentThread().setContextClassLoader(originalLoader); + } + + } + + /** + * Open Stream and use correct ClassLoader (Container) Switch + * ThreadClassLoader + * + * @param data + * @return The object input stream + * @throws IOException + */ + protected ObjectInputStream openDeserializeObjectStream(byte[] data) throws IOException { + ObjectInputStream ois = null; + ByteArrayInputStream fis = null; + try { + Loader loader = null; + ClassLoader classLoader = null; + fis = new ByteArrayInputStream(data); + BufferedInputStream bis = new BufferedInputStream(fis); + if (container != null) + loader = container.getLoader(); + if (loader != null) + classLoader = loader.getClassLoader(); + if (classLoader != null) { + if (log.isTraceEnabled()) + log.trace(sm.getString( + "deltaManager.loading.withContextClassLoader", + getName())); + ois = new CustomObjectInputStream(bis, classLoader); + Thread.currentThread().setContextClassLoader(classLoader); + } else { + if (log.isTraceEnabled()) + log.trace(sm.getString( + "deltaManager.loading.withoutClassLoader", + getName())); + ois = new ObjectInputStream(bis); + } + } catch (IOException e) { + log.error(sm.getString("deltaManager.loading.ioe", e), e); + if (ois != null) { + try { + ois.close(); + } catch (IOException f) { + ; + } + ois = null; + } + throw e; + } + return ois; + } + + /** + * Save any currently active sessions in the appropriate persistence + * mechanism, if any. If persistence is not supported, this method returns + * without doing anything. + * + * @exception IOException + * if an input/output error occurs + */ + protected byte[] serializeSessions(Session[] currentSessions) throws IOException { + + // Open an output stream to the specified pathname, if any + ByteArrayOutputStream fos = null; + ObjectOutputStream oos = null; + + try { + fos = new ByteArrayOutputStream(); + oos = new ObjectOutputStream(new BufferedOutputStream(fos)); + oos.writeObject(new Integer(currentSessions.length)); + for(int i=0 ; i < currentSessions.length;i++) { + ((DeltaSession)currentSessions[i]).writeObjectData(oos); + } + // Flush and close the output stream + oos.flush(); + } catch (IOException e) { + log.error(sm.getString("deltaManager.unloading.ioe", e), e); + throw e; + } finally { + if (oos != null) { + try { + oos.close(); + } catch (IOException f) { + ; + } + oos = null; + } + } + // send object data as byte[] + return fos.toByteArray(); + } + + // ------------------------------------------------------ Lifecycle Methods + + /** + * Add a lifecycle event listener to this component. + * + * @param listener + * The listener to add + */ + public void addLifecycleListener(LifecycleListener listener) { + + lifecycle.addLifecycleListener(listener); + + } + + /** + * Get the lifecycle listeners associated with this lifecycle. If this + * Lifecycle has no listeners registered, a zero-length array is returned. + */ + public LifecycleListener[] findLifecycleListeners() { + + return lifecycle.findLifecycleListeners(); + + } + + /** + * Remove a lifecycle event listener from this component. + * + * @param listener + * The listener to remove + */ + public void removeLifecycleListener(LifecycleListener listener) { + + lifecycle.removeLifecycleListener(listener); + + } + + /** + * Prepare for the beginning of active use of the public methods of this + * component. This method should be called after configure(), + * and before any of the public methods of the component are utilized. + * + * @exception LifecycleException + * if this component detects a fatal error that prevents this + * component from being used + */ + public void start() throws LifecycleException { + if (!initialized) + init(); + + // Validate and update our current component state + if (started) { + return; + } + started = true; + lifecycle.fireLifecycleEvent(START_EVENT, null); + + // Force initialization of the random number generator + generateSessionId(); + + // Load unloaded sessions, if any + try { + //the channel is already running + Cluster cluster = getCluster() ; + // stop remove cluster binding + if(cluster == null) { + Container context = getContainer() ; + if(context != null && context instanceof Context) { + Container host = context.getParent() ; + if(host != null && host instanceof Host) { + cluster = host.getCluster(); + if(cluster != null && cluster instanceof CatalinaCluster) { + setCluster((CatalinaCluster) cluster) ; + } else { + Container engine = host.getParent() ; + if(engine != null && engine instanceof Engine) { + cluster = engine.getCluster(); + if(cluster != null && cluster instanceof CatalinaCluster) { + setCluster((CatalinaCluster) cluster) ; + } + } else { + cluster = null ; + } + } + } + } + } + if (cluster == null) { + log.error(sm.getString("deltaManager.noCluster", getName())); + return; + } else { + if (log.isInfoEnabled()) { + String type = "unknown" ; + if( cluster.getContainer() instanceof Host){ + type = "Host" ; + } else if( cluster.getContainer() instanceof Engine){ + type = "Engine" ; + } + log.info(sm + .getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName())); + } + } + if (log.isInfoEnabled()) + log.info(sm + .getString("deltaManager.startClustering", getName())); + //to survice context reloads, as only a stop/start is called, not + // createManager + ((CatalinaCluster)cluster).addManager(getName(), this); + + getAllClusterSessions(); + + } catch (Throwable t) { + log.error(sm.getString("deltaManager.managerLoad"), t); + } + } + + /** + * get from first session master the backup from all clustered sessions + * @see #findSessionMasterMember() + */ + public synchronized void getAllClusterSessions() { + if (cluster != null && cluster.getMembers().length > 0) { + long beforeSendTime = System.currentTimeMillis(); + Member mbr = findSessionMasterMember(); + if(mbr == null) { // No domain member found + return; + } + SessionMessage msg = new SessionMessageImpl(this.getName(), + SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL", + "GET-ALL-" + getName()); + msg.setResend(ClusterMessage.FLAG_FORBIDDEN); + // set reference time + msg.setTimestamp(beforeSendTime); + stateTransferCreateSendTime = beforeSendTime ; + // request session state + counterSend_EVT_GET_ALL_SESSIONS++; + stateTransfered = false ; + // FIXME This send call block the deploy thread, when sender waitForAck is enabled + try { + synchronized(receivedMessageQueue) { + receiverQueue = true ; + } + cluster.send(msg, mbr); + if (log.isWarnEnabled()) + log.warn(sm.getString("deltaManager.waitForSessionState", + getName(), mbr)); + // FIXME At sender ack mode this method check only the state transfer and resend is a problem! + waitForSendAllSessions(beforeSendTime); + } finally { + synchronized(receivedMessageQueue) { + for (Iterator iter = receivedMessageQueue.iterator(); iter + .hasNext();) { + SessionMessage smsg = (SessionMessage) iter.next(); + if (!stateTimestampDrop) { + messageReceived(smsg, + smsg.getAddress() != null ? (Member) smsg + .getAddress() : null); + } else { + if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS + && smsg.getTimestamp() >= stateTransferCreateSendTime) { + // FIXME handle EVT_GET_ALL_SESSIONS later + messageReceived( + smsg, + smsg.getAddress() != null ? (Member) smsg + .getAddress() + : null); + } else { + if (log.isWarnEnabled()) { + log.warn(sm.getString( + "deltaManager.dropMessage", + getName(), smsg + .getEventTypeString(), + new Date(stateTransferCreateSendTime), new Date( + smsg.getTimestamp()))); + } + } + } + } + receivedMessageQueue.clear(); + receiverQueue = false ; + } + } + } else { + if (log.isInfoEnabled()) + log.info(sm.getString("deltaManager.noMembers", getName())); + } + } + + /** + * Register cross context session at replication valve thread local + * @param session cross context session + */ + protected void registerSessionAtReplicationValve(DeltaSession session) { + if(replicationValve == null) { + if(container instanceof StandardContext + && ((StandardContext)container).getCrossContext()) { + Cluster cluster = getCluster() ; + if(cluster != null && cluster instanceof CatalinaCluster) { + Valve[] valves = ((CatalinaCluster)cluster).getValves(); + if(valves != null && valves.length > 0) { + for(int i=0; replicationValve == null && i < valves.length ; i++ ){ + if(valves[i] instanceof ReplicationValve) + replicationValve = (ReplicationValve)valves[i] ; + } + + if(replicationValve == null && log.isDebugEnabled()) { + log.debug("no ReplicationValve found for CrossContext Support"); + } + } + } + } + } + if(replicationValve != null) { + replicationValve.registerReplicationSession(session); + } + } + + /** + * Find the master of the session state + * @return master member of sessions + */ + protected Member findSessionMasterMember() { + Member mbr = null; + Member mbrs[] = cluster.getMembers(); + String localMemberDomain = cluster.getMembershipService().getLocalMember().getDomain(); + if(isSendClusterDomainOnly()) { + for (int i = 0; mbr == null && i < mbrs.length; i++) { + Member member = mbrs[i]; + if(localMemberDomain.equals(member.getDomain())) + mbr = member ; + } + } else { + if(mbrs.length != 0 ) + mbr = mbrs[0]; + } + if(mbr == null && log.isWarnEnabled()) + log.warn(sm.getString("deltaManager.noMasterMember", + getName(), localMemberDomain)); + if(mbr != null && log.isDebugEnabled()) + log.warn(sm.getString("deltaManager.foundMasterMember", + getName(), mbr)); + return mbr; + } + + /** + * Wait that cluster session state is transfer or timeout after 60 Sec + * With stateTransferTimeout == -1 wait that backup is transfered (forever mode) + */ + protected void waitForSendAllSessions(long beforeSendTime) { + long reqStart = System.currentTimeMillis(); + long reqNow = reqStart ; + boolean isTimeout = false; + if(getStateTransferTimeout() > 0) { + // wait that state is transfered with timeout check + do { + try { + Thread.sleep(100); + } catch (Exception sleep) { + } + reqNow = System.currentTimeMillis(); + isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout())); + } while ((!getStateTransfered()) && (!isTimeout)); + } else { + if(getStateTransferTimeout() == -1) { + // wait that state is transfered + do { + try { + Thread.sleep(100); + } catch (Exception sleep) { + } + } while ((!getStateTransfered())); + reqNow = System.currentTimeMillis(); + } + } + if (isTimeout || (!getStateTransfered())) { + counterNoStateTransfered++ ; + log.error(sm.getString("deltaManager.noSessionState", + getName(),new Date(beforeSendTime),new Long(reqNow - beforeSendTime))); + } else { + if (log.isInfoEnabled()) + log.info(sm.getString("deltaManager.sessionReceived", + getName(), new Date(beforeSendTime), new Long(reqNow - beforeSendTime))); + } + } + + /** + * Gracefully terminate the active use of the public methods of this + * component. This method should be the last one called on a given instance + * of this component. + * + * @exception LifecycleException + * if this component detects a fatal error that needs to be + * reported + */ + public void stop() throws LifecycleException { + + if (log.isDebugEnabled()) + log.debug(sm.getString("deltaManager.stopped", getName())); + + + // Validate and update our current component state + if (!started) + throw new LifecycleException(sm + .getString("deltaManager.notStarted")); + lifecycle.fireLifecycleEvent(STOP_EVENT, null); + started = false; + + // Expire all active sessions + if (log.isInfoEnabled()) + log.info(sm.getString("deltaManager.expireSessions", getName())); + Session sessions[] = findSessions(); + for (int i = 0; i < sessions.length; i++) { + DeltaSession session = (DeltaSession) sessions[i]; + if (!session.isValid()) + continue; + try { + session.expire(true, isExpireSessionsOnShutdown()); + } catch (Throwable ignore) { + ; + } + } + + // Require a new random number generator if we are restarted + this.random = null; + getCluster().removeManager(getName(),this); + replicationValve = null; + if (initialized) { + destroy(); + } + } + + // ----------------------------------------- PropertyChangeListener Methods + + /** + * Process property change events from our associated Context. + * + * @param event + * The property change event that has occurred + */ + public void propertyChange(PropertyChangeEvent event) { + + // Validate the source of this event + if (!(event.getSource() instanceof Context)) + return; + // Process a relevant property change + if (event.getPropertyName().equals("sessionTimeout")) { + try { + setMaxInactiveInterval(((Integer) event.getNewValue()) + .intValue() * 60); + } catch (NumberFormatException e) { + log.error(sm.getString("deltaManager.sessionTimeout", event + .getNewValue())); + } + } + + } + + // -------------------------------------------------------- Replication + // Methods + + /** + * A message was received from another node, this is the callback method to + * implement if you are interested in receiving replication messages. + * + * @param cmsg - + * the message received. + */ + public void messageDataReceived(ClusterMessage cmsg) { + if (cmsg != null && cmsg instanceof SessionMessage) { + SessionMessage msg = (SessionMessage) cmsg; + switch (msg.getEventType()) { + case SessionMessage.EVT_GET_ALL_SESSIONS: + case SessionMessage.EVT_SESSION_CREATED: + case SessionMessage.EVT_SESSION_EXPIRED: + case SessionMessage.EVT_SESSION_ACCESSED: + case SessionMessage.EVT_SESSION_DELTA: { + synchronized(receivedMessageQueue) { + if(receiverQueue) { + receivedMessageQueue.add(msg); + return ; + } + } + break; + } + default: { + //we didn't queue, do nothing + break; + } + } //switch + + messageReceived(msg, msg.getAddress() != null ? (Member) msg + .getAddress() : null); + } + } + + /** + * When the request has been completed, the replication valve will notify + * the manager, and the manager will decide whether any replication is + * needed or not. If there is a need for replication, the manager will + * create a session message and that will be replicated. The cluster + * determines where it gets sent. + * + * @param sessionId - + * the sessionId that just completed. + * @return a SessionMessage to be sent, + */ + public ClusterMessage requestCompleted(String sessionId) { + try { + DeltaSession session = (DeltaSession) findSession(sessionId); + DeltaRequest deltaRequest = session.getDeltaRequest(); + SessionMessage msg = null; + boolean isDeltaRequest = false ; + synchronized(deltaRequest) { + isDeltaRequest = deltaRequest.getSize() > 0 ; + if (isDeltaRequest) { + counterSend_EVT_SESSION_DELTA++; + byte[] data = unloadDeltaRequest(deltaRequest); + msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_DELTA, data, sessionId, + sessionId + "-" + System.currentTimeMillis()); + session.resetDeltaRequest(); + } + } + if(!isDeltaRequest) { + if(!session.isPrimarySession()) { + counterSend_EVT_SESSION_ACCESSED++; + msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, + sessionId + "-" + System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(sm.getString( + "deltaManager.createMessage.accessChangePrimary", + getName(), sessionId)); + } + } + } else { // log only outside synch block! + if (log.isDebugEnabled()) { + log.debug(sm.getString( + "deltaManager.createMessage.delta", + getName(), sessionId)); + } + } + session.setPrimarySession(true); + //check to see if we need to send out an access message + if ((msg == null)) { + long replDelta = System.currentTimeMillis() + - session.getLastTimeReplicated(); + if (replDelta > (getMaxInactiveInterval() * 1000)) { + counterSend_EVT_SESSION_ACCESSED++; + msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_ACCESSED, null, + sessionId, sessionId + "-" + System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(sm.getString( + "deltaManager.createMessage.access", getName(), + sessionId)); + } + } + + } + + //update last replicated time + if (msg != null) + session.setLastTimeReplicated(System.currentTimeMillis()); + return msg; + } catch (IOException x) { + log.error(sm.getString( + "deltaManager.createMessage.unableCreateDeltaRequest", + sessionId), x); + return null; + } + + } + /** + * Reset manager statistics + */ + public synchronized void resetStatistics() { + processingTime = 0 ; + expiredSessions = 0 ; + rejectedSessions = 0 ; + sessionReplaceCounter = 0 ; + counterNoStateTransfered = 0 ; + maxActive = getActiveSessions() ; + sessionCounter = getActiveSessions() ; + counterReceive_EVT_ALL_SESSION_DATA = 0; + counterReceive_EVT_GET_ALL_SESSIONS = 0; + counterReceive_EVT_SESSION_ACCESSED = 0 ; + counterReceive_EVT_SESSION_CREATED = 0 ; + counterReceive_EVT_SESSION_DELTA = 0 ; + counterReceive_EVT_SESSION_EXPIRED = 0 ; + counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0; + counterSend_EVT_ALL_SESSION_DATA = 0; + counterSend_EVT_GET_ALL_SESSIONS = 0; + counterSend_EVT_SESSION_ACCESSED = 0 ; + counterSend_EVT_SESSION_CREATED = 0 ; + counterSend_EVT_SESSION_DELTA = 0 ; + counterSend_EVT_SESSION_EXPIRED = 0 ; + counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0; + + } + + // -------------------------------------------------------- persistence handler + + public void load() { + + } + + public void unload() { + + } + + // -------------------------------------------------------- expire + + /** + * send session expired to other cluster nodes + * + * @param id + * session id + */ + protected void sessionExpired(String id) { + counterSend_EVT_SESSION_EXPIRED++ ; + SessionMessage msg = new SessionMessageImpl(getName(), + SessionMessage.EVT_SESSION_EXPIRED, null, id, id + + "-EXPIRED-MSG"); + if (log.isDebugEnabled()) + log.debug(sm.getString("deltaManager.createMessage.expire", + getName(), id)); + send(msg); + } + + /** + * Exipre all find sessions. + */ + public void expireAllLocalSessions() + { + long timeNow = System.currentTimeMillis(); + Session sessions[] = findSessions(); + int expireDirect = 0 ; + int expireIndirect = 0 ; + + if(log.isDebugEnabled()) + log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length); + for (int i = 0; i < sessions.length; i++) { + if (sessions[i] instanceof DeltaSession) { + DeltaSession session = (DeltaSession) sessions[i]; + if (session.isPrimarySession()) { + if (session.isValid()) { + session.expire(); + expireDirect++; + } else { + expireIndirect++; + } + } + } + } + long timeEnd = System.currentTimeMillis(); + if(log.isDebugEnabled()) + log.debug("End expire sessions " + getName() + " exipre processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect); + + } + + /** + * When the manager expires session not tied to a request. The cluster will + * periodically ask for a list of sessions that should expire and that + * should be sent across the wire. + * + * @return The invalidated sessions array + */ + public String[] getInvalidatedSessions() { + return new String[0]; + } + + // -------------------------------------------------------- message receive + + /** + * Test that sender and local domain is the same + */ + protected boolean checkSenderDomain(SessionMessage msg,Member sender) { + String localMemberDomain = cluster.getMembershipService().getLocalMember().getDomain(); + boolean sameDomain= localMemberDomain.equals(sender.getDomain()); + if (!sameDomain && log.isWarnEnabled()) { + log.warn(sm.getString("deltaManager.receiveMessage.fromWrongDomain", + new Object[] {getName(), + msg.getEventTypeString(), + sender, + sender.getDomain(), + localMemberDomain } + )); + } + return sameDomain ; + } + + /** + * This method is called by the received thread when a SessionMessage has + * been received from one of the other nodes in the cluster. + * + * @param msg - + * the message received + * @param sender - + * the sender of the message, this is used if we receive a + * EVT_GET_ALL_SESSION message, so that we only reply to the + * requesting node + */ + protected void messageReceived(SessionMessage msg, Member sender) { + if(isSendClusterDomainOnly() && !checkSenderDomain(msg,sender)) { + return; + } + try { + if (log.isDebugEnabled()) + log.debug(sm.getString("deltaManager.receiveMessage.eventType", + getName(), msg.getEventTypeString(), sender)); + + switch (msg.getEventType()) { + case SessionMessage.EVT_GET_ALL_SESSIONS: { + handleGET_ALL_SESSIONS(msg,sender); + break; + } + case SessionMessage.EVT_ALL_SESSION_DATA: { + handleALL_SESSION_DATA(msg,sender); + break; + } + case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: { + handleALL_SESSION_TRANSFERCOMPLETE(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_CREATED: { + handleSESSION_CREATED(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_EXPIRED: { + handleSESSION_EXPIRED(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_ACCESSED: { + handleSESSION_ACCESSED(msg,sender); + break; + } + case SessionMessage.EVT_SESSION_DELTA: { + handleSESSION_DELTA(msg,sender); + break; + } + default: { + //we didn't recognize the message type, do nothing + break; + } + } //switch + } catch (Exception x) { + log.error(sm.getString("deltaManager.receiveMessage.error", + getName()), x); + } + } + + // -------------------------------------------------------- message receiver handler + + + /** + * handle receive session state is complete transfered + * @param msg + * @param sender + */ + protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) { + counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++ ; + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.transfercomplete", + getName(), sender.getHost(), new Integer(sender.getPort()))); + stateTransferCreateSendTime = msg.getTimestamp() ; + stateTransfered = true ; + } + + /** + * handle receive session delta + * @param msg + * @param sender + * @throws IOException + * @throws ClassNotFoundException + */ + protected void handleSESSION_DELTA(SessionMessage msg, Member sender) + throws IOException, ClassNotFoundException { + counterReceive_EVT_SESSION_DELTA++; + byte[] delta = msg.getSession(); + DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); + if (session != null) { + if (log.isDebugEnabled()) + log.debug(sm.getString("deltaManager.receiveMessage.delta", + getName(), msg.getSessionID())); + DeltaRequest dreq = loadDeltaRequest(session, delta); + dreq.execute(session, notifyListenersOnReplication); + session.setPrimarySession(false); + } + } + + /** + * handle receive session is access at other node ( primary session is now false) + * @param msg + * @param sender + * @throws IOException + */ + protected void handleSESSION_ACCESSED(SessionMessage msg,Member sender) throws IOException { + counterReceive_EVT_SESSION_ACCESSED++; + DeltaSession session = (DeltaSession) findSession(msg + .getSessionID()); + if (session != null) { + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.accessed", + getName(), msg.getSessionID())); + session.access(); + session.setPrimarySession(false); + session.endAccess(); + } + } + + /** + * handle receive session is expire at other node ( expire session also here) + * @param msg + * @param sender + * @throws IOException + */ + protected void handleSESSION_EXPIRED(SessionMessage msg,Member sender) throws IOException { + counterReceive_EVT_SESSION_EXPIRED++; + DeltaSession session = (DeltaSession) findSession(msg + .getSessionID()); + if (session != null) { + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.expired", + getName(), msg.getSessionID())); + session.expire(notifySessionListenersOnReplication, false); + } + } + + /** + * handle receive new session is created at other node (create backup - primary false) + * @param msg + * @param sender + */ + protected void handleSESSION_CREATED(SessionMessage msg,Member sender) { + counterReceive_EVT_SESSION_CREATED++; + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.createNewSession", + getName(), msg.getSessionID())); + DeltaSession session = (DeltaSession) createEmptySession(); + session.setManager(this); + session.setValid(true); + session.setPrimarySession(false); + session.access(); + if(notifySessionListenersOnReplication) + session.setId(msg.getSessionID()); + else + session.setIdInternal(msg.getSessionID()); + session.resetDeltaRequest(); + session.endAccess(); + + } + + /** + * handle receive sessions from other not ( restart ) + * @param msg + * @param sender + * @throws ClassNotFoundException + * @throws IOException + */ + protected void handleALL_SESSION_DATA(SessionMessage msg,Member sender) throws ClassNotFoundException, IOException { + counterReceive_EVT_ALL_SESSION_DATA++; + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.allSessionDataBegin", + getName())); + byte[] data = msg.getSession(); + deserializeSessions(data); + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.allSessionDataAfter", + getName())); + //stateTransferred = true; + } + + /** + * handle receive that other node want all sessions ( restart ) + * a) send all sessions with one message + * b) send session at blocks + * After sending send state is complete transfered + * @param msg + * @param sender + * @throws IOException + */ + protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) + throws IOException { + counterReceive_EVT_GET_ALL_SESSIONS++; + //get a list of all the session from this manager + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.unloadingBegin", getName())); + // Write the number of active sessions, followed by the details + // get all sessions and serialize without sync + Session[] currentSessions = findSessions(); + long findSessionTimestamp = System.currentTimeMillis() ; + if (isSendAllSessions()) { + sendSessions(sender, currentSessions, findSessionTimestamp); + } else { + // send session at blocks + int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length + : getSendAllSessionsSize(); + Session[] sendSessions = new Session[len]; + for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) { + len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length + - i + : getSendAllSessionsSize(); + System.arraycopy(currentSessions, i, sendSessions, 0, len); + sendSessions(sender, sendSessions,findSessionTimestamp); + if (getSendAllSessionsWaitTime() > 0) { + try { + Thread.sleep(getSendAllSessionsWaitTime()); + } catch (Exception sleep) { + } + } + } + } + + SessionMessage newmsg = new SessionMessageImpl(name, + SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null, + "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED" + + getName()); + newmsg.setTimestamp(findSessionTimestamp); + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.createMessage.allSessionTransfered", + getName())); + counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++; + cluster.send(newmsg, sender); + } + + + /** + * send a block of session to sender + * @param sender + * @param currentSessions + * @param sendTimestamp + * @throws IOException + */ + protected void sendSessions(Member sender, Session[] currentSessions, + long sendTimestamp) throws IOException { + byte[] data = serializeSessions(currentSessions); + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.receiveMessage.unloadingAfter", + getName())); + SessionMessage newmsg = new SessionMessageImpl(name, + SessionMessage.EVT_ALL_SESSION_DATA, data, + "SESSION-STATE", "SESSION-STATE-" + getName()); + newmsg.setTimestamp(sendTimestamp); + //if(isSendSESSIONSTATEcompressed()) { + // newmsg.setCompress(ClusterMessage.FLAG_ALLOWED); + //} + if (log.isDebugEnabled()) + log.debug(sm.getString( + "deltaManager.createMessage.allSessionData", + getName())); + counterSend_EVT_ALL_SESSION_DATA++; + cluster.send(newmsg, sender); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org