tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fha...@apache.org
Subject svn commit: r379904 - in /tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster: ClusterReceiver.java group/ChannelCoordinator.java group/ChannelInterceptorBase.java group/GroupChannel.java tcp/ReplicationListener.java
Date Wed, 22 Feb 2006 21:16:31 GMT
Author: fhanik
Date: Wed Feb 22 13:16:25 2006
New Revision: 379904

URL: http://svn.apache.org/viewcvs?rev=379904&view=rev
Log:
Completed a first version of the independent GroupChannel. Still need to remove all the JMX
stuff from the core components; JMX monitoring should be done using outside beans, not baked
into the code.

Modified:
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/ClusterReceiver.java Wed Feb 22 13:16:25 2006
@@ -59,5 +59,8 @@
      * @return The port
      */
     public int getPort();
+    
+    public void setMessageListener(MessageListener listener);
+    public MessageListener getMessageListener();
 
 }

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelCoordinator.java Wed Feb 22 13:16:25 2006
@@ -22,6 +22,7 @@
 import org.apache.catalina.cluster.ClusterSender;
 import org.apache.catalina.cluster.ClusterReceiver;
 import org.apache.catalina.cluster.ClusterChannel;
+import java.io.IOException;
 
 
 /**
@@ -36,8 +37,19 @@
     private ClusterSender clusterSender;
     private MembershipService membershipService;
 
+    public ChannelCoordinator() {
+        
+    }
+    
+    public ChannelCoordinator(ClusterReceiver receiver,
+                              ClusterSender sender,
+                              MembershipService service) {
+        this();
+        this.setClusterReceiver(receiver);
+        this.setClusterSender(sender);
+        this.setMembershipService(service);
+    }
     
-
     /**
      * Send a message to one or more members in the cluster
      * @param destination Member[] - the destinations, null or zero length means all
@@ -45,9 +57,12 @@
      * @param options int - sender options, see class documentation
      * @return ClusterMessage[] - the replies from the members, if any.
      */
-    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) {
-        throw new UnsupportedOperationException();
-        //implement sending and receiving logic.
+    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) throws IOException {
+        if ( destination == null ) destination = membershipService.getMembers();
+        for ( int i=0; i<destination.length; i++ ) {
+            clusterSender.sendMessage(msg,destination[i]);
+        }
+        return null;
     }
 
 
@@ -97,6 +112,17 @@
         }
 
     }
+    
+    public void memberAdded(Member member){
+        if ( clusterSender!=null ) clusterSender.add(member);
+        super.memberAdded(member);
+    }
+    
+    public void memberDisappeared(Member member){
+        if ( clusterSender!=null ) clusterSender.remove(member);
+        super.memberDisappeared(member);
+    }
+
 
     public ClusterReceiver getClusterReceiver() {
         return clusterReceiver;
@@ -111,7 +137,13 @@
     }
 
     public void setClusterReceiver(ClusterReceiver clusterReceiver) {
-        this.clusterReceiver = clusterReceiver;
+        if ( clusterReceiver != null ) {
+            this.clusterReceiver = clusterReceiver;
+            this.clusterReceiver.setMessageListener(this);
+        } else {
+            if  (this.clusterReceiver!=null ) this.clusterReceiver.setMessageListener(null);
+            this.clusterReceiver = null;
+        }
     }
 
     public void setClusterSender(ClusterSender clusterSender) {

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/ChannelInterceptorBase.java Wed Feb 22 13:16:25 2006
@@ -19,6 +19,7 @@
 import org.apache.catalina.cluster.Member;
 import org.apache.catalina.cluster.MembershipListener;
 import org.apache.catalina.cluster.MessageListener;
+import java.io.IOException;
 
 /**
  * Abstract class for the interceptor base class.
@@ -51,7 +52,7 @@
         return previous;
     }
 
-    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) {
+    public ClusterMessage[] sendMessage(Member[] destination, ClusterMessage msg, int options) throws IOException {
         return getNext().sendMessage(destination, msg,options);
     }
     

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/group/GroupChannel.java Wed Feb 22 13:16:25 2006
@@ -71,7 +71,17 @@
      * @return ClusterMessage[] - the replies from the members, if any.
      */
     public ClusterMessage[] send(Member[] destination, ClusterMessage msg, int options) throws ChannelException {
-        throw new UnsupportedOperationException("Method send not yet implemented.");
+        if ( msg == null ) return null;
+        msg.setAddress(getMembershipService().getLocalMember());
+        msg.setCompress(msg.FLAG_ALLOWED);
+        msg.setTimestamp(System.currentTimeMillis());
+        msg.setResend(msg.FLAG_FORBIDDEN);
+        try {
+            if (interceptors != null)return interceptors.sendMessage(destination, msg, options);
+            else return this.coordinator.sendMessage(destination, msg, options);
+        }catch ( Exception x ) {
+            throw new ChannelException(x);
+        }
     }
     
     /**

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java?rev=379904&r1=379903&r2=379904&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationListener.java Wed Feb 22 13:16:25 2006
@@ -16,6 +16,7 @@
 
 package org.apache.catalina.cluster.tcp;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.channels.SelectableChannel;
@@ -25,13 +26,14 @@
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 
-import org.apache.catalina.cluster.io.ObjectReader;
-import org.apache.catalina.cluster.io.ListenCallback;
-import org.apache.catalina.cluster.ClusterReceiver;
-import org.apache.catalina.util.StringManager;
-import java.io.IOException;
 import org.apache.catalina.cluster.ClusterMessage;
+import org.apache.catalina.cluster.ClusterReceiver;
+import org.apache.catalina.cluster.group.ChannelInterceptorBase;
+import org.apache.catalina.cluster.io.ListenCallback;
+import org.apache.catalina.cluster.io.ObjectReader;
 import org.apache.catalina.cluster.io.XByteBuffer;
+import org.apache.catalina.util.StringManager;
+import org.apache.catalina.cluster.MessageListener;
 
 /**
  * @author Filip Hanik
@@ -70,7 +72,7 @@
 
 
     private Object interestOpsMutex = new Object();
-
+    private MessageListener listener = null;
     public ReplicationListener() {
     }
 
@@ -309,7 +311,20 @@
     }
 
     public void messageDataReceived(ClusterData data) {
-        //nothing to do yet
+        if ( this.listener != null ) {
+            try {
+                ClusterMessage msg = deserialize(data);
+                listener.messageReceived(msg);
+            }catch ( java.io.IOException x ) {
+                if ( log.isErrorEnabled() ) {
+                    log.error("Unable to receive and deserialize cluster data. IOException.",x);
+                }
+            }catch ( java.lang.ClassNotFoundException cx ) {
+                if ( log.isErrorEnabled() ) {
+                    log.error("Unable to receive and deserialize cluster data. ClassNotFoundException.",cx);
+                }
+            }
+        }
     }
 
     /**
@@ -385,8 +400,16 @@
         return tcpListenPort;
     }
 
+    public MessageListener getMessageListener() {
+        return listener;
+    }
+
     public void setTcpListenPort(int tcpListenPort) {
         this.tcpListenPort = tcpListenPort;
+    }
+
+    public void setMessageListener(MessageListener listener) {
+        this.listener = listener;
     }
 
     public String getHost() {



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message