Subject: svn commit: r408823 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: group/AbsoluteOrder.java group/interceptors/NonBlockingCoordinator.java membership/Membership.java util/Arrays.java
Date: Tue, 23 May 2006 03:41:04 -0000
From: fhanik@apache.org
To: tomcat-dev@jakarta.apache.org

Author: fhanik
Date: Mon May 22 20:41:04 2006
New Revision: 408823

URL: http://svn.apache.org/viewvc?rev=408823&view=rev
Log:
Defined the algorithm for leadership election, need to create a state diagram so that the implementation doesn't become a clutter

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java?rev=408823&r1=408822&r2=408823&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/AbsoluteOrder.java Mon May 22 20:41:04 2006
@@ -27,7 +27,7 @@
  * @see org.apache.catalina.tribes.Member
  */
 public class AbsoluteOrder {
-    protected static AbsoluteComparator comp = new AbsoluteComparator();
+    public static final AbsoluteComparator comp = new AbsoluteComparator();
 
     protected AbsoluteOrder() {
         super();

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=408823&r1=408822&r2=408823&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Mon May 22 20:41:04 2006
@@ -26,6 +26,8 @@
 import org.apache.catalina.tribes.util.UUIDGenerator;
 import org.apache.catalina.tribes.group.AbsoluteOrder;
 import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.Channel;
 
 /**
  * Title: NonBlockingCoordinator
@@ -33,39 +35,215 @@
  * Description: Implementation of a simple coordinator algorithm.
  * This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
  *
- * Implementation based on ideas fetched from Hans Svensson
+ * This implementation is based on a home-brewed algorithm that uses the AbsoluteOrder of a membership
+ * to pass a token ring of the current membership.
+ * This is not the same as just using AbsoluteOrder! Consider the following scenario:
+ * Nodes A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
+ * nodes are receiving pings from all the other nodes,
+ * meaning that node{i} receives pings from node{all}-node{i},
+ * but the following could happen if a multicast problem occurs:
+ * A has members {B,C,D}
+ * B has members {A,C}
+ * C has members {D,E}
+ * D has members {A,B,C,E}
+ * E has members {A,C,D}
+ * Because the default Tribes membership implementation relies on the multicast packets to
+ * arrive at all nodes correctly, there is nothing guaranteeing that they will.
+ *
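To make the failure mode concrete, here is a small illustrative Java sketch (not part of this commit) of the "eval phase" described next: each node picks the highest-priority member of its own view, itself included. It assumes that plain alphabetical order of the node names stands in for Tribes' AbsoluteOrder; the class and method names are hypothetical.

```java
import java.util.*;

public class EvalPhaseSketch {

    // Leader a node would pick from its own view: the highest-priority name among
    // itself and the members it can see. Assumption: alphabetical order = priority.
    static String localLeader(String self, Set<String> visible) {
        TreeSet<String> candidates = new TreeSet<>(visible);
        candidates.add(self);
        return candidates.first(); // "A" sorts before "B", so first() is the highest priority
    }

    // Runs localLeader for every node, given each node's (possibly asymmetric) view.
    static Map<String, String> evalPhase(Map<String, Set<String>> views) {
        Map<String, String> leaders = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : views.entrySet()) {
            leaders.put(e.getKey(), localLeader(e.getKey(), e.getValue()));
        }
        return leaders;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> views = new TreeMap<>();
        views.put("A", Set.of("B", "C", "D"));
        views.put("B", Set.of("A", "C"));
        views.put("C", Set.of("D", "E"));
        views.put("D", Set.of("A", "B", "C", "E"));
        views.put("E", Set.of("A", "C", "D"));
        System.out.println(evalPhase(views)); // {A=A, B=A, C=C, D=A, E=A}
    }
}
```

With the five views listed above, C elects itself while every other node elects A, which is why a single local AbsoluteOrder pass cannot produce agreement on its own.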
+ * To best explain how this algorithm works, let's take the above example.
+ * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
+ * where messages overlap, as they all depend on absolute order.
+ * Scenario 1: A,B,C,D,E all come online at the same time
+ * Eval phase: A thinks of itself as leader, B thinks of A as leader,
+ * C thinks of itself as leader, D and E think of A as leader
+ * Token phase:
+ * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B, where X is the id for the message (and the view)
+ * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D, where Y is the id for the message (and the view)
+ * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
+ * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E}; D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
+ * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
+ * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
+ * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
+ * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, adds E to its list of members
+ * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
+ * At this point, the state looks like:
+ * A - {A-ldr, mbrs-A,B,C,D,E, id=X}
+ * B - {A-ldr, mbrs-A,B,C,D, id=X}
+ * C - {A-ldr, mbrs-A,B,C,D,E, id=X}
+ * D - {A-ldr, mbrs-A,B,C,D,E, id=X}
+ * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}
+ *
+ * A message doesn't stop until it reaches its original sender, unless it's dropped by a higher leader.
+ * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have
+ * arrived at the same membership and all nodes are informed of each other.
+ * To synchronize the rest we simply perform the following check at A when A receives X:
+ * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
+ * Since the condition is false, A will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B.
+ * When A receives X again, the token is complete.
+ *
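The token round above, including the "Original == Arrived" check at the source, can be sketched in Java as follows. This is a simplified, hypothetical model rather than the Tribes code: the token visits a caller-supplied list of hops instead of computing its own route, views are plain sets of node names, and alphabetical order stands in for AbsoluteOrder.

```java
import java.util.*;

public class TokenRingSketch {

    // X{ldr, src, mbrs} from the javadoc; the id is carried along only for readability.
    static final class Token {
        final String leader, source, id;
        final SortedSet<String> members;
        Token(String leader, String source, SortedSet<String> members, String id) {
            this.leader = leader; this.source = source; this.members = members; this.id = id;
        }
    }

    // One trip: each hop adds itself and every member it can see, then forwards.
    static Token circulate(Token sent, List<String> hops, Map<String, Set<String>> views) {
        SortedSet<String> mbrs = new TreeSet<>(sent.members);
        for (String node : hops) {
            mbrs.add(node);
            mbrs.addAll(views.getOrDefault(node, Set.of()));
        }
        // Highest priority = alphabetically first name (stand-in for AbsoluteOrder).
        return new Token(mbrs.first(), sent.source, mbrs, sent.id);
    }

    // Number of trips until the token returns to its source with an unchanged membership.
    static int tripsUntilStable(Token start, List<String> hops, Map<String, Set<String>> views) {
        Token sent = start;
        for (int trips = 1; ; trips++) {
            Token arrived = circulate(sent, hops, views);
            if (arrived.members.equals(sent.members)) return trips; // Original == Arrived: done
            sent = arrived;                                          // otherwise resend the larger token
        }
    }

    public static void main(String[] args) {
        Map<String, Set<String>> views = Map.of(
            "B", Set.of("A", "C"),
            "C", Set.of("D", "E"),
            "D", Set.of("A", "B", "C", "E"));
        Token x = new Token("A", "A", new TreeSet<>(List.of("A", "B", "C", "D")), "X");
        // X travels A -> B -> C -> D -> A as in the trace; C adds E on the first trip.
        Token first = circulate(x, List.of("B", "C", "D"), views);
        System.out.println(first.members);                                      // [A, B, C, D, E]
        System.out.println(tripsUntilStable(x, List.of("B", "C", "D"), views)); // 2
    }
}
```

The second trip is the resend described above: the membership no longer changes, so the source treats the token as complete.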

+ *
+ * Let's assume that C1 arrives; C1 has lower priority than C, but higher priority than D.
+ * Let's also assume that C1 sees the following view: {B,D,E}
+ * C1 sends Z{B-ldr, C1-src, mbrs-B,C1,D,E} to D
+ * D receives Z{B-ldr, C1-src, mbrs-B,C1,D,E}, sends Z{A-ldr, D-src, mbrs-A,B,C,C1,D,E} to E
+ * Once the message reaches A, A will issue a new view and send a new message.
+ * A view is not accepted by a member unless ldr==src in the token.
+ *
+ *
+ * Let's assume that A0 arrives, A0 being higher than A.
+ * Let's also assume that A0 sees view {B,D,E}.
+ * A0 will issue a similar view statement and the same scenario as above will happen.
+ * If A0 sees {A,B,C,D} it simply sends the message to A rather than B.
+ *
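One plausible reading of the routing rule behind the A0 example is "pass the token to the next node below yourself in priority order among the nodes you can see, wrapping around to the top". The commit does not spell this rule out, so the sketch below is an assumption, with hypothetical names; it uses an explicit priority list because A0 must outrank A, which alphabetical order would not give.

```java
import java.util.*;

public class NextHopSketch {

    // Hypothetical priority ranking (highest first); A0 outranks A, C1 sits between C and D.
    // Every node name passed to nextHop is assumed to appear in this list.
    static final List<String> PRIORITY = List.of("A0", "A", "B", "C", "C1", "D", "E");

    // Orders names by their position in PRIORITY (smaller index = higher priority).
    static final Comparator<String> BY_PRIORITY = Comparator.comparingInt(PRIORITY::indexOf);

    // Next node ranked below self among the nodes self can see, wrapping around
    // to the highest-ranked visible node; null if self sees nobody else.
    static String nextHop(String self, Collection<String> visible) {
        TreeSet<String> ring = new TreeSet<>(BY_PRIORITY);
        ring.addAll(visible);
        ring.remove(self);
        if (ring.isEmpty()) return null;
        String next = ring.higher(self);            // first visible node ranked below self
        return next != null ? next : ring.first();  // wrap around to the top of the ring
    }

    public static void main(String[] args) {
        System.out.println(nextHop("A0", List.of("A", "B", "C", "D"))); // A
        System.out.println(nextHop("A0", List.of("B", "D", "E")));      // B
        System.out.println(nextHop("E", List.of("A", "C", "D")));       // A (wraps around)
    }
}
```

A sorted set with `higher()` keeps the rule to a single lookup; the wrap-around case covers the lowest-ranked node handing the token back to the top, as E does when it sends Y to A.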

+ *
+ * If we wanted to ensure that the view gets implemented at all nodes at the same time,
+ * i.e., implementing a blocking coordinator, we would simply require that each view, before it gets installed,
+ * has to receive a VIEW_CONF message.
+ *
  * Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships
  *
+ * The example above, of course, can be simplified with a finite state machine:
+ * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.
+ * Maybe I'll do a state diagram :)
+ *
+ *
  * @author Filip Hanik
  * @version 1.0
- * @todo
- *   when sending a HALT message, btw, only the highest in the membership group will do that
- *   allow for some time to pass, incase there is a higher member around
- *   preferrably, place a mcast interceptor below, so that we can mcast this sucker
+ *
+ *
+ *
  */
 public class NonBlockingCoordinator extends ChannelInterceptorBase {
-    protected Membership membership = null;
-
-    protected static byte[] NBC_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
-    protected static byte[] NBC_REQUEST = new byte[] {-55, -37, 18, -52, -105, 107, 72, 40, -122, 29, 70, -19, -74, 123, 61, 110};
-    protected static byte[] NBC_REPLY = new byte[] {6, -15, 14, 23, -96, 106, 78, 124, -94, -122, -85, 31, 88, 21, 126, 20};
-
-    protected static byte[] NBC_HALT = new byte[] {12, -28, 85, -97, -102, -35, 74, 9, -65, -78, -83, -84, -29, -70, -23, -15};
-    protected static byte[] NBC_ACK = new byte[] {12, -49, 117, -70, 77, 52, 65, -91, -93, -110, 37, 34, -28, -127, 26, 18};
-    protected static byte[] NBC_NORM = new byte[] {34, -110, 83, 118, -109, -55, 67, -27, -97, -94, -84, -72, -82, -114, 65, 81};
-    protected static byte[] NBC_NOTNORM = new byte[] {125, -70, -102, -125, -78, -39, 73, -80, -89, 84, 120, 83, 25, 42, 88, -76};
-    protected static byte[] NBC_LDR = new byte[] {97, 31, -23, 30, -42, -72, 72, 116, -97, 7, 112, 25, 82, -96, -87, -48};
-    protected static byte[] NBC_HASLDR = new byte[] {93, -80, -88, -58, -127, 21, 76, -90, -89, 77, 58, 25, -55, 65, -1, -83};
-    protected static byte[] NBC_ISLDR = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
+    /**
+     * header for a coordination message
+     */
+    protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
+    /**
+     * Coordination request
+     */
+    protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
+    /**
+     * Coordination confirmation, for blocking installations
+     */
+    protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
     protected Member coordinator = null;
+
+    protected Membership view = null;
+    protected Membership suggestedview = null;
+    protected UniqueId viewId;
+    protected UniqueId suggestedviewId;
+
+    protected boolean started = false;
+    protected final int startsvc = 0xFFFF;
+
+    protected Object electionMutex = new Object();
+    protected boolean runningElection = false;
 
     public NonBlockingCoordinator() {
         super();
     }
 
+    public void start(int svc) throws ChannelException {
+        try {
+            halt();
+            if ( started ) return;
+            super.start(startsvc);
+            started = true;
+
+        }finally {
+            release();
+        }
+        //coordination can happen before this line of code executes
+        Member local = getLocalMember(false);
+        if (local != null && coordinator == null) coordinator = local;
+    }
+
+    public void stop(int svc) throws ChannelException {
+        try {
+            halt();
+            if ( !started ) return;
+            super.stop(startsvc);
+            started = false;
+        }finally {
+            release();
+        }
+        this.coordinator = null;
+    }
+
+    public void elect() {
+        synchronized (electionMutex) {
+            try {
+                Member[] mbrs = super.getMembers();
+                //no members, exit
+                if ( mbrs.length == 0 ) return;
+                AbsoluteOrder.absoluteOrder(mbrs);
+                MemberImpl local = (MemberImpl)getLocalMember(false);
+                //I'm not the highest, exit
+                if ( !local.equals(mbrs[0]) ) return;
+                //I'm already running an election
+                if ( suggestedview.hasMembers() ) return;
+                //create a suggestedview
+                suggestedview.addMember((MemberImpl)local);
+                Arrays.fill(suggestedview,mbrs);
+                suggestedviewId = new UniqueId(UUIDGenerator.randomUUID(true));
+                CoordinationMessage msg = new CoordinationMessage(local,local,suggestedview.getMembers(),suggestedviewId);
+                for (int i=0; i
- * HEADER, REQUEST|REPLY, ID, MSG, SOURCE_LEN, SOURCE, PAYLOAD_LEN, PAYLOAD
- * @param type byte[] - either NBC_REQUEST or NBC_REPLY
- * @param msg byte[] - NBC_HALT, NBC_ACK, NBC_NORM, NBC_NOTNORM, NBC_LDR
- */
-    protected UniqueId createNBCMessage(XByteBuffer buf, byte[] type, byte[] msg, byte[] payload) {
-        UniqueId id = new UniqueId(UUIDGenerator.randomUUID(true));
-        Member local = getLocalMember(false);
-        byte[] ldata = ((MemberImpl)local).getData(false,false);
-        buf.reset();
-        buf.append(NBC_HEADER,0,NBC_HEADER.length);
-        buf.append(type,0,type.length);
-        buf.append(id.getBytes(),0,id.getBytes().length);
-        buf.append(msg,0,msg.length);
-        buf.append(ldata.length);
-        buf.append(ldata,0,ldata.length);
-        buf.append(payload.length);
-        buf.append(payload,0,payload.length);
-        return id;
-    }
-
-    protected void receiveNBC(XByteBuffer buf) {
+    public static class CoordinationMessage {
+        //X{A-ldr, A-src, mbrs-A,B,C,D}
+        protected XByteBuffer buf;
+        protected MemberImpl leader;
+        protected MemberImpl source;
+        protected MemberImpl[] view;
+        protected UniqueId id;
+
+        public CoordinationMessage(XByteBuffer buf) {
+            this.buf = buf;
+        }
+
+        public CoordinationMessage(MemberImpl leader,
+                                   MemberImpl source,
+                                   MemberImpl[] view,
+                                   UniqueId id) {
+            this.buf = new XByteBuffer(4096,false);
+            this.leader = leader;
+            this.source = source;
+            this.view = view;
+            this.id = id;
+            this.write();
+        }
+
+
+        public byte[] getHeader() {
+            return NonBlockingCoordinator.COORD_HEADER;
+        }
+
+        public MemberImpl getLeader() {
+            if ( leader == null ) parse();
+            return leader;
+        }
+
+        public MemberImpl getSource() {
+            if ( source == null ) parse();
+            return source;
+        }
+
+        public UniqueId getId() {
+            if ( id == null ) parse();
+            return id;
+        }
+
+        public MemberImpl[] getMembers() {
+            if ( view == null ) parse();
+            return view;
+        }
+
+        public XByteBuffer getBuffer() {
+            return this.buf;
+        }
+
+        public void parse() {
+            //header
+            int offset = 16;
+            //leader
+            int ldrLen = buf.toInt(buf.getBytesDirect(),offset);
+            offset += 4;
+            byte[] ldr = new byte[ldrLen];
+            System.arraycopy(buf.getBytesDirect(),offset,ldr,0,ldrLen);
+            leader = MemberImpl.getMember(ldr);
+            offset += ldrLen;
+            //source
+            int srcLen = buf.toInt(buf.getBytesDirect(),offset);
+            offset += 4;
+            byte[] src = new byte[srcLen];
+            System.arraycopy(buf.getBytesDirect(),offset,src,0,srcLen);
+            source = MemberImpl.getMember(src);
+            offset += srcLen;
+            //view
+            int mbrCount = buf.toInt(buf.getBytesDirect(),offset);
+            offset += 4;
+            view = new MemberImpl[mbrCount];
+            for (int i=0; i