tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r380209 [2/12] - in /tomcat/container/tc5.5.x/modules: groupcom/ groupcom/etc/ groupcom/src/ groupcom/src/share/ groupcom/src/share/org/ groupcom/src/share/org/apache/ groupcom/src/share/org/apache/catalina/ groupcom/src/share/org/apache/ca...
Date Thu, 23 Feb 2006 19:55:25 GMT
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,158 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster.group;
+
+import org.apache.catalina.cluster.MembershipService;
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ChannelException;
+import org.apache.catalina.cluster.ClusterSender;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.ClusterChannel;
+import java.io.IOException;
+
+
+/**
+ * The channel coordinator object coordinates the membership service,
+ * the sender and the receiver.
+ * This is the last interceptor in the chain.
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+public class ChannelCoordinator extends ChannelInterceptorBase {
+    private ClusterReceiver clusterReceiver;
+    private ClusterSender clusterSender;
+    private MembershipService membershipService;
+
+    public ChannelCoordinator() {
+        
+    }
+    
+    public ChannelCoordinator(ClusterReceiver receiver,
+                              ClusterSender sender,
+                              MembershipService service) {
+        this();
+        this.setClusterReceiver(receiver);
+        this.setClusterSender(sender);
+        this.setMembershipService(service);
+    }
+    
+    /**
+     * Send a message to one or more members in the cluster
+     * @param destination Member[] - the destinations, null or zero length means all
+     * @param msg ClusterMessage - the message to send
+     * @param options int - sender options, see class documentation
+     * @return ClusterMessage[] - the replies from the members, if any.
+     */
+    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) throws IOException {
+        if ( destination == null ) destination = membershipService.getMembers();
+        for ( int i=0; i<destination.length; i++ ) {
+            clusterSender.sendMessage(msg,destination[i]);
+        }
+        return null;
+    }
+
+
+    /**
+     * Starts up the channel. This can be called multiple times for individual services to start
+     * The svc parameter can be the logical or value of any constants
+     * @param svc int value of <BR>
+     * DEFAULT - will start all services <BR>
+     * MBR_RX_SEQ - starts the membership receiver <BR>
+     * MBR_TX_SEQ - starts the membership broadcaster <BR>
+     * SND_TX_SEQ - starts the replication transmitter<BR>
+     * SND_RX_SEQ - starts the replication receiver<BR>
+     * @throws ChannelException if a startup error occurs or the service is already started.
+     */
+    public void start(int svc) throws ChannelException {
+        try {
+            if ( (svc & ClusterChannel.MBR_RX_SEQ) == ClusterChannel.MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX);
+            if ( (svc & ClusterChannel.SND_RX_SEQ) == ClusterChannel.SND_RX_SEQ) clusterReceiver.start();
+            if ( (svc & ClusterChannel.SND_TX_SEQ) == ClusterChannel.SND_TX_SEQ) clusterSender.start();
+            if ( (svc & ClusterChannel.MBR_TX_SEQ) == ClusterChannel.MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX);
+        }catch ( ChannelException cx ) {
+            throw cx;
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
+    }
+
+    /**
+     * Shuts down the channel. This can be called multiple times for individual services to shutdown
+     * The svc parameter can be the logical or value of any constants
+     * @param svc int value of <BR>
+     * DEFAULT - will shutdown all services <BR>
+     * MBR_RX_SEQ - starts the membership receiver <BR>
+     * MBR_TX_SEQ - starts the membership broadcaster <BR>
+     * SND_TX_SEQ - starts the replication transmitter<BR>
+     * SND_RX_SEQ - starts the replication receiver<BR>
+     * @throws ChannelException if a startup error occurs or the service is already started.
+     */
+    public void stop(int svc) throws ChannelException {
+        try {
+            if ( (svc & ClusterChannel.MBR_RX_SEQ) == ClusterChannel.MBR_RX_SEQ) membershipService.stop();
+            if ( (svc & ClusterChannel.SND_RX_SEQ) == ClusterChannel.SND_RX_SEQ) clusterReceiver.stop();
+            if ( (svc & ClusterChannel.SND_TX_SEQ) == ClusterChannel.SND_TX_SEQ) clusterSender.stop();
+            if ( (svc & ClusterChannel.MBR_TX_SEQ) == ClusterChannel.MBR_RX_SEQ) membershipService.stop();
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
+
+    }
+    
+    public void memberAdded(Member member){
+        if ( clusterSender!=null ) clusterSender.add(member);
+        super.memberAdded(member);
+    }
+    
+    public void memberDisappeared(Member member){
+        if ( clusterSender!=null ) clusterSender.remove(member);
+        super.memberDisappeared(member);
+    }
+
+
+    public ClusterReceiver getClusterReceiver() {
+        return clusterReceiver;
+    }
+
+    public ClusterSender getClusterSender() {
+        return clusterSender;
+    }
+
+    public MembershipService getMembershipService() {
+        return membershipService;
+    }
+
+    public void setClusterReceiver(ClusterReceiver clusterReceiver) {
+        if ( clusterReceiver != null ) {
+            this.clusterReceiver = clusterReceiver;
+            this.clusterReceiver.setMessageListener(this);
+        } else {
+            if  (this.clusterReceiver!=null ) this.clusterReceiver.setMessageListener(null);
+            this.clusterReceiver = null;
+        }
+    }
+
+    public void setClusterSender(ClusterSender clusterSender) {
+        this.clusterSender = clusterSender;
+    }
+
+    public void setMembershipService(MembershipService membershipService) {
+        this.membershipService = membershipService;
+        this.membershipService.setMembershipListener(this);
+    }
+   
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,87 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster.group;
+
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.cluster.MembershipListener;
+import org.apache.catalina.cluster.MessageListener;
+import java.io.IOException;
+
+/**
+ * Abstract class for the interceptor base class.
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+
+public abstract class ChannelInterceptorBase implements MembershipListener, MessageListener {
+
+    /** Interceptor that outgoing messages are passed to; null until setNext is called. */
+    private ChannelInterceptorBase next;
+    /** Interceptor that incoming events are passed to; null until setPrevious is called. */
+    private ChannelInterceptorBase previous;
+
+    public ChannelInterceptorBase() {
+    }
+
+    protected final void setNext(ChannelInterceptorBase next) {
+        this.next = next;
+    }
+
+    public final ChannelInterceptorBase getNext() {
+        return next;
+    }
+
+    protected final void setPrevious(ChannelInterceptorBase previous) {
+        this.previous = previous;
+    }
+
+    public final ChannelInterceptorBase getPrevious() {
+        return previous;
+    }
+
+    /**
+     * Passes an outgoing message on to the next interceptor and returns what it returns.
+     * There is no null check on the next interceptor, so the last element of a chain
+     * has to override this method.
+     */
+    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) throws IOException {
+        return getNext().sendMessage(destination, msg,options);
+    }
+
+    /**
+     * Passes a received message upwards. Without a previous interceptor the message
+     * is dropped here, the same way memberAdded and memberDisappeared behave.
+     */
+    public void messageReceived(ClusterMessage msg) {
+        if ( getPrevious()!=null ) getPrevious().messageReceived(msg);
+    }
+
+    /** Accepts every message. */
+    public boolean accept(ClusterMessage msg) {
+        return true;
+    }
+
+    public void memberAdded(Member member) {
+        //notify upwards
+        if ( getPrevious()!=null ) getPrevious().memberAdded(member);
+    }
+
+    public void memberDisappeared(Member member) {
+        //notify upwards
+        if ( getPrevious()!=null ) getPrevious().memberDisappeared(member);
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/GroupChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/GroupChannel.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/group/GroupChannel.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,142 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster.group;
+
+
+import org.apache.catalina.cluster.ChannelException;
+import org.apache.catalina.cluster.ClusterChannel;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.ClusterSender;
+import org.apache.catalina.cluster.MembershipService;
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.Member;
+
+
+/**
+ * The GroupChannel manages the replication channel. It coordinates
+ * message being sent and received with membership announcements.
+ * The channel has an chain of interceptors that can modify the message or perform other logic.
+ * It manages a complete cluster group, both membership and replication.
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+public class GroupChannel implements ClusterChannel {
+    private ChannelCoordinator coordinator = new ChannelCoordinator();
+    private ChannelInterceptorBase interceptors = null;
+
+    public GroupChannel() {
+    }
+
+    /**
+     * Appends an interceptor to the chain, directly in front of the coordinator.
+     * @param interceptor ChannelInterceptorBase - the interceptor to append
+     */
+    public void addInterceptor(ChannelInterceptorBase interceptor) {
+        if ( interceptors == null ) {
+            interceptors = interceptor;
+            interceptors.setNext(coordinator);
+            coordinator.setPrevious(interceptors);
+            return;
+        }
+        // walk to the interceptor that currently precedes the coordinator
+        ChannelInterceptorBase tail = interceptors;
+        for ( ChannelInterceptorBase following = tail.getNext(); following != coordinator; following = tail.getNext() ) {
+            tail = following;
+        }
+        tail.setNext(interceptor);
+        interceptor.setNext(coordinator);
+        interceptor.setPrevious(tail);
+        coordinator.setPrevious(interceptor);
+    }
+
+    /**
+     * Stamps the message and passes it to the first interceptor, or to the coordinator if there is none.
+     * Before sending, the address is set to the local member, the compress flag to FLAG_ALLOWED,
+     * the timestamp to the current time and the resend flag to FLAG_FORBIDDEN.
+     * @param destination Member[] - the destinations, passed on unchanged
+     * @param msg ClusterMessage - the message to send, nothing happens if it is null
+     * @param options int - sender options, passed on unchanged
+     * @return ClusterMessage[] - whatever the chain returns, null if msg is null
+     * @throws ChannelException wrapping any exception raised while sending
+     */
+    public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) throws ChannelException {
+        if ( msg == null ) return null;
+        msg.setAddress(getMembershipService().getLocalMember());
+        msg.setCompress(msg.FLAG_ALLOWED);
+        msg.setTimestamp(System.currentTimeMillis());
+        msg.setResend(msg.FLAG_FORBIDDEN);
+        ChannelInterceptorBase head = ( interceptors == null ) ? coordinator : interceptors;
+        try {
+            return head.sendMessage(destination, msg, options);
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
+    }
+
+    /**
+     * Starts up the channel by delegating to the coordinator.
+     * The svc parameter can be the logical or value of any constants
+     * @param svc int value of <BR>
+     * DEFAULT - will start all services <BR>
+     * MBR_RX_SEQ - starts the membership receiver <BR>
+     * MBR_TX_SEQ - starts the membership broadcaster <BR>
+     * SND_TX_SEQ - starts the replication transmitter<BR>
+     * SND_RX_SEQ - starts the replication receiver<BR>
+     * @throws ChannelException if the coordinator reports a startup error.
+     */
+    public void start(int svc) throws ChannelException {
+        coordinator.start(svc);
+    }
+
+    /**
+     * Shuts down the channel by delegating to the coordinator.
+     * The svc parameter can be the logical or value of any constants
+     * @param svc int value of <BR>
+     * DEFAULT - will shutdown all services <BR>
+     * MBR_RX_SEQ - membership receiver <BR>
+     * MBR_TX_SEQ - membership broadcaster <BR>
+     * SND_TX_SEQ - replication transmitter<BR>
+     * SND_RX_SEQ - replication receiver<BR>
+     * @throws ChannelException if the coordinator reports a shutdown error.
+     */
+    public void stop(int svc) throws ChannelException {
+        coordinator.stop(svc);
+    }
+
+    public ClusterReceiver getClusterReceiver() {
+        return coordinator.getClusterReceiver();
+    }
+
+    public ClusterSender getClusterSender() {
+        return coordinator.getClusterSender();
+    }
+
+    public MembershipService getMembershipService() {
+        return coordinator.getMembershipService();
+    }
+
+    public void setClusterReceiver(ClusterReceiver clusterReceiver) {
+        coordinator.setClusterReceiver(clusterReceiver);
+    }
+
+    public void setClusterSender(ClusterSender clusterSender) {
+        coordinator.setClusterSender(clusterSender);
+    }
+
+    public void setMembershipService(MembershipService membershipService) {
+        coordinator.setMembershipService(membershipService);
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ListenCallback.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ListenCallback.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ListenCallback.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ListenCallback.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.io;
+
+import org.apache.catalina.cluster.tcp.ClusterData ;
+
+/**
+ * The listen callback interface is used by the replication system
+ * when data has been received. The interface does not care about
+ * objects and marshalling and just passes the bytes straight through.
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 303987 $, $Date: 2005-07-08 15:50:30 -0500 (Fri, 08 Jul 2005) $
+ */
+public interface ListenCallback
+{
+    /**
+     * This method is invoked on the callback object to notify it that new data has
+     * been received from one of the cluster nodes.
+     * @param data - the message data received from the cluster/replication system
+     */
+     public void messageDataReceived(ClusterData data);
+     
+    /** Tells whether an ack is to be sent for received data; SocketObjectReader calls sendAck() only when this returns true.
+      */
+     public boolean isSendAck() ;
+     
+     /** Sends the ack.
+      * @throws java.io.IOException presumably when writing the ack fails - confirm against the implementations
+      */
+     public void sendAck() throws java.io.IOException ;
+
+}
\ No newline at end of file

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ObjectReader.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ObjectReader.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ObjectReader.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ObjectReader.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,121 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.cluster.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+import org.apache.catalina.cluster.tcp.ClusterData;
+
+/**
+ * The object reader object is an object used in conjunction with
+ * java.nio TCP messages. This object stores the message bytes in a
+ * <code>XByteBuffer</code> until a full package has been received.
+ * When a full package has been received, the execute method will call messageDataReceived
+ * on the callback object associated with this object reader.<BR>
+ * This object uses an XByteBuffer which is an extendable object buffer that also allows
+ * for message encoding and decoding.
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ */
+public class ObjectReader {
+
+    private SocketChannel channel;
+
+    private ListenCallback callback;
+
+    private XByteBuffer buffer;
+
+    /**
+     * Stores the channel and the callback and creates an empty XByteBuffer.
+     * @param channel the NIO channel returned by getChannel()
+     * @param selector not used by this constructor
+     * @param callback the callback returned by getCallback()
+     */
+    public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback) {
+        this.buffer = new XByteBuffer();
+        this.callback = callback;
+        this.channel = channel;
+    }
+
+    /**
+     * Get the callback that execute() hands the extracted packages to
+     * @return Returns the callback.
+     */
+    public ListenCallback getCallback() {
+        return callback;
+    }
+
+    /**
+     * Get underlying NIO channel
+     * @return The socket
+     */
+    public SocketChannel getChannel() {
+        return this.channel;
+    }
+
+    /**
+     * Append new bytes to buffer.
+     * The callback is not invoked by this method, that is done by execute().
+     * @see XByteBuffer#countPackages()
+     * @param data new transfer buffer
+     * @param off offset
+     * @param len length in buffer
+     * @return the package count reported by the buffer after the append
+     * @throws java.io.IOException
+     */
+    public int append(byte[] data,int off,int len) throws java.io.IOException {
+        buffer.append(data,off,len);
+        return buffer.countPackages();
+    }
+
+    /**
+     * Send buffer to cluster listener (callback).
+     * As long as the buffer reports a package, it is extracted and passed to the callback.
+     *
+     * @see org.apache.catalina.cluster.tcp.ClusterReceiverBase#messageDataReceived(ClusterData)
+     * @see XByteBuffer#doesPackageExist()
+     * @see XByteBuffer#extractPackage(boolean)
+     *
+     * @return number of packages passed to the callback
+     * @throws java.io.IOException
+     */
+    public int execute() throws java.io.IOException {
+        int delivered = 0;
+        while ( buffer.doesPackageExist() ) {
+            ClusterData pkg = buffer.extractPackage(true);
+            getCallback().messageDataReceived(pkg);
+            delivered++;
+        }
+        return delivered;
+    }
+
+    /**
+     * Write Ack to sender
+     * The bytes are written to the channel returned by getChannel().
+     * @param buf the bytes to write
+     * @return The bytes written count
+     * @throws java.io.IOException
+     */
+    public int write(ByteBuffer buf) throws java.io.IOException {
+        SocketChannel target = getChannel();
+        return target.write(buf);
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ReplicationStream.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ReplicationStream.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ReplicationStream.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/ReplicationStream.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,100 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.cluster.io;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * Custom subclass of <code>ObjectInputStream</code> that loads from the
+ * class loader for this web application.  This allows classes defined only
+ * with the web application to be found correctly.
+ *
+ * @author Craig R. McClanahan
+ * @author Bip Thelin
+ * @author Filip Hanik
+ * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ */
+
+public final class ReplicationStream extends ObjectInputStream {
+
+
+    /**
+     * The class loader used by findExternalClass.
+     */
+    private ClassLoader classLoader = null;
+
+    /**
+     * Construct a new instance of ReplicationStream
+     *
+     * @param stream The input stream we will read from
+     * @param classLoader The class loader used by findExternalClass
+     *
+     * @exception IOException if an input/output error occurs
+     */
+    public ReplicationStream(InputStream stream,
+                             ClassLoader classLoader)
+        throws IOException {
+
+        super(stream);
+        this.classLoader = classLoader;
+    }
+
+    /**
+     * Load the local class equivalent of the specified stream class description.
+     * Names starting with <code>org.apache.catalina.cluster</code> are looked up with
+     * findReplicationClass first and findExternalClass second, all other names the
+     * other way round. If the second lookup ends in a ClassNotFoundException, the
+     * resolution of ObjectInputStream is used instead.
+     *
+     * @param classDesc Class description from the input stream
+     *
+     * @exception ClassNotFoundException if this class cannot be found
+     * @exception IOException if an input/output error occurs
+     */
+    public Class resolveClass(ObjectStreamClass classDesc)
+        throws ClassNotFoundException, IOException {
+        final String className = classDesc.getName();
+        final boolean replicationFirst = className.startsWith("org.apache.catalina.cluster");
+        try {
+            try {
+                return replicationFirst ? findReplicationClass(className) : findExternalClass(className);
+            } catch ( Exception firstLookupFailed ) {
+                return replicationFirst ? findExternalClass(className) : findReplicationClass(className);
+            }
+        } catch (ClassNotFoundException notFound) {
+            return super.resolveClass(classDesc);
+        }
+    }
+
+    /** Looks the class up with the class loader that loaded ReplicationStream, without initializing it. */
+    public Class findReplicationClass(String name)
+        throws ClassNotFoundException, IOException {
+        ClassLoader own = getClass().getClassLoader();
+        return Class.forName(name, false, own);
+    }
+
+    /** Looks the class up with the class loader given to the constructor, without initializing it. */
+    public Class findExternalClass(String name)
+        throws ClassNotFoundException, IOException {
+        return Class.forName(name, false, classLoader);
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/SocketObjectReader.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,109 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.io;
+
+
+import java.net.Socket;
+
+import org.apache.catalina.cluster.tcp.ClusterData;
+
+/**
+ * The object reader object is an object used in conjunction with
+ * java.nio TCP messages. This object stores the message bytes in a
+ * <code>XByteBuffer</code> until a full package has been received.
+ * When a full package has been received, the append method will call messageDataReceived
+ * on the callback object associated with this object reader.<BR>
+ * This object uses an XByteBuffer which is an extendable object buffer that also allows
+ * for message encoding and decoding.
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ * @since 5.5.10
+ */
+public class SocketObjectReader
+{
+    private Socket socket;
+    private ListenCallback callback;
+    private XByteBuffer buffer;
+
+    /**
+     * use this socket and callback to receive messages
+     * @param socket socket that write() sends its data to
+     * @param callback listener that receives every extracted package
+     */
+    public SocketObjectReader( Socket socket,
+                               ListenCallback callback)  {
+        this.buffer = new XByteBuffer();
+        this.callback = callback;
+        this.socket = socket;
+    }
+
+    /**
+     * Append new bytes to buffer (skipped unless len is positive) and pass every
+     * package the buffer reports to the callback. When the callback asks for an
+     * ack, sendAck() is called before the package is delivered.
+     * @see org.apache.catalina.cluster.tcp.ClusterReceiverBase#messageDataReceived(ClusterData)
+     * @see XByteBuffer#doesPackageExist()
+     * @see XByteBuffer#extractPackage(boolean)
+     * @param data new transfer buffer
+     * @param off offset
+     * @param len length in buffer
+     * @return number of packages passed to the callback
+     * @throws java.io.IOException
+     */
+    public int append(byte[] data,int off,int len) throws java.io.IOException {
+        if ( len > 0 ) {
+            buffer.append(data,off,len);
+        }
+        int delivered = 0;
+        while ( buffer.doesPackageExist() ) {
+            ClusterData pkg = buffer.extractPackage(true);
+            if ( callback.isSendAck() ) {
+                callback.sendAck() ;
+            }
+            callback.messageDataReceived(pkg);
+            delivered++;
+        }
+        return delivered;
+    }
+
+    /**
+     * Deliver the packages already in the buffer without appending new bytes
+     * @see SocketObjectReader#append(byte[], int, int)
+     * @return Count of packages passed to the callback
+     * @throws java.io.IOException
+     */
+    public int execute() throws java.io.IOException {
+        return append(null,0,0);
+    }
+
+    /**
+     * write data to socket (ack)
+     * @see org.apache.catalina.cluster.tcp.SocketReplicationListener#sendAck
+     * @param data bytes written to the output stream of the socket
+     * @return Always zero
+     * @throws java.io.IOException
+     */
+    public int write(byte[] data)
+       throws java.io.IOException {
+       java.io.OutputStream out = socket.getOutputStream();
+       out.write(data);
+       return 0;
+    }
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/XByteBuffer.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/XByteBuffer.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/io/XByteBuffer.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,425 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.io;
+
+import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.tcp.ClusterData;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+import org.apache.catalina.cluster.io.ReplicationStream;
+
+/**
+ * The XByteBuffer provides a dual functionality.
+ * One, it stores message bytes and automatically extends the byte buffer if needed.<br/>
+ * Two, it can encode and decode packages so that they can be defined and identified
+ * as they come in on a socket.
+ * <br/>
+ * Transfer package:
+ * <ul>
+ * <li><b>START_DATA</b> - 7 bytes - <i>FLT2002</i></li>
+ * <li><b>COMPRESS</b>  - 4 bytes - is message compressed flag</li>
+ * <li><b>SIZE</b>      - 4 bytes - size of the data package</li>
+ * <li><b>DATA</b>      - should be as many bytes as the prev SIZE</li>
+ * <li><b>END_DATA</b>  - 7 bytes - <i>TLF2003</i></li>
+ * </ul>
+ * FIXME: Why do we not use a list of byte buffers?
+ * FIXME: Use a pool of buffers instead of creating a new one every time
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 377484 $, $Date: 2006-02-13 15:00:05 -0600 (Mon, 13 Feb 2006) $
+ */
+public class XByteBuffer
+{
+
+    public static org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( XByteBuffer.class );
+
+    /**
+     * This is a package header, 7 bytes (FLT2002)
+     */
+    public static final byte[] START_DATA = {70,76,84,50,48,48,50};
+
+    /**
+     * This is the package footer, 7 bytes (TLF2003)
+     */
+    public static final byte[] END_DATA = {84,76,70,50,48,48,51};
+
+    /**
+     * Default size on the initial byte buffer
+     */
+    static final int DEF_SIZE = 1024;
+
+    /**
+     * Default size to extend the buffer with
+     */
+    static final int DEF_EXT  = 1024;
+
+    /**
+     * Variable to hold the data
+     */
+    protected byte[] buf = null;
+
+    /**
+     * Current length of data in the buffer. Bytes of <code>buf</code> at or
+     * beyond this index are leftovers and must never be interpreted.
+     */
+    protected int bufSize = 0;
+
+    /**
+     * Constructs a new XByteBuffer
+     * @param size - the initial size of the byte buffer
+     */
+    public XByteBuffer(int size) {
+        buf = new byte[size];
+    }
+
+    /**
+     * Constructs a new XByteBuffer with an initial size of 1024 bytes
+     */
+    public XByteBuffer()  {
+        this(DEF_SIZE);
+    }
+
+    /**
+     * Returns a copy of the bytes in the buffer, in its exact length
+     * @return a new array holding the first <code>bufSize</code> bytes
+     */
+    public byte[] getBytes() {
+        byte[] b = new byte[bufSize];
+        System.arraycopy(buf,0,b,0,bufSize);
+        return b;
+    }
+
+    /**
+     * Resets the buffer. Only the fill level is reset, the backing array is kept.
+     */
+    public void clear() {
+        bufSize = 0;
+    }
+
+    /**
+     * Appends the data to the buffer. The buffered data always has to start with the
+     * package header; if it does not, the complete buffer content is discarded and
+     * false is returned.
+     * @param b - bytes to be appended
+     * @param off - the offset to extract data from
+     * @param len - the number of bytes to append.
+     * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0
+     * @exception java.lang.IndexOutOfBoundsException if off/len do not describe a range inside b
+     */
+    public boolean append(byte[] b, int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0) ||
+            ((off + len) > b.length) || ((off + len) < 0))  {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return false;
+        }
+
+        int newcount = bufSize + len;
+        if (newcount > buf.length) {
+            // grow to at least double the size so repeated appends stay cheap
+            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+            System.arraycopy(buf, 0, newbuf, 0, bufSize);
+            buf = newbuf;
+        }
+        System.arraycopy(b, off, buf, bufSize, len);
+        bufSize = newcount;
+
+        // Only the valid part of the buffer is inspected, and the header has
+        // to be at position 0: countPackages() can never extract anything
+        // that does not start with the header, so keeping such data would
+        // only let the buffer grow without bound.
+        if (bufSize > START_DATA.length && !matches(buf, 0, START_DATA)) {
+            bufSize = 0;
+            log.error("Discarded the package, invalid header");
+            return false;
+        }
+        return true;
+    }
+
+
+    /**
+     * Counts the complete packages (header,compress,size,data,footer) that are
+     * stored back to back at the beginning of the buffer. Counting stops at the
+     * first incomplete or malformed package.
+     * @return - the number of complete packages, 0 if there is none
+     */
+    public int countPackages()
+    {
+        // number of bytes in front of the payload: header + compress flag + size
+        final int prefixLen = START_DATA.length + 8;
+        int cnt = 0;
+        int start = 0;
+
+        while ( start < bufSize ) {
+            //header, compress flag and size field must be completely buffered
+            //before the size field may be read
+            if ( (bufSize - start) < prefixLen ) break;
+            //the package has to begin with the header (START_DATA)
+            if ( !matches(buf, start, START_DATA) ) break;
+            //next 4 bytes are compress flag not needed for count packages
+            //then get the size 4 bytes
+            int size = toInt(buf, start + START_DATA.length + 4);
+            //room that is left for the payload once the footer is accounted for;
+            //comparing against it also rejects negative (corrupt) sizes and
+            //avoids an int overflow when adding the size to the offset
+            int room = bufSize - start - prefixLen - END_DATA.length;
+            if ( size < 0 || size > room ) break;
+            //and finally check the footer of the package END_DATA
+            int footer = start + prefixLen + size;
+            //mismatch, there is no package
+            if ( !matches(buf, footer, END_DATA) ) break;
+            //increase the packet count
+            cnt++;
+            //continue behind the footer
+            start = footer + END_DATA.length;
+        }
+        return cnt;
+    }
+
+    /**
+     * Method to check if a package exists in this byte buffer.
+     * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer
+     */
+    public boolean doesPackageExist()  {
+        return (countPackages()>0);
+    }
+
+    /**
+     * Extracts the first package from the buffer.
+     * If no package exists, a IllegalStateException will be thrown.
+     * @param clearFromBuffer - if true, the package will be removed from the byte buffer
+     * @return - a ClusterData holding the message bytes (header, compress, size and footer
+     * not included) and the compress flag of the package
+     * @throws java.io.IOException declared for callers, not thrown by this implementation
+     */
+    public ClusterData extractPackage(boolean clearFromBuffer)
+            throws java.io.IOException {
+        int psize = countPackages();
+        if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
+        int compress = toInt(buf, START_DATA.length);
+        int size = toInt(buf, START_DATA.length +4);
+        byte[] data = new byte[size];
+        System.arraycopy(buf, START_DATA.length + 8, data, 0, size);
+        ClusterData cdata = new ClusterData() ;
+        cdata.setMessage(data);
+        cdata.setCompress(compress);
+        if (clearFromBuffer) {
+            // move whatever follows the package to the front of the buffer
+            int totalsize = START_DATA.length + 8 + size + END_DATA.length;
+            bufSize = bufSize - totalsize;
+            System.arraycopy(buf, totalsize, buf, 0, bufSize);
+        }
+        return cdata;
+    }
+
+    /**
+     * Convert four bytes to an int (most significant byte first)
+     * @param b - the byte array containing the four bytes
+     * @param off - the offset
+     * @return the integer value constructed from the four bytes
+     * @exception java.lang.ArrayIndexOutOfBoundsException
+     */
+    public static int toInt(byte[] b,int off){
+        return ( ( (int) b[off+3]) & 0xFF) +
+            ( ( ( (int) b[off+2]) & 0xFF) << 8) +
+            ( ( ( (int) b[off+1]) & 0xFF) << 16) +
+            ( ( ( (int) b[off+0]) & 0xFF) << 24);
+    }
+
+    /**
+     * Convert eight bytes to a long (most significant byte first)
+     * @param b - the byte array containing the eight bytes
+     * @param off - the offset
+     * @return the long value constructed from the eight bytes
+     * @exception java.lang.ArrayIndexOutOfBoundsException
+     */
+    public static long toLong(byte[] b,int off){
+        return ( ( (long) b[off+7]) & 0xFF) +
+            ( ( ( (long) b[off+6]) & 0xFF) << 8) +
+            ( ( ( (long) b[off+5]) & 0xFF) << 16) +
+            ( ( ( (long) b[off+4]) & 0xFF) << 24) +
+            ( ( ( (long) b[off+3]) & 0xFF) << 32) +
+            ( ( ( (long) b[off+2]) & 0xFF) << 40) +
+            ( ( ( (long) b[off+1]) & 0xFF) << 48) +
+            ( ( ( (long) b[off+0]) & 0xFF) << 56);
+    }
+
+    /**
+     * Converts an integer to four bytes (most significant byte first)
+     * @param n - the integer
+     * @return - four bytes in an array
+     */
+    public static byte[] toBytes(int n) {
+        byte[] b = new byte[4];
+        b[3] = (byte) (n);
+        n >>>= 8;
+        b[2] = (byte) (n);
+        n >>>= 8;
+        b[1] = (byte) (n);
+        n >>>= 8;
+        b[0] = (byte) (n);
+        return b;
+    }
+
+    /**
+     * Converts a long to eight bytes (most significant byte first)
+     * @param n - the long
+     * @return - eight bytes in an array
+     */
+    public static byte[] toBytes(long n) {
+        byte[] b = new byte[8];
+        b[7] = (byte) (n);
+        n >>>= 8;
+        b[6] = (byte) (n);
+        n >>>= 8;
+        b[5] = (byte) (n);
+        n >>>= 8;
+        b[4] = (byte) (n);
+        n >>>= 8;
+        b[3] = (byte) (n);
+        n >>>= 8;
+        b[2] = (byte) (n);
+        n >>>= 8;
+        b[1] = (byte) (n);
+        n >>>= 8;
+        b[0] = (byte) (n);
+        return b;
+    }
+
+    /**
+     * Similar to a String.indexOf, but uses pure bytes. The complete array
+     * <code>src</code> is searched, starting at <code>srcOff</code>.
+     * @param src - the source bytes to be searched
+     * @param srcOff - offset on the source buffer
+     * @param find - the bytes to be found within src
+     * @return - the index of the first matching byte. -1 if the find array is not found,
+     * is empty or is longer than src
+     * @exception java.lang.ArrayIndexOutOfBoundsException if srcOff is outside of src
+     */
+    public static int firstIndexOf(byte[] src, int srcOff, byte[] find){
+        if (find.length > src.length) return -1;
+        if (find.length == 0 || src.length == 0) return -1;
+        if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();
+        //last position at which the complete find array still fits into src
+        int last = src.length - find.length;
+        for (int pos = srcOff; pos <= last; pos++) {
+            int i = 0;
+            while (i < find.length && src[pos + i] == find[i]) i++;
+            if (i == find.length) return pos;
+        }
+        return -1;
+    }
+
+    /**
+     * Checks if the bytes of src starting at off are equal to pattern.
+     * The caller has to make sure that off + pattern.length does not exceed
+     * the valid part of src.
+     */
+    private static boolean matches(byte[] src, int off, byte[] pattern) {
+        for (int i = 0; i < pattern.length; i++) {
+            if (src[off + i] != pattern[i]) return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a complete data package
+     * @param cdata - provides the message data and the compression flag
+     * to be contained within the package
+     * @return - a full package (header,compress,size,data,footer)
+     * @throws java.io.IOException declared for callers, not thrown by this implementation
+     */
+    public static byte[] createDataPackage(ClusterData cdata)
+            throws java.io.IOException {
+        byte[] data = cdata.getMessage();
+        byte[] comprdata = XByteBuffer.toBytes(cdata.getCompress());
+        int length =
+            START_DATA.length + //header length
+            4 + //compression flag
+            4 + //data length indicator
+            data.length + //actual data length
+            END_DATA.length; //footer length
+        byte[] result = new byte[length];
+        System.arraycopy(START_DATA, 0, result, 0, START_DATA.length);
+        System.arraycopy(comprdata, 0, result, START_DATA.length, 4);
+        System.arraycopy(toBytes(data.length), 0, result, START_DATA.length + 4, 4);
+        System.arraycopy(data, 0, result, START_DATA.length + 8, data.length);
+        System.arraycopy(END_DATA, 0, result, START_DATA.length + 8 + data.length, END_DATA.length);
+        return result;
+    }
+
+    /**
+     * Deserializes the message bytes of a cluster data object.
+     * NOTE(review): this runs Java native deserialization on bytes that were
+     * received from the network - only use it on a trusted cluster network.
+     * @param data ClusterData holding the serialized message, may be null
+     * @param compress true if the message bytes are gzip compressed
+     * @return the message, null if data is null or the stream contained a null object
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws ClassCastException if the object read is not a ClusterMessage
+     */
+    public static ClusterMessage deserialize(ClusterData data, boolean compress)
+        throws IOException, ClassNotFoundException, ClassCastException {
+        Object message = null;
+        if (data != null) {
+            InputStream instream = new ByteArrayInputStream(data.getMessage());
+            if (compress ) {
+                instream = new GZIPInputStream(instream);
+            }
+            try {
+                ReplicationStream stream = new ReplicationStream(instream,XByteBuffer.class.getClassLoader());
+                message = stream.readObject();
+            } finally {
+                // also release the stream (and the inflater behind the gzip
+                // stream) when reading the object fails
+                instream.close();
+            }
+        }
+        if ( message == null ) {
+            return null;
+        } else if (message instanceof ClusterMessage)
+            return (ClusterMessage) message;
+        else {
+            throw new ClassCastException("Message has the wrong class. It should implement ClusterMessage, instead it is:"+message.getClass().getName());
+        }
+    }
+
+    /**
+     * Serializes a message into cluster data. The timestamp of the message
+     * is set to the current time before it is serialized.
+     * @param msg ClusterMessage
+     * @param compress boolean - true to gzip the serialized bytes
+     * @return ClusterData
+     * @throws IOException
+     */
+    public static ClusterData serialize(ClusterMessage msg, boolean compress) throws IOException {
+        msg.setTimestamp(System.currentTimeMillis());
+        ByteArrayOutputStream outs = new ByteArrayOutputStream();
+        ObjectOutputStream out;
+        GZIPOutputStream gout = null;
+        ClusterData data = new ClusterData();
+        data.setType(msg.getClass().getName());
+        data.setUniqueId(msg.getUniqueId());
+        data.setTimestamp(msg.getTimestamp());
+        data.setCompress(msg.getCompress());
+        data.setResend(msg.getResend());
+        if (compress) {
+            gout = new GZIPOutputStream(outs);
+            out = new ObjectOutputStream(gout);
+        } else {
+            out = new ObjectOutputStream(outs);
+        }
+        try {
+            out.writeObject(msg);
+            // push everything the object stream has buffered to the next stream
+            out.flush();
+            if (gout != null) {
+                // write the gzip trailer so that the byte array is a complete gzip stream
+                gout.finish();
+            }
+        } finally {
+            // closes the whole chain, releases the deflater even on failure
+            out.close();
+        }
+        data.setMessage(outs.toByteArray());
+        return data;
+    }
+
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mbeans-descriptors.xml Thu Feb 23 11:55:14 2006
@@ -0,0 +1,94 @@
+<?xml version="1.0"?>
+<mbeans-descriptors>
+
+  <mbean         name="SimpleTcpCluster"
+            className="org.apache.catalina.mbeans.ClassNameMBean"
+          description="Tcp Cluster implementation"
+               domain="Catalina"
+                group="Cluster"
+                 type="org.apache.catalina.cluster.tcp.SimpleTcpCluster">
+
+    <attribute   name="protocolStack"
+          description="JavaGroups protocol stack selection"
+                 type="java.lang.String"/>
+
+  </mbean>
+
+
+  <mbean         name="SimpleTcpReplicationManager"
+            className="org.apache.catalina.mbeans.ClassNameMBean"
+          description="Clustered implementation of the Manager interface"
+               domain="Catalina"
+                group="Manager"
+                 type="org.apache.catalina.cluster.tcp.SimpleTcpReplicationManager">
+
+    <attribute   name="algorithm"
+          description="The message digest algorithm to be used when generating
+                       session identifiers"
+                 type="java.lang.String"/>
+
+    <attribute   name="checkInterval"
+          description="The interval (in seconds) between checks for expired
+                       sessions"
+                 type="int"/>
+
+    <attribute   name="className"
+          description="Fully qualified class name of the managed object"
+                 type="java.lang.String"
+            writeable="false"/>
+
+    <attribute   name="distributable"
+          description="The distributable flag for Sessions created by this
+                       Manager"
+                 type="boolean"/>
+
+    <attribute   name="entropy"
+          description="A String initialization parameter used to increase the
+                       entropy of the initialization of our random number
+                       generator"
+                 type="java.lang.String"/>
+
+    <attribute   name="managedResource"
+          description="The managed resource this MBean is associated with"
+                 type="java.lang.Object"/>
+
+    <attribute   name="maxActiveSessions"
+          description="The maximum number of active Sessions allowed, or -1
+                       for no limit"
+                 type="int"/>
+
+    <attribute   name="maxInactiveInterval"
+          description="The default maximum inactive interval for Sessions
+                       created by this Manager"
+                 type="int"/>
+
+    <attribute   name="name"
+          description="The descriptive name of this Manager implementation
+                       (for logging)"
+                 type="java.lang.String"
+            writeable="false"/>
+
+  </mbean>
+
+
+
+<mbean         name="ReplicationValve"
+            className="org.apache.catalina.mbeans.ClassNameMBean"
+          description="Valve for simple tcp replication"
+               domain="Catalina"
+                group="Valve"
+                 type="org.apache.catalina.cluster.tcp.ReplicationValve">
+
+    <attribute   name="className"
+          description="Fully qualified class name of the managed object"
+                 type="java.lang.String"
+            writeable="false"/>
+
+    <attribute   name="debug"
+          description="The debugging detail level for this component"
+                 type="int"/>
+
+  </mbean>
+
+
+</mbeans-descriptors>

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/Constants.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/Constants.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/Constants.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/Constants.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.cluster.mcast;
+
+/**
+ * Manifest constants for the <code>org.apache.catalina.cluster.mcast</code>
+ * package.
+ *
+ * @author Peter Rossbach
+ * @version $Revision: 303950 $ $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun 2005) $
+ */
+
+public class Constants {
+
+    /**
+     * The fully qualified name of the package this class is declared in.
+     */
+    public static final String Package = "org.apache.catalina.cluster.mcast";
+
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/LocalStrings.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/LocalStrings.properties?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/LocalStrings.properties (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/LocalStrings.properties Thu Feb 23 11:55:14 2006
@@ -0,0 +1 @@
+cluster.mbean.register.already=MBean {0} already registered!

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMember.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMember.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMember.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,338 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.mcast;
+
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.cluster.io.XByteBuffer;
+
+/**
+ * A <b>membership</b> implementation using simple multicast.
+ * This is the representation of a multicast member.
+ * Carries the host and port of this or other cluster nodes.
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+public class McastMember implements Member, java.io.Serializable {
+
+    /**
+     * Digits, used for "superfast" de-serialization of an
+     * IP address
+     */
+    final transient static char[] digits = {
+        '0', '1', '2', '3', '4', '5',
+        '6', '7', '8', '9'};
+
+    /**
+     * Public properties specific to this implementation
+     */
+    public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
+    public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
+    public static final transient String MEMBER_NAME = "memberName";
+    public static final transient String MEMBER_DOMAIN = "memberDomain";
+
+    /**
+     * The listen host for this member
+     */
+    protected String host;
+    /**
+     * The tcp listen port for this member
+     */
+    protected int port;
+    /**
+     * The name for this member, has to be unique within the cluster.
+     */
+    private String name;
+
+    /**
+     * The name of the cluster domain from this node
+     */
+    private String domain;
+
+    /**
+     * Counter for how many messages have been sent from this member
+     */
+    protected int msgCount = 0;
+    /**
+     * The number of milliseconds since this member was
+     * created, is kept track of using the start time
+     */
+    protected long memberAliveTime = 0;
+
+
+    /**
+     * Construct a new member object
+     * @param name - the name of this member, cluster unique
+     * @param domain - the cluster domain name of this member
+     * @param host - the tcp listen host
+     * @param port - the tcp listen port
+     * @param aliveTime - the number of milliseconds this member has been alive
+     */
+    public McastMember(String name,
+                       String domain,
+                       String host,
+                       int port,
+                       long aliveTime) {
+        this.host = host;
+        this.port = port;
+        this.name = name;
+        this.domain = domain;
+        this.memberAliveTime=aliveTime;
+    }
+
+    /**
+     *
+     * @return a Hashmap containing the following properties:<BR>
+     * 1. tcpListenPort - the port this member listens to for messages - string<BR>
+     * 2. tcpListenHost - the host address of this member - string<BR>
+     * 3. memberName    - the name of this member - string<BR>
+     * 4. memberDomain  - the cluster domain of this member - string<BR>
+     */
+    public java.util.HashMap getMemberProperties() {
+        // four entries are stored, so the default capacity is used instead
+        // of presizing the map for two
+        java.util.HashMap map = new java.util.HashMap();
+        map.put(McastMember.TCP_LISTEN_HOST,this.host);
+        map.put(McastMember.TCP_LISTEN_PORT,String.valueOf(this.port));
+        map.put(McastMember.MEMBER_NAME,name);
+        map.put(McastMember.MEMBER_DOMAIN,domain);
+        return map;
+    }
+
+    /**
+     * Increment the message count.
+     */
+    protected void inc() {
+        msgCount++;
+    }
+
+    /**
+     * Create a data package to send over the wire representing this member.
+     * This is faster than serialization.
+     * @param startTime - the time (in milliseconds) the alive time is measured from
+     * @return - the bytes for this member serialized
+     * @throws Exception if the host can not be resolved or does not resolve
+     * to a 4 byte (IPv4) address
+     */
+    protected byte[] getData(long startTime) throws Exception {
+        //package looks like
+        //alive - 8 bytes
+        //port - 4 bytes
+        //host - 4 bytes
+        //nlen - 4 bytes
+        //name - nlen bytes
+        //dlen - 4 bytes
+        //domain - dlen bytes
+        //NOTE(review): name and domain use the platform default charset on
+        //both ends, all nodes are presumably expected to share it
+        byte[] named = getName().getBytes();
+        byte[] domaind = getDomain().getBytes();
+        byte[] addr = java.net.InetAddress.getByName(host).getAddress();
+        //the host slot of the package is fixed to 4 bytes, a longer (IPv6)
+        //address would overwrite the fields that follow it
+        if (addr.length != 4) {
+            throw new java.io.IOException("Unable to encode member, host " + host +
+                " resolves to an address of " + addr.length + " bytes, 4 bytes are required");
+        }
+        byte[] data = new byte[8+4+4+4+named.length+4+domaind.length];
+        long alive=System.currentTimeMillis()-startTime;
+        System.arraycopy(XByteBuffer.toBytes(alive),0,data,0,8);
+        System.arraycopy(XByteBuffer.toBytes(port),0,data,8,4);
+        System.arraycopy(addr,0,data,12,4);
+        System.arraycopy(XByteBuffer.toBytes(named.length),0,data,16,4);
+        System.arraycopy(named,0,data,20,named.length);
+        System.arraycopy(XByteBuffer.toBytes(domaind.length),0,data,named.length+20,4);
+        System.arraycopy(domaind,0,data,named.length+24,domaind.length);
+        return data;
+    }
+    /**
+     * Deserializes a member from data sent over the wire
+     * @param data - the bytes received
+     * @return a member object.
+     * @exception java.lang.ArrayIndexOutOfBoundsException if the data is too short or
+     * the name/domain lengths stored in it do not fit into the data
+     */
+    protected static McastMember getMember(byte[] data) {
+       //package looks like
+       //alive - 8 bytes
+       //port - 4 bytes
+       //host - 4 bytes
+       //nlen - 4 bytes
+       //name - nlen bytes
+       //dlen - 4 bytes
+       //domain - dlen bytes
+       //the fixed fields (alive, port, host, nlen, dlen) take 24 bytes
+       if (data.length < 24) {
+           throw new ArrayIndexOutOfBoundsException("Member package too short: " + data.length + " bytes");
+       }
+       long alive = XByteBuffer.toLong(data, 0);
+       int port = XByteBuffer.toInt(data, 8);
+       byte[] addr = new byte[4];
+       System.arraycopy(data, 12, addr, 0, 4);
+       //the lengths come from the network, check them before they are used
+       //so a corrupt package can not trigger a huge allocation
+       int nlen = XByteBuffer.toInt(data, 16);
+       if (nlen < 0 || nlen > data.length - 24) {
+           throw new ArrayIndexOutOfBoundsException("Invalid member name length: " + nlen);
+       }
+       int dlen = XByteBuffer.toInt(data, nlen + 20);
+       if (dlen < 0 || dlen > data.length - 24 - nlen) {
+           throw new ArrayIndexOutOfBoundsException("Invalid member domain length: " + dlen);
+       }
+       return new McastMember(new String(data, 20, nlen),
+                              new String(data, nlen + 24, dlen),
+                              addressToString(addr),
+                              port,
+                              alive);
+    }
+
+    /**
+     * Return the name of this object
+     * @return a unique name to the cluster
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Return the domain of this object
+     * @return a cluster domain to the cluster
+     */
+    public String getDomain() {
+        return domain;
+    }
+
+    /**
+     * Return the listen port of this member
+     * @return - tcp listen port
+     */
+    public int getPort()  {
+        return this.port;
+    }
+
+    /**
+     * Return the TCP listen host for this member
+     * @return IP address or host name
+     */
+    public String getHost()  {
+        return this.host;
+    }
+
+    /**
+     * Contains information on how long this member has been online.
+     * The result is the number of milli seconds this member has been
+     * broadcasting its membership to the cluster.
+     * @return nr of milliseconds since this member started.
+     */
+    public long getMemberAliveTime() {
+       return memberAliveTime;
+    }
+
+    /**
+     * Sets how long this member has been online.
+     * @param time - alive time in milliseconds
+     */
+    public void setMemberAliveTime(long time) {
+       memberAliveTime=time;
+    }
+
+
+
+    /**
+     * String representation of this object
+     */
+    public String toString()  {
+        return "org.apache.catalina.cluster.mcast.McastMember["+name+","+domain+","+host+","+port+", alive="+memberAliveTime+"]";
+    }
+
+    /**
+     * The hash code is derived from the member name only.
+     * @see java.lang.Object#hashCode()
+     * @return The hash code
+     */
+    public int hashCode() {
+        return this.name.hashCode();
+    }
+
+    /**
+     * Returns true if the param o is a McastMember with the same name
+     * @param o
+     */
+    public boolean equals(Object o) {
+        if ( o instanceof McastMember )    {
+            return this.name.equals(((McastMember)o).getName());
+        }
+        else
+            return false;
+    }
+
+    /**
+     * Converts four bytes (ip address) to a string representation of it<BR>
+     * Highly optimized method.
+     * @param address (4 bytes ip address)
+     * @return string representation of that ip address
+     */
+    private static final String addressToString(byte[] address) {
+        int q, r = 0;
+        //"255.255.255.255" is the longest result, the buffer is filled
+        //from the end towards the start
+        int charPos = 15;
+        char[] buf = new char[15];
+        char dot = '.';
+
+        for (int octet = 3; octet >= 0; octet--) {
+            int i = address[octet] & 0xFF;
+            do {
+                //multiply and shift instead of i / 10 (i is at most 255 here),
+                //(q << 3) + (q << 1) is q * 10, so r is the last decimal digit
+                q = (i * 52429) >>> (19);
+                r = i - ( (q << 3) + (q << 1));
+                buf[--charPos] = digits[r];
+                i = q;
+            } while (i != 0);
+            if (octet > 0)
+                buf[--charPos] = dot;
+        }
+        return new String(buf, charPos, 15 - charPos);
+    }
+    public void setHost(String host) {
+        this.host = host;
+    }
+    public void setMsgCount(int msgCount) {
+        this.msgCount = msgCount;
+    }
+    public void setName(String name) {
+        this.name = name;
+    }
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+    public void setPort(int port) {
+        this.port = port;
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMembership.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMembership.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMembership.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastMembership.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,280 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.mcast;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * A <b>membership</b> implementation using simple multicast.
+ * This is the representation of a multicast membership.
+ * This class is responsible for maintaining a list of active cluster nodes in the cluster.
+ * If a node fails to send out a heartbeat, the node will be dismissed.
+ *
+ * @author Filip Hanik
+ * @author Peter Rossbach
+ * @version $Revision: 356540 $, $Date: 2005-12-13 10:53:40 -0600 (Tue, 13 Dec 2005) $
+ */
+public class McastMembership
+{
+    /** Shared zero-length array used whenever the membership is empty. */
+    protected static final McastMember[] EMPTY_MEMBERS = new McastMember[0];
+
+    /**
+     * The name of this membership, has to be the same as the name for the local
+     * member
+     */
+    protected String name;
+
+    /**
+     * A map of all the members in the cluster, keyed by member name.
+     * The values are {@link MbrEntry} instances.
+     */
+    protected Map map = new HashMap();
+
+    /**
+     * A list of all the members in the cluster, ordered by the member comparator.
+     * This class never modifies the array in place: every change builds a new
+     * array and then replaces this reference, so an array handed out by
+     * {@link #getMembers()} is not altered by later membership changes.
+     */
+    protected McastMember[] members = EMPTY_MEMBERS;
+
+    /**
+     * sort members by alive time
+     */
+    protected MemberComparator memberComparator = new MemberComparator();
+
+    /**
+     * Constructs a new membership
+     * @param name - has to be the name of the local member. Used to filter the local member from the cluster membership
+     */
+    public McastMembership(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Reset the membership and start over fresh.
+     * Ie, delete all the members and wait for them to ping again and join this membership
+     */
+    public synchronized void reset() {
+        map.clear();
+        members = EMPTY_MEMBERS;
+    }
+
+    /**
+     * Notify the membership that this member has announced itself.
+     *
+     * @param member - the member that just pinged us
+     * @return - true if this member is new to the cluster; false if it is the
+     *           local member or an already known member that was only updated.
+     */
+    public synchronized boolean memberAlive(McastMember member) {
+        //ignore ourselves
+        if ( member.getName().equals(name) ) return false;
+
+        MbrEntry entry = (MbrEntry)map.get(member.getName());
+        boolean isNew = (entry == null);
+        if ( isNew ) {
+            entry = new MbrEntry(member);
+            map.put(member.getName(),entry);
+            addMcastMember(member);
+        } else {
+            //update the member alive time
+            McastMember known = entry.getMember();
+            if ( known.getMemberAliveTime() != member.getMemberAliveTime() ) {
+                known.setMemberAliveTime(member.getMemberAliveTime());
+                // Re-sort a private copy and publish it afterwards. Sorting the
+                // current array in place would reorder an array that
+                // getMembers() may already have handed out to a caller.
+                McastMember[] sorted = (McastMember[])members.clone();
+                Arrays.sort(sorted, memberComparator);
+                members = sorted;
+            }
+        }
+        entry.accessed();
+
+        //return true if the membership has changed
+        return isNew;
+    }
+
+    /**
+     * Add a member to this component and sort array with memberComparator.
+     * Locks on this instance, like every other method of the class; the member
+     * array itself is not usable as a lock because it is replaced here.
+     * @param member The member to add
+     */
+    protected synchronized void addMcastMember(McastMember member) {
+        McastMember[] results = new McastMember[members.length + 1];
+        System.arraycopy(members, 0, results, 0, members.length);
+        results[members.length] = member;
+        // sort before publishing so the visible array is always ordered
+        Arrays.sort(results, memberComparator);
+        members = results;
+    }
+
+    /**
+     * Remove a member from this component. The member is looked up by object
+     * identity; nothing happens if it is not in the array.
+     * Locks on this instance, like every other method of the class.
+     *
+     * @param member The member to remove
+     */
+    protected synchronized void removeMcastMember(McastMember member) {
+        int n = -1;
+        for (int i = 0; i < members.length; i++) {
+            if (members[i] == member) {
+                n = i;
+                break;
+            }
+        }
+        if (n < 0)
+            return;
+        McastMember[] results = new McastMember[members.length - 1];
+        // copy everything before and after position n; relative order is kept
+        System.arraycopy(members, 0, results, 0, n);
+        System.arraycopy(members, n + 1, results, n, members.length - n - 1);
+        members = results;
+    }
+
+    /**
+     * Runs a refresh cycle and returns a list of members that has expired.
+     * This also removes the members from the membership, in such a way that
+     * getMembers() = getMembers() - expire()
+     * @param maxtime - the max time a member can remain unannounced before it is considered dead.
+     * @return the list of expired members
+     */
+    public synchronized McastMember[] expire(long maxtime) {
+        if(!hasMembers() )
+           return EMPTY_MEMBERS;
+
+        ArrayList list = null;
+        Iterator i = map.values().iterator();
+        while(i.hasNext()) {
+            MbrEntry entry = (MbrEntry)i.next();
+            if( entry.hasExpired(maxtime) ) {
+                if(list == null) // only need a list when members are expired (smaller gc)
+                    list = new ArrayList();
+                list.add(entry.getMember());
+            }
+        }
+
+        if(list == null)
+            return EMPTY_MEMBERS;
+
+        // removal happens after the iteration so the map is not modified
+        // while its values are being walked
+        McastMember[] result = new McastMember[list.size()];
+        list.toArray(result);
+        for( int j=0; j<result.length; j++) {
+            map.remove(result[j].getName());
+            removeMcastMember(result[j]);
+        }
+        return result;
+    }
+
+    /**
+     * Returning that service has members or not
+     */
+    public synchronized boolean hasMembers() {
+        return members.length > 0 ;
+    }
+
+    /**
+     * Returning a list of all the members in the membership.
+     * No copy is made: this class replaces the array on every change instead
+     * of modifying it.
+     */
+    public synchronized McastMember[] getMembers() {
+        if(hasMembers()) {
+            return members;
+        } else {
+            return EMPTY_MEMBERS;
+        }
+    }
+
+    /**
+     * get a copy from all member entries
+     */
+    protected synchronized MbrEntry[] getMemberEntries()
+    {
+        return (MbrEntry[])map.values().toArray(new MbrEntry[map.size()]);
+    }
+
+    // --------------------------------------------- Inner Class
+
+    /**
+     * Orders members so that a longer alive time sorts first.
+     * Static because it uses no state of the enclosing membership.
+     */
+    private static class MemberComparator implements java.util.Comparator {
+
+        /**
+         * Compares two objects as members. Objects that are not
+         * McastMember instances are treated as equal.
+         */
+        public int compare(Object o1, Object o2) {
+            try {
+                return compare((McastMember) o1, (McastMember) o2);
+            } catch (ClassCastException x) {
+                return 0;
+            }
+        }
+
+        /**
+         * @return -1 if m1 has been alive longer than m2, 1 if shorter, 0 if equal
+         */
+        public int compare(McastMember m1, McastMember m2) {
+            //longer alive time, means sort first
+            // compared directly rather than by subtraction, which can overflow
+            long alive1 = m1.getMemberAliveTime();
+            long alive2 = m2.getMemberAliveTime();
+            if (alive1 > alive2)
+                return -1;
+            else if (alive1 == alive2)
+                return 0;
+            else
+                return 1;
+        }
+    }
+
+    /**
+     * Inner class that represents a member entry
+     */
+    protected static class MbrEntry
+    {
+
+        protected McastMember mbr;
+        /** Time, from System.currentTimeMillis(), of the last call to accessed(). */
+        protected long lastHeardFrom;
+
+        public MbrEntry(McastMember mbr) {
+           this.mbr = mbr;
+        }
+
+        /**
+         * Indicate that this member has been accessed.
+         */
+        public void accessed(){
+           lastHeardFrom = System.currentTimeMillis();
+        }
+
+        /**
+         * Return the actual McastMember object
+         */
+        public McastMember getMember() {
+            return mbr;
+        }
+
+        /**
+         * Check if this member has expired, i.e. more than maxtime
+         * milliseconds have passed since it was last accessed.
+         * @param maxtime The time threshold
+         */
+        public boolean hasExpired(long maxtime) {
+            long delta = System.currentTimeMillis() - lastHeardFrom;
+            return delta > maxtime;
+        }
+    }
+}

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastService.java?rev=380209&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastService.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/cluster/mcast/McastService.java Thu Feb 23 11:55:14 2006
@@ -0,0 +1,428 @@
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.cluster.mcast;
+
+import java.util.Properties;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.catalina.Cluster;
+import org.apache.catalina.Container;
+import org.apache.catalina.cluster.Member;
+import org.apache.catalina.cluster.MembershipListener;
+import org.apache.catalina.cluster.MembershipService;
+import org.apache.catalina.core.StandardHost;
+import org.apache.catalina.util.StringManager;
+import org.apache.commons.modeler.Registry;
+
+/**
+ * A <b>membership</b> implementation using simple multicast.
+ * This is the representation of a multicast membership service.
+ * This class is responsible for maintaining a list of active cluster nodes in the cluster.
+ * If a node fails to send out a heartbeat, the node will be dismissed.
+ *
+ * @author Filip Hanik
+ * @version $Revision: 378093 $, $Date: 2006-02-15 15:13:45 -0600 (Wed, 15 Feb 2006) $
+ */
+
+
+public class McastService implements MembershipService,MembershipListener {
+
+    private static final org.apache.commons.logging.Log log =
+        org.apache.commons.logging.LogFactory.getLog( McastService.class );
+
+    /**
+     * The string manager for this package.
+     */
+    protected StringManager sm = StringManager.getManager(Constants.Package);
+
+    /**
+     * The descriptive information about this implementation.
+     */
+    private static final String info = "McastService/2.1";
+
+    /**
+     * The implementation specific properties
+     */
+    protected Properties properties = new Properties();
+    /**
+     * A handle to the actual low level implementation.
+     * Null until start has been called and again after stop.
+     */
+    protected McastServiceImpl impl;
+    /**
+     * A membership listener delegate (should be the cluster :)
+     */
+    protected MembershipListener listener;
+    /**
+     * The local member; created by the first call to start.
+     */
+    protected McastMember localMember ;
+    private int mcastSoTimeout;
+    private int mcastTTL;
+
+    /**
+     * Transmitter Mbean name
+     */
+    private ObjectName objectName;
+
+    private Registry registry;
+
+    /**
+     * Create a membership service.
+     * The cluster domain property defaults to "catalina".
+     */
+    public McastService() {
+        properties.setProperty("mcastClusterDomain", "catalina");
+    }
+
+    /**
+     * Return descriptive information about this implementation and the
+     * corresponding version number, in the format
+     * <code>&lt;description&gt;/&lt;version&gt;</code>.
+     */
+    public String getInfo() {
+        return (info);
+    }
+
+    /**
+     * Transmitter ObjectName
+     *
+     * @param name the object name to store
+     */
+    public void setObjectName(ObjectName name) {
+        objectName = name;
+    }
+
+    /**
+     * @return the object name last passed to setObjectName, or null
+     */
+    public ObjectName getObjectName() {
+        return objectName;
+    }
+
+    /**
+     * Replaces the properties of this service.
+     *
+     * @param properties
+     * <BR>The following are required:<BR>
+     * 1. mcastPort - the port to listen to<BR>
+     * 2. mcastAddress - the mcast group address<BR>
+     * 3. mcastClusterDomain - the mcast cluster domain<BR>
+     * 4. memberDropTime - the time a member is gone before it is considered gone.<BR>
+     * 5. msgFrequency - the frequency of sending messages<BR>
+     * 6. tcpListenPort - the port this member listens to<BR>
+     * 7. tcpListenHost - the bind address of this member<BR>
+     * The following are optional and read by start:<BR>
+     * mcastBindAddress - the bind address if any<BR>
+     * mcastTTL, mcastSoTimeout - integer values<BR>
+     * @exception java.lang.IllegalArgumentException if a required property is missing.
+     */
+    public void setProperties(Properties properties) {
+        hasProperty(properties,"mcastPort");
+        hasProperty(properties,"mcastAddress");
+        hasProperty(properties,"mcastClusterDomain");
+        hasProperty(properties,"memberDropTime");
+        hasProperty(properties,"msgFrequency");
+        hasProperty(properties,"tcpListenPort");
+        hasProperty(properties,"tcpListenHost");
+        this.properties = properties;
+    }
+
+    /**
+     * Return the properties, see setProperties
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+
+    /**
+     * Return the local member name
+     */
+    public String getLocalMemberName() {
+        return localMember.toString() ;
+    }
+
+    /**
+     * Return the local member. While the service is started, the member's
+     * alive time is first refreshed to the time elapsed since the service
+     * start time. If the service is not started the member is returned
+     * unchanged (null if start was never called).
+     */
+    public Member getLocalMember() {
+        // read the field once: stop() sets it to null
+        McastServiceImpl current = impl;
+        if ( localMember != null && current != null ) {
+            localMember.setMemberAliveTime(System.currentTimeMillis()-current.getServiceStartTime());
+        }
+        return localMember;
+    }
+
+    /**
+     * Sets the local member properties for broadcasting
+     */
+    public void setLocalMemberProperties(String listenHost, int listenPort) {
+        properties.setProperty("tcpListenHost",listenHost);
+        properties.setProperty("tcpListenPort",String.valueOf(listenPort));
+    }
+
+    public void setMcastAddr(String addr) {
+        properties.setProperty("mcastAddress", addr);
+    }
+
+    public String getMcastAddr() {
+        return properties.getProperty("mcastAddress");
+    }
+
+    public void setMcastBindAddress(String bindaddr) {
+        properties.setProperty("mcastBindAddress", bindaddr);
+    }
+
+    public String getMcastBindAddress() {
+        return properties.getProperty("mcastBindAddress");
+    }
+
+    public void setMcastClusterDomain(String clusterDomain) {
+        properties.setProperty("mcastClusterDomain", clusterDomain);
+    }
+
+    public String getMcastClusterDomain() {
+        return properties.getProperty("mcastClusterDomain");
+    }
+
+    public void setMcastPort(int port) {
+        properties.setProperty("mcastPort", String.valueOf(port));
+    }
+
+    /**
+     * @return the mcastPort property as an int
+     * @exception NumberFormatException if the property is missing or not a number
+     */
+    public int getMcastPort() {
+        return Integer.parseInt(properties.getProperty("mcastPort"));
+    }
+
+    public void setMcastFrequency(long time) {
+        properties.setProperty("msgFrequency", String.valueOf(time));
+    }
+
+    /**
+     * @return the msgFrequency property as a long
+     * @exception NumberFormatException if the property is missing or not a number
+     */
+    public long getMcastFrequency() {
+        return Long.parseLong(properties.getProperty("msgFrequency"));
+    }
+
+    public void setMcastDropTime(long time) {
+        properties.setProperty("memberDropTime", String.valueOf(time));
+    }
+
+    /**
+     * @return the memberDropTime property as a long
+     * @exception NumberFormatException if the property is missing or not a number
+     */
+    public long getMcastDropTime() {
+        return Long.parseLong(properties.getProperty("memberDropTime"));
+    }
+
+    /**
+     * Check if a required property is available.
+     * @param properties The set of properties
+     * @param name The property to check for
+     * @exception java.lang.IllegalArgumentException if the property is missing
+     */
+    protected void hasProperty(Properties properties, String name){
+        if ( properties.getProperty(name)==null) throw new IllegalArgumentException("Required property \""+name+"\" is missing.");
+    }
+
+    /**
+     * Start broadcasting and listening to membership pings
+     * @throws java.lang.Exception if a IO error occurs
+     */
+    public void start() throws java.lang.Exception {
+        start(MembershipService.MBR_RX);
+        start(MembershipService.MBR_TX);
+    }
+
+    /**
+     * Starts the given level of the service. The first call builds the local
+     * member and the low level implementation from the properties, starts it,
+     * and then sleeps for four times msgFrequency. Later calls only forward
+     * the level to the existing implementation.
+     * @param level the level to start, e.g. MembershipService.MBR_RX or MBR_TX
+     * @throws java.lang.Exception if a property cannot be parsed or the
+     *         implementation fails to start
+     */
+    public void start(int level) throws java.lang.Exception {
+        if ( impl != null ) {
+            impl.start(level);
+            return;
+        }
+        String host = getProperties().getProperty("tcpListenHost");
+        String domain = getProperties().getProperty("mcastClusterDomain");
+        int port = Integer.parseInt(getProperties().getProperty("tcpListenPort"));
+        String name = "tcp://"+host+":"+port;
+        if ( localMember == null ) {
+            localMember = new McastMember(name, domain, host, port, 100);
+        } else {
+            localMember.setName(name);
+            localMember.setDomain(domain);
+            localMember.setHost(host);
+            localMember.setPort(port);
+            localMember.setMemberAliveTime(100);
+        }
+        java.net.InetAddress bind = null;
+        if ( properties.getProperty("mcastBindAddress")!= null ) {
+            bind = java.net.InetAddress.getByName(properties.getProperty("mcastBindAddress"));
+        }
+        // -1 is passed on when the property is absent or cannot be parsed
+        int ttl = parseOptionalInt("mcastTTL");
+        int soTimeout = parseOptionalInt("mcastSoTimeout");
+
+        long msgFrequency = Long.parseLong(properties.getProperty("msgFrequency"));
+        long memberDropTime = Long.parseLong(properties.getProperty("memberDropTime"));
+        int mcastPort = Integer.parseInt(properties.getProperty("mcastPort"));
+        java.net.InetAddress mcastAddress =
+            java.net.InetAddress.getByName(properties.getProperty("mcastAddress"));
+
+        impl = new McastServiceImpl(localMember,msgFrequency,
+                                    memberDropTime,
+                                    mcastPort,
+                                    bind,
+                                    mcastAddress,
+                                    ttl,
+                                    soTimeout,
+                                    this);
+
+        impl.start(level);
+        long memberwait = msgFrequency*4;
+        if(log.isInfoEnabled())
+            log.info("Sleeping for "+memberwait+" milliseconds to establish cluster membership");
+        Thread.sleep(memberwait);
+
+    }
+
+    /**
+     * Reads an optional integer property.
+     * @param key the property name
+     * @return the parsed value, or -1 if the property is not set or is not a
+     *         valid integer (the latter is logged as an error)
+     */
+    private int parseOptionalInt(String key) {
+        String value = properties.getProperty(key);
+        if ( value == null ) return -1;
+        try {
+            return Integer.parseInt(value);
+        } catch ( NumberFormatException x ) {
+            log.error("Unable to parse "+key+"="+value,x);
+            return -1;
+        }
+    }
+
+
+    /**
+     * Stop broadcasting and listening to membership pings.
+     * A failure to stop is logged; the implementation handle is cleared
+     * in either case.
+     */
+    public void stop() {
+        try  {
+            if ( impl != null) impl.stop();
+        } catch ( Exception x)  {
+            log.error("Unable to stop the mcast service.",x);
+        }
+        impl = null;
+    }
+
+    /**
+     * register mbean descriptor for package mcast
+     * @throws Exception
+     */
+    protected void initMBeans() throws Exception {
+      if(registry == null) {
+        registry = Registry.getRegistry(null, null);
+        registry.loadMetadata(this.getClass().getResourceAsStream(
+            "mbeans-descriptors.xml"));
+      }
+    }
+
+
+    /**
+     * Return all the members by name.
+     * @return the string form of every current member; an empty array if
+     *         the service is not started
+     */
+    public String[] getMembersByName() {
+        Member[] currentMembers = getMembers();
+        if ( currentMembers == null ) return new String[0];
+        String[] membernames = new String[currentMembers.length];
+        for (int i = 0; i < currentMembers.length; i++) {
+            membernames[i] = currentMembers[i].toString() ;
+        }
+        return membernames ;
+    }
+
+    /**
+     * Return the member by name
+     * @param name the string form of the member to look for
+     * @return the matching member, or null if there is none, if name is null,
+     *         or if the service is not started
+     */
+    public Member findMemberByName(String name) {
+        Member[] currentMembers = getMembers();
+        // getMembers() returns null while the service is not started
+        if ( currentMembers == null || name == null ) return null;
+        for (int i = 0; i < currentMembers.length; i++) {
+            if (name.equals(currentMembers[i].toString()))
+                return currentMembers[i];
+        }
+        return null;
+    }
+
+    /**
+     * has members?
+     * @return false if the service is not started
+     */
+    public boolean hasMembers() {
+       McastServiceImpl current = impl;
+       if ( current == null || current.membership == null ) return false;
+       return current.membership.hasMembers();
+    }
+
+    /**
+     * Return all the members
+     * @return the current members, or null if the service is not started
+     */
+    public Member[] getMembers() {
+        McastServiceImpl current = impl;
+        if ( current == null || current.membership == null ) return null;
+        return current.membership.getMembers();
+    }
+    /**
+     * Add a membership listener, this version only supports one listener per service,
+     * so calling this method twice will result in only the second listener being active.
+     * @param listener The listener
+     */
+    public void setMembershipListener(MembershipListener listener) {
+        this.listener = listener;
+    }
+    /**
+     * Remove the membership listener
+     */
+    public void removeMembershipListener(){
+        listener = null;
+    }
+
+    /**
+     * Forwards a member-added notification to the registered listener, if any.
+     * @param member The member
+     */
+    public void memberAdded(Member member) {
+        if ( listener!=null ) listener.memberAdded(member);
+    }
+
+    /**
+     * Forwards a member-disappeared notification to the registered listener, if any.
+     * @param member The member
+     */
+    public void memberDisappeared(Member member)
+    {
+        if ( listener!=null ) listener.memberDisappeared(member);
+    }
+
+    /**
+     * @return the value last passed to setMcastSoTimeout (0 if never called);
+     *         a value supplied only through setProperties is not reflected here
+     */
+    public int getMcastSoTimeout() {
+        return mcastSoTimeout;
+    }
+    public void setMcastSoTimeout(int mcastSoTimeout) {
+        this.mcastSoTimeout = mcastSoTimeout;
+        properties.setProperty("mcastSoTimeout", String.valueOf(mcastSoTimeout));
+    }
+    /**
+     * @return the value last passed to setMcastTTL (0 if never called);
+     *         a value supplied only through setProperties is not reflected here
+     */
+    public int getMcastTTL() {
+        return mcastTTL;
+    }
+    public void setMcastTTL(int mcastTTL) {
+        this.mcastTTL = mcastTTL;
+        properties.setProperty("mcastTTL", String.valueOf(mcastTTL));
+    }
+
+    /**
+     * Simple test program
+     * @param args Command-line arguments: hostname and tcp port
+     * @throws Exception If an error occurs
+     */
+    public static void main(String args[]) throws Exception {
+        if(log.isInfoEnabled())
+            log.info("Usage McastService hostname tcpport");
+        if ( args == null || args.length < 2 ) {
+            // both arguments are read below; report instead of failing on the index
+            log.error("Usage McastService hostname tcpport");
+            return;
+        }
+        McastService service = new McastService();
+        java.util.Properties p = new java.util.Properties();
+        p.setProperty("mcastPort","5555");
+        p.setProperty("mcastAddress","224.10.10.10");
+        p.setProperty("mcastClusterDomain","catalina");
+        // NOTE: start reads "mcastBindAddress", so this entry is not used by it
+        p.setProperty("bindAddress","localhost");
+        p.setProperty("memberDropTime","3000");
+        p.setProperty("msgFrequency","500");
+        p.setProperty("tcpListenPort",args[1]);
+        p.setProperty("tcpListenHost",args[0]);
+        service.setProperties(p);
+        service.start();
+        Thread.sleep(60*1000*60);
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message