Adds a threadsafe latch to the DeltaManager which is used to block processing of cluster messages until local applications have completed initialization.

Includes changes to the DeltaManager to create the latch based on configuration, changes to Catalina to automatically open the latch when initialization is complete, a constant used to as the attribute key in the ServletContext and an modification to the mbean-descriptor for the new DeltaManager attribute.

Also includes a handful of style cleanups interspersed (spelling, removing compiler warnings about type checking, removing spaces before semi colons and blank lines before and after braces) throughout.

  - Jason

"That's the problem. He's a brilliant lunatic and you can't tell
which way he'll jump --
like his game he's impossible to analyse --
you can't dissect him, predict him --
which of course means he's not a lunatic at all."


Index: catalina/Globals.java
===================================================================
--- catalina/Globals.java    (revision 719433)
+++ catalina/Globals.java    (working copy)
@@ -36,6 +36,14 @@
         "org.apache.catalina.deploy.alt_dd";
 
     /**
+     * The servlet context attribute under which we store the concurrent latch
+     * used to block processing cluster messages until after local application
+     * initialization
+     */
+    public static final String CLUSTER_DELAY =
+        "org.apache.catalina.ha.delay";
+
+    /**
      * The request attribute under which we store the array of X509Certificate
      * objects representing the certificate chain presented by our client,
      * if any.
Index: catalina/ha/session/DeltaManager.java
===================================================================
--- catalina/ha/session/DeltaManager.java    (revision 719433)
+++ catalina/ha/session/DeltaManager.java    (working copy)
@@ -26,11 +26,15 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;
 
 import org.apache.catalina.Cluster;
 import org.apache.catalina.Container;
 import org.apache.catalina.Context;
 import org.apache.catalina.Engine;
+import org.apache.catalina.Globals;
 import org.apache.catalina.Host;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.LifecycleListener;
@@ -38,13 +42,13 @@
 import org.apache.catalina.Valve;
 import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
 import org.apache.catalina.ha.ClusterMessage;
 import org.apache.catalina.ha.tcp.ReplicationValve;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.ReplicationStream;
 import org.apache.catalina.util.LifecycleSupport;
 import org.apache.catalina.util.StringManager;
-import org.apache.catalina.ha.ClusterManager;
 
 /**
  * The DeltaManager manages replicated sessions by only replicating the deltas
@@ -62,10 +66,11 @@
  * @author Craig R. McClanahan
  * @author Jean-Francois Arcand
  * @author Peter Rossbach
+ * @author Jason Lunn
  * @version $Revision$ $Date$
  */
 
-public class DeltaManager extends ClusterManagerBase{
+public class DeltaManager extends ClusterManagerBase {
 
     // ---------------------------------------------------- Security Classes
     public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
@@ -106,6 +111,18 @@
     protected LifecycleSupport lifecycle = new LifecycleSupport(this);
 
     /**
+     * Flag indicating that messageReceived(ClusterMessage) should block until
+     * all local applications have completed initialization
+     */
+    protected boolean delay = false;
+   
+    /**
+     * Barrier used to receive notification from other threads that it is okay
+     * to process incoming messages from the cluster
+     */
+    private CountDownLatch gate = null;
+   
+    /**
      * The maximum number of active Sessions allowed, or -1 for no limit.
      */
     private int maxActiveSessions = -1;
@@ -121,8 +138,8 @@
     /**
      * wait time between send session block (default 2 sec)
      */
-    private int sendAllSessionsWaitTime = 2 * 1000 ;
-    private ArrayList receivedMessageQueue = new ArrayList() ;
+    private int sendAllSessionsWaitTime = 2 * 1000;
+    private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ;
     private boolean receiverQueue = false ;
     private boolean stateTimestampDrop = true ;
     private long stateTransferCreateSendTime;
@@ -175,6 +192,27 @@
     public String getName() {
         return name;
     }
+   
+    /**
+     * Set the member that indicates processing messages should wait for local
+     * initialization of applications to complete
+     *
+     * @param delayed If true, processing messageReceived(ClusterMessage) will
+     * block until local web applications have completed initialization.
+     */
+    public void setDelay ( boolean delay ) {
+        this.delay = delay;
+    }
+   
+    /**
+     * Gets the member the delayed flag
+     * @return delayed - boolean flag indicating that
+     * messageReceived(ClusterMessage) should block until local initialization
+     * of applications has completed
+     */
+    public boolean getDelay () {
+        return delay;
+    }
 
     /**
      * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
@@ -781,6 +819,24 @@
         lifecycle.removeLifecycleListener(listener);
     }
 
+    @Override
+    /**
+     * If this.delay is true, create a gate that will be opened when
+     * local application initialization is complete 
+     */
+    public void init () {
+        if (delay && container != null) {
+            if (container instanceof StandardContext) {
+                ServletContext servletContext = ((StandardContext) container).getServletContext();
+                if (servletContext != null) {
+                    gate = new CountDownLatch(1);
+                    servletContext.setAttribute(Globals.CLUSTER_DELAY, gate);
+                }
+            }
+        }
+        super.init();
+    }
+
     /**
      * Prepare for the beginning of active use of the public methods of this
      * component. This method should be called after <code>configure()</code>,
@@ -885,8 +941,8 @@
                 waitForSendAllSessions(beforeSendTime);
             } finally {
                 synchronized(receivedMessageQueue) {
-                    for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
-                        SessionMessage smsg = (SessionMessage) iter.next();
+                    for (Iterator<SessionMessage> iter = receivedMessageQueue.iterator(); iter.hasNext();) {
+                        SessionMessage smsg = iter.next();
                         if (!stateTimestampDrop) {
                             messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
                         } else {
@@ -1068,6 +1124,18 @@
      *            the message received.
      */
     public void messageDataReceived(ClusterMessage cmsg) {
+        // Block processing until local application initialization has
+        // completed, if a gate has been erected
+        if(gate != null) {
+            try {
+                gate.await();
+                gate = null;
+            }
+            catch(InterruptedException e) {
+                log.error(e, e);
+            }
+        }
+
         if (cmsg != null && cmsg instanceof SessionMessage) {
             SessionMessage msg = (SessionMessage) cmsg;
             switch (msg.getEventType()) {
@@ -1535,7 +1603,8 @@
         result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
         result.receiverQueue = receiverQueue ;
         result.stateTimestampDrop = stateTimestampDrop ;
-        result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+        result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+        result.delay = delay;
         return result;
     }
 }
Index: catalina/ha/session/mbeans-descriptors.xml
===================================================================
--- catalina/ha/session/mbeans-descriptors.xml    (revision 719433)
+++ catalina/ha/session/mbeans-descriptors.xml    (working copy)
@@ -268,7 +268,7 @@
     <attribute
       name="expireSessionsOnShutdown"
       is="true"
-      description="exipre all sessions cluster wide as one node goes down"
+      description="expire all sessions cluster wide as one node goes down"
       type="boolean"/>
     <attribute
       name="notifyListenersOnReplication"
@@ -293,6 +293,10 @@
       name="sendAllSessionsWaitTime"
       description="wait time between send session block (default 2 sec)"
       type="int"/>
+    <attribute
+      name="delay"
+      description="wait until local applications have initialized before processing cluster messages"
+      type="boolean"/>
     <operation
       name="listSessionIds"
       description="Return the list of active session ids"
Index: catalina/startup/Catalina.java
===================================================================
--- catalina/startup/Catalina.java    (revision 719433)
+++ catalina/startup/Catalina.java    (working copy)
@@ -28,11 +28,18 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;
 
 import org.apache.catalina.Container;
+import org.apache.catalina.Globals;
 import org.apache.catalina.Lifecycle;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.Server;
+import org.apache.catalina.Service;
+import org.apache.catalina.core.ContainerBase;
+import org.apache.catalina.core.StandardContext;
 import org.apache.catalina.core.StandardServer;
 import org.apache.tomcat.util.digester.Digester;
 import org.apache.tomcat.util.digester.Rule;
@@ -56,6 +63,7 @@
  *
  * @author Craig R. McClanahan
  * @author Remy Maucherat
+ * @author Jason Lunn
  * @version $Revision$ $Date$
  */
 
@@ -579,7 +587,18 @@
                 log.error("Catalina.start: ", e);
             }
         }
-
+       
+        // Open any gates stored in ServletContexts to allow the processing of
+        // cluster messages once local applications have been initialized
+        Service [] services = server.findServices();
+        if ( services != null ) {
+            for ( Service service : services ) {
+                if ( service != null ) {
+                    openContainerGates( service.getContainer() );
+                }
+            }
+        }
+       
         long t2 = System.nanoTime();
         if(log.isInfoEnabled())
             log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
@@ -601,7 +620,6 @@
             await();
             stop();
         }
-
     }
 
 
@@ -609,7 +627,6 @@
      * Stop an existing server instance.
      */
     public void stop() {
-
         try {
             // Remove the ShutdownHook first so that server.stop()
             // doesn't get invoked twice
@@ -629,7 +646,6 @@
                 log.error("Catalina.stop", e);
             }
         }
-
     }
 
 
@@ -637,9 +653,7 @@
      * Await and shutdown.
      */
     public void await() {
-
         server.await();
-
     }
 
 
@@ -647,15 +661,50 @@
      * Print usage information for this application.
      */
     protected void usage() {
-
         System.out.println
             ("usage: java org.apache.catalina.startup.Catalina"
              + " [ -config {pathname} ]"
              + " [ -nonaming ] { start | stop }");
+    }
 
+
+    /**
+     * Traverses the argument container and decrements the CountDownLatch
+     * found in the servlet context with attribute key Globals.CLUSTER_DELAY
+     * if found
+     * @param container Possibly null Container instance
+     */
+    protected void openContainerGates ( Container container ) {
+        if (container == null) {
+            return;
+        }
+
+        if ( container instanceof StandardContext ) {
+            StandardContext context =
+                (StandardContext)container;
+            ServletContext servletContext =
+                context.getServletContext();
+            if (servletContext != null) {
+                Object contextAttribute = servletContext
+                        .getAttribute(Globals.CLUSTER_DELAY);
+                if (contextAttribute != null &&
+                        contextAttribute instanceof CountDownLatch) {
+                    CountDownLatch gate =
+                        (CountDownLatch) contextAttribute;
+                    gate.countDown();
+                }
+            }
+        } else if ( container instanceof ContainerBase ) {
+            ContainerBase base = (ContainerBase)container;
+            Container [] containers = base.findChildren();
+            if ( containers != null ) {
+                for ( Container childContainer : containers ) {
+                    openContainerGates( childContainer );
+                }
+            }
+        }
     }
 
-
     // --------------------------------------- CatalinaShutdownHook Inner Class
 
     // XXX Should be moved to embedded !
@@ -709,6 +758,4 @@
         top.setParentClassLoader(parentClassLoader);
 
     }
-
-
 }



On Fri, Nov 21, 2008 at 10:42 PM, Peter Rossbach <pr@objektpark.de> wrote:
Hi Jason,

send us your implementation and let us review your stuff :-)

You can also register a ContextListener at DeltaManager.setContainer() to control your latch.
Are your sure that session sync message (GET ALL Session) is received before first request at second node
is processed?

I think your feature is an extension of the current reveivedQueue usage!

Regards
Peter



Am 20.11.2008 um 22:54 schrieb Jason:


This message is targeted at Filip Hanik, Craig R. McClanahan, Jean-Francois
Arcand, Peter Rossbach or anyone with a direct interest in the DeltaManager
implementation in Tomcat 6.

A vendor (who will remain nameless) whose product I support for a client
recently gave me an idea for a patch to DeltaManager to address what the
vendor claims is a Tomcat specific issue related to session replication. I'm
wondering if it would be of value to the community or if the "problem" it is
trying to remedy is an intentional "feature".

The primary issue is that, according to vendor engineering support, the
other application containers the vendor supports deploying their product on,
including WebSphere, WebLogic, et al, wait until after local applications
have been initialized before processing incoming messages from the cluster
that could include deserializing remote sessions and the objects therein. I
have not confirmed this by examining the other containers mind you, but am
pretty confident that this is an accurate statement in so far that vendor's
product works in those environments but does not work in a clustered tomcat
environment.

The reason it fails in tomcat is that some of the objects in the serialized
session make calls at construction time to the vendor's (archaic)
preferences API's static methods, which are not initialized properly until
the web application itself is started. The result is that the first node in
the cluster starts up fine, but the 2nd-Nth nodes die a horrible death
trying to deserialize remote sessions populated by the first node.

The workaround we've implemented locally is a simple one: we extend the
DeltaManager with a custom class. Therein, we create a latch
(java.util.concurrent.CountDownLatch, to be specific) and save it in the
ServletContext. The only overridden method is messageDataReceived(), which
uses the latch.await() method to block before calling the original
implementation of the parent messageDataReceived() method.

The vendor's application (or, more properly, the custom extensions we've
built on their platform) looks at the ServletContext for a latch after the
preferences have been initialized locally, and calls latch.countDown(),
allowing any blocked calls to messageDataReceived() to start executing as
normally.

Without breaking the current sequence of initializing the session
replication code before local applications that Tomcat developers may have
come to expect, it seems like there is a potential solution here that might
enable applications like the one I've got to support to choose to configure
the session replication to wait to process incoming messages until after the
application has started.

I think it would be pretty trivial for me to offer a patch to DeltaManager
that created a latch based on a configuration element. One could imagine an
automatic mechanism for toggling the latch by the container after the
application initialization, or deferring to the application to deactivate.
The question is, does anybody want such functionality besides me? The
corollary is, if being able to choose when session replication begins is a
desirable feature, is this the right tactic to implement it?

Sincerely,

 - Jason Lunn

"That's the problem. He's a brilliant lunatic and you can't tell
which way he'll jump --
like his game he's impossible to analyse --
you can't dissect him, predict him --
which of course means he's not a lunatic at all."


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org