ws-axis-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From az...@apache.org
Subject svn commit: r673414 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/wka/ tribes/
Date Wed, 02 Jul 2008 15:13:53 GMT
Author: azeez
Date: Wed Jul  2 08:13:52 2008
New Revision: 673414

URL: http://svn.apache.org/viewvc?rev=673414&view=rev
Log:
Handling JOIN of a WKA memeber who also happens to be a load balancer


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
Wed Jul  2 08:13:52 2008
@@ -47,7 +47,6 @@
     }
 
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault
{
-        log.info("Received MEMBER_LIST message");
         if(log.isDebugEnabled()){
             log.debug("MembershipManager#domain: " + new String(membershipManager.getDomain()));
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
Wed Jul  2 08:13:52 2008
@@ -75,11 +75,6 @@
             log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(sender));
             MemberJoinedCommand command = (MemberJoinedCommand) msg;
 
-            // do something specific for the membership scheme
-            if (sender.equals(command.getMember())) { // only if the sender is the member
who joine, we need to do some special processing
-                membershipScheme.processJoin(sender);  //TODO: This may not be necessary
-            }
-
             try {
                 command.setMembershipManager(membershipManager);
                 command.execute(null);
@@ -90,13 +85,16 @@
             }
         } else if (msg instanceof MemberListCommand) {
             try {                    //TODO: What if we receive more than one member list
message?
+                log.info("Received MEMBER_LIST message from " + TribesUtil.getName(sender));
                 MemberListCommand command = (MemberListCommand) msg;
                 command.setMembershipManager(membershipManager);
                 command.execute(null);
 
+                return "Processed MEMBER_LIST message";
                 //TODO Send MEMBER_JOINED messages to all nodes
             } catch (ClusteringFault e) {
-                String errMsg = "Cannot handle MEMBER_LIST message";
+                String errMsg = "Cannot handle MEMBER_LIST message from " +
+                                TribesUtil.getName(sender);
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
Wed Jul  2 08:13:52 2008
@@ -27,6 +27,7 @@
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.Response;
 import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.catalina.tribes.membership.MemberImpl;
@@ -35,6 +36,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -166,8 +168,27 @@
                     List<Member> members = new ArrayList<Member>(this.members);
                     members.add(localMember); // Need to set the local member too
                     memListCmd.setMembers(members.toArray(new Member[members.size()]));
-                    rpcMembershipChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
-                                              Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+
+                    Response[] responses =
+                            rpcMembershipChannel.send(new Member[]{member}, memListCmd,
+                                                      RpcChannel.ALL_REPLY,
+                                                      Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                                      TribesConstants.MEMBERSHIP_MSG_OPTION,
10000);
+
+                    // Once a response is received from the WKA member to the MEMBER_LIST
message,
+                    // if it does not belong to this domain, simply remove it from the members
+                    if(responses != null && responses.length > 0 && responses[0]
!= null){
+                        Member source = responses[0].getSource();
+                        if(!Arrays.equals(source.getDomain(), member.getDomain())){
+                            if(log.isDebugEnabled()){
+                                log.debug("Member " + TribesUtil.getName(source) +
+                                          " does not belong to local domain " + new String(domain)+
+                                          ". Hence removing it from the list.");
+                            }
+                            members.remove(member);
+                            return false;
+                        }
+                    }
                 } catch (Exception e) {
                     String errMsg = "Could not send MEMBER_LIST to well-known member " +
                                     TribesUtil.getName(member);
@@ -196,7 +217,8 @@
             List<Member> members = new ArrayList<Member>(this.members);
             memListCmd.setMembers(members.toArray(new Member[members.size()]));
             rpcMembershipChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
-                                      Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+                                      Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                      TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
             if (log.isDebugEnabled()) {
                 log.debug("Sent MEMBER_LIST to " + TribesUtil.getName(member));
             }
@@ -222,7 +244,9 @@
 
             if (membersToSend.size() > 0) {
                 rpcMembershipChannel.send(membersToSend.toArray(new Member[membersToSend.size()]),
cmd,
-                                          RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                          RpcChannel.ALL_REPLY,
+                                          Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                          TribesConstants.MEMBERSHIP_MSG_OPTION,
                                           10000);
                 if (log.isDebugEnabled()) {
                     log.debug("Sent MEMBER_JOINED[" + TribesUtil.getName(member) +

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
Wed Jul  2 08:13:52 2008
@@ -67,10 +67,10 @@
     }
 
     public static byte[] getRpcMembershipChannelId(byte[] domain) {
-       return (new String(domain) + TribesConstants.RPC_MEMBERSHIP_CHANNEL).getBytes();
+       return (new String(domain) + ":" + TribesConstants.RPC_MEMBERSHIP_CHANNEL).getBytes();
     }
 
     public static byte[] getRpcInitChannelId(byte[] domain) {
-        return (new String(domain) + TribesConstants.RPC_INIT_CHANNEL).getBytes();
+        return (new String(domain) + ":" + TribesConstants.RPC_INIT_CHANNEL).getBytes();
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Wed Jul  2 08:13:52 2008
@@ -358,7 +358,7 @@
                                                                    this));
             appDomainMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
             if (log.isDebugEnabled()) {
-                log.debug("Created RPC Channel for application domain " + domain);
+                log.debug("Created RPC Membership Channel for application domain " + domain);
             }
         }
 
@@ -367,6 +367,9 @@
                 new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
                                channel, new RpcMembershipRequestHandler(primaryMembershipManager,
                                                                         this));
+        if(log.isDebugEnabled()){
+            log.debug("Created primary membership channel " + new String(localDomain));
+        }
         primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
 
         // Send JOIN message to a WKA member



Mime
View raw message