Author: azeez
Date: Wed Jul 2 08:13:52 2008
New Revision: 673414
URL: http://svn.apache.org/viewvc?rev=673414&view=rev
Log:
Handling JOIN of a WKA member who also happens to be a load balancer
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
Wed Jul 2 08:13:52 2008
@@ -47,7 +47,6 @@
}
public void execute(ConfigurationContext configurationContext) throws ClusteringFault
{
- log.info("Received MEMBER_LIST message");
if(log.isDebugEnabled()){
log.debug("MembershipManager#domain: " + new String(membershipManager.getDomain()));
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
Wed Jul 2 08:13:52 2008
@@ -75,11 +75,6 @@
log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(sender));
MemberJoinedCommand command = (MemberJoinedCommand) msg;
- // do something specific for the membership scheme
- if (sender.equals(command.getMember())) { // only if the sender is the member
who joine, we need to do some special processing
- membershipScheme.processJoin(sender); //TODO: This may not be necessary
- }
-
try {
command.setMembershipManager(membershipManager);
command.execute(null);
@@ -90,13 +85,16 @@
}
} else if (msg instanceof MemberListCommand) {
try { //TODO: What if we receive more than one member list
message?
+ log.info("Received MEMBER_LIST message from " + TribesUtil.getName(sender));
MemberListCommand command = (MemberListCommand) msg;
command.setMembershipManager(membershipManager);
command.execute(null);
+ return "Processed MEMBER_LIST message";
//TODO Send MEMBER_JOINED messages to all nodes
} catch (ClusteringFault e) {
- String errMsg = "Cannot handle MEMBER_LIST message";
+ String errMsg = "Cannot handle MEMBER_LIST message from " +
+ TribesUtil.getName(sender);
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
Wed Jul 2 08:13:52 2008
@@ -27,6 +27,7 @@
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.membership.MemberImpl;
@@ -35,6 +36,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -166,8 +168,27 @@
List<Member> members = new ArrayList<Member>(this.members);
members.add(localMember); // Need to set the local member too
memListCmd.setMembers(members.toArray(new Member[members.size()]));
- rpcMembershipChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+
+ Response[] responses =
+ rpcMembershipChannel.send(new Member[]{member}, memListCmd,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION,
10000);
+
+ // Once a response is received from the WKA member to the MEMBER_LIST
message,
+ // if it does not belong to this domain, simply remove it from the members
+ if(responses != null && responses.length > 0 && responses[0]
!= null){
+ Member source = responses[0].getSource();
+ if(!Arrays.equals(source.getDomain(), member.getDomain())){
+ if(log.isDebugEnabled()){
+ log.debug("Member " + TribesUtil.getName(source) +
+ " does not belong to local domain " + new String(domain)+
+ ". Hence removing it from the list.");
+ }
+ members.remove(member);
+ return false;
+ }
+ }
} catch (Exception e) {
String errMsg = "Could not send MEMBER_LIST to well-known member " +
TribesUtil.getName(member);
@@ -196,7 +217,8 @@
List<Member> members = new ArrayList<Member>(this.members);
memListCmd.setMembers(members.toArray(new Member[members.size()]));
rpcMembershipChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
if (log.isDebugEnabled()) {
log.debug("Sent MEMBER_LIST to " + TribesUtil.getName(member));
}
@@ -222,7 +244,9 @@
if (membersToSend.size() > 0) {
rpcMembershipChannel.send(membersToSend.toArray(new Member[membersToSend.size()]),
cmd,
- RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION,
10000);
if (log.isDebugEnabled()) {
log.debug("Sent MEMBER_JOINED[" + TribesUtil.getName(member) +
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
Wed Jul 2 08:13:52 2008
@@ -67,10 +67,10 @@
}
public static byte[] getRpcMembershipChannelId(byte[] domain) {
- return (new String(domain) + TribesConstants.RPC_MEMBERSHIP_CHANNEL).getBytes();
+ return (new String(domain) + ":" + TribesConstants.RPC_MEMBERSHIP_CHANNEL).getBytes();
}
public static byte[] getRpcInitChannelId(byte[] domain) {
- return (new String(domain) + TribesConstants.RPC_INIT_CHANNEL).getBytes();
+ return (new String(domain) + ":" + TribesConstants.RPC_INIT_CHANNEL).getBytes();
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=673414&r1=673413&r2=673414&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
(original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Wed Jul 2 08:13:52 2008
@@ -358,7 +358,7 @@
this));
appDomainMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
if (log.isDebugEnabled()) {
- log.debug("Created RPC Channel for application domain " + domain);
+ log.debug("Created RPC Membership Channel for application domain " + domain);
}
}
@@ -367,6 +367,9 @@
new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
channel, new RpcMembershipRequestHandler(primaryMembershipManager,
this));
+ if(log.isDebugEnabled()){
+ log.debug("Created primary membership channel " + new String(localDomain));
+ }
primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
// Send JOIN message to a WKA member
|