Author: fhanik
Date: Tue Feb 28 10:47:46 2006
New Revision: 381740
URL: http://svn.apache.org/viewcvs?rev=381740&view=rev
Log:
Completed the order protocol
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381740&r1=381739&r2=381740&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Tue Feb 28 10:47:46 2006
@@ -52,11 +52,25 @@
int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
msg.getMessage().trim(4);
MessageOrder order = new MessageOrder(msgnr,msg);
- processIncoming(order);
+ if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
//getPrevious().messageReceived(msg);
}
- public synchronized void processIncoming(MessageOrder order) {
-int val = order.getMsgNr();
+
+ public synchronized void processLeftOvers(Member member, boolean force) {
+ MessageOrder tmp = (MessageOrder)incoming.get(member);
+ if ( force ) {
+ Counter cnt = getInCounter(member);
+ cnt.setCounter(Integer.MAX_VALUE);
+ }
+ if ( tmp!= null ) processIncoming(tmp);
+ }
+ /**
+ *
+ * @param order MessageOrder
+ * @return boolean - true if a message expired and was processed
+ */
+ public synchronized boolean processIncoming(MessageOrder order) {
+ boolean result = false;
Member member = order.getMessage().getAddress();
Counter cnt = getInCounter(member);
@@ -86,10 +100,12 @@
if ( tmp.isExpired(expire) ) {
//reset the head
if ( tmp == head ) head = tmp.next;
+ cnt.setCounter(tmp.getMsgNr()+1);
if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
tmp.setMessage(null);
tmp = tmp.next;
- if ( prev != null ) prev.next = tmp;
+ if ( prev != null ) prev.next = tmp;
+ result = true;
} else {
prev = tmp;
tmp = tmp.next;
@@ -97,6 +113,7 @@
}
if ( head == null ) incoming.remove(member);
else incoming.put(member, head);
+ return result;
}
public void memberAdded(Member member) {
@@ -110,6 +127,8 @@
//notify upwards
outcounter.remove(member);
incounter.remove(member);
+ //clear the remaining queue
+ processLeftOvers(member,true);
super.memberDisappeared(member);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org
|