ws-sandesha-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject svn commit: r345658 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: Constants.java msgprocessors/ApplicationMsgProcessor.java storage/inmemory/InMemoryRetransmitterBeanMgr.java util/MessageRetransmissionAdjuster.java workers/Sender.java
Date Sat, 19 Nov 2005 17:37:17 GMT
Author: chamikara
Date: Sat Nov 19 09:36:36 2005
New Revision: 345658

URL: http://svn.apache.org/viewcvs?rev=345658&view=rev
Log:
Async Acks were made to be sent as standalone only after waiting for an given wsp:acknowledgementInterval.
Unwanted ack entries will be deleted (for e.g. ack 1-2 (yet to be send) will be deleted when
adding ack 1-3)

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Sat Nov 19 09:36:36
2005
@@ -118,8 +118,8 @@
 	}
 
 	public interface WSP {
-		long RETRANSMISSION_INTERVAL = 1000;
-		long ACKNOWLEDGEMENT_INTERVAL = 3000;
+		long RETRANSMISSION_INTERVAL = 20000;
+		long ACKNOWLEDGEMENT_INTERVAL = 4000;
 		boolean EXPONENTION_BACKOFF = true;
 		long INACTIVITY_TIMEOUT_INTERVAL = 5000000;
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Sat Nov 19 09:36:36 2005
@@ -18,6 +18,9 @@
 package org.apache.sandesha2.msgprocessors;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
 import javax.xml.namespace.QName;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
@@ -34,9 +37,11 @@
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.soap.SOAPEnvelope;
 import org.apache.axis2.soap.SOAPFactory;
+import org.apache.derby.tools.sysinfo;
 import org.apache.sandesha2.Constants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
@@ -55,6 +60,8 @@
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.wsdl.WSDLConstants;
 
+import com.sun.rsasign.p;
+
 public class ApplicationMsgProcessor implements MsgProcessor {
 
 	private boolean letInvoke = false;
@@ -302,6 +309,8 @@
 		RMMsgContext ackRMMsgCtx = SandeshaUtil.deepCopy(rmMsgCtx);
 		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
 
+		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+		
 		ackMsgCtx.setAxisServiceGroup(serviceGroup);
 		ackMsgCtx.setServiceGroupContext(serviceGroupContext);
 		ackMsgCtx.setAxisService(service);
@@ -389,7 +398,35 @@
 			ackBean.setReSend(false);
 			ackBean.setSend(true);
 			ackBean.setMessagetype(Constants.MessageTypes.ACK);
-
+			
+			//the tempSequenceId value of the retransmitter Table for the messages related to an incoming
+			//sequence is the actual sequence ID - TODO document this.
+			ackBean.setTempSequenceId(sequenceId);
+			
+			RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx.getProperty(Constants.WSP.RM_POLICY_BEAN);
+			long ackInterval = Constants.WSP.ACKNOWLEDGEMENT_INTERVAL;
+			if (policyBean!=null) {
+				ackInterval = policyBean.getAcknowledgementInaterval();
+			}
+			
+			//Ack will be sent as stand alone, only after the retransmitter interval.
+			long timeToSend = System.currentTimeMillis()+ackInterval;
+			ackBean.setTimeToSend(timeToSend);
+			
+			
+			//removing old acks.
+			RetransmitterBean findBean = new RetransmitterBean ();
+			findBean.setMessagetype(Constants.MessageTypes.ACK);
+			findBean.setTempSequenceId(sequenceId);
+			Collection coll = retransmitterBeanMgr.find(findBean);
+			Iterator it = coll.iterator();
+			while (it.hasNext()) {
+				RetransmitterBean retransmitterBean = (RetransmitterBean) it.next();
+				retransmitterBeanMgr.delete(retransmitterBean.getMessageId());
+			}
+			
+			
+			//inserting the new ack.
 			retransmitterBeanMgr.insert(ackBean);
 
 			SandeshaUtil.startSenderIfStopped(configCtx);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
Sat Nov 19 09:36:36 2005
@@ -115,13 +115,9 @@
 			temp = (RetransmitterBean) iterator.next();
 			if (temp.isSend()) {
 				
-				long timeToSend = temp.getTimeToSend();
-				
-				int count = temp.getSentCount();
-				
+				long timeToSend = temp.getTimeToSend();	
 				long timeNow = System.currentTimeMillis();
-				if (count == 0
-						|| (timeNow >= timeToSend)) {
+				if ((timeNow >= timeToSend)) {
 					beans.add(temp);
 				}
 			}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Sat Nov 19 09:36:36 2005
@@ -20,6 +20,7 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.derby.iapi.sql.dictionary.ConsInfo;
 import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.SandeshaDynamicProperties;
 import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.beans.RetransmitterBean;
 
@@ -43,11 +44,9 @@
 		
 		RMPolicyBean policyBean = (RMPolicyBean) messageContext.getProperty(Constants.WSP.RM_POLICY_BEAN);
 		if (policyBean==null){
-			return retransmitterBean;
+			policyBean = new SandeshaDynamicProperties().getPolicyBean();
 		}
 		
-		long oldRetransmissionTime = retransmitterBean.getTimeToSend();
-		
 		retransmitterBean.setSentCount(retransmitterBean.getSentCount()+1);
 		adjustNextRetransmissionTime (retransmitterBean,policyBean);
 		
@@ -65,12 +64,13 @@
 		
 		long baseInterval = policyBean.getRetransmissionInterval();
 		
-		long timeToSendNext;
+		long newInterval = baseInterval;
 		if (policyBean.isExponentialBackoff()) {
-			long newInterval = generateNextExponentialBackedoffDifference (count,baseInterval);
-			retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+			newInterval = generateNextExponentialBackedoffDifference (count,baseInterval);
 		}
 		
+		retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+		
 		return retransmitterBean;
 	}
 	
@@ -82,7 +82,7 @@
 	//TODO: Have to change this to be plugable
 	private long generateNextExponentialBackedoffDifference(int count,long initialInterval)
{
 		long interval = initialInterval;
-		for (int i=1;i<=count;i++){
+		for (int i=1;i<count;i++){
 			interval = interval*2;
 		}
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sat Nov 19 09:36:36
2005
@@ -87,9 +87,18 @@
 									+ "' message.");
 						}
 						
-						new AxisEngine(context).send(msgCtx);
+						try {
+							new AxisEngine(context).send(msgCtx);
+						}catch (Exception e) {
+							//Exception is sending. retry later
+							System.out.println("Exception thrown in sending...");
+							e.printStackTrace();
+						}
+						
 						MessageRetransmissionAdjuster  retransmitterAdjuster = new MessageRetransmissionAdjuster
();
 						retransmitterAdjuster.adjustRetransmittion(bean);
+						
+						mgr.update(bean);
 						
 						if (!msgCtx.isServerSide())
 							checkForSyncResponses(msgCtx);



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Mime
View raw message