Return-Path: Delivered-To: apmail-ws-sandesha-dev-archive@www.apache.org Received: (qmail 16131 invoked from network); 14 Nov 2005 09:37:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 14 Nov 2005 09:37:50 -0000 Received: (qmail 74228 invoked by uid 500); 14 Nov 2005 09:37:49 -0000 Delivered-To: apmail-ws-sandesha-dev-archive@ws.apache.org Received: (qmail 74168 invoked by uid 500); 14 Nov 2005 09:37:49 -0000 Mailing-List: contact sandesha-dev-help@ws.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list sandesha-dev@ws.apache.org Received: (qmail 74157 invoked by uid 500); 14 Nov 2005 09:37:49 -0000 Delivered-To: apmail-ws-sandesha-cvs@ws.apache.org Received: (qmail 74154 invoked by uid 99); 14 Nov 2005 09:37:48 -0000 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 14 Nov 2005 01:37:48 -0800 Received: (qmail 16032 invoked by uid 65534); 14 Nov 2005 09:37:27 -0000 Message-ID: <20051114093727.16031.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r344090 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ policy/ storage/beans/ storage/inmemory/ util/ workers/ Date: Mon, 14 Nov 2005 09:37:23 -0000 To: sandesha-cvs@ws.apache.org From: chamikara@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chamikara Date: Mon Nov 14 01:36:19 2005 New Revision: 344090 URL: http://svn.apache.org/viewcvs?rev=344090&view=rev Log: Added exponetian backoff concept - Retransmission interval get twice after every retransmission. Added a MAXUMIM_RETRANSMISSION_COUNT. Retransmission will happen only this number of times. A new class MessageRetransmissionAdjuster to set the retransmitterBean after retransmission. Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Nov 14 01:36:19 2005 @@ -116,10 +116,12 @@ } public interface WSP { - long RETRANSMISSION_INTERVAL = 500000; + long RETRANSMISSION_INTERVAL = 1000; long ACKNOWLEDGEMENT_INTERVAL = 3000; - boolean EXPONENTION_BACKOFF = false; + boolean EXPONENTION_BACKOFF = true; long INACTIVITY_TIMEOUT_INTERVAL = 5000000; + + String RM_POLICY_BEAN = "RMPolicyBean"; } public interface MessageTypes { @@ -282,5 +284,7 @@ String SANDESHA_DEBUG_MODE = "SandeshaDebugMode"; String STORAGE_MANAGER_IMPL = "org.apache.sandesha2.storage.inmemory.InMemoryStorageManager"; + + int MAXIMUM_RETRANSMISSION_ATTEMPTS = 5; } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon Nov 14 01:36:19 2005 @@ -51,14 +51,24 @@ protected Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName()); public void invoke(MessageContext msgContext) throws AxisFault { - + //try { + + //Quit the message with minimum processing if not intended for RM. + boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage (msgContext); + if (!isRMGlobalMessage) { + return; + } + RMMsgContext rmMessageContext = MsgInitializer .initializeMessage(msgContext); + ConfigurationContext context = rmMessageContext.getMessageContext() .getSystemContext(); + //context.setProperty (Constants.SANDESHA_DEBUG_MODE,"on"); + Object debug = context.getProperty(Constants.SANDESHA_DEBUG_MODE); if (debug != null && "on".equals(debug)) { System.out.println("DEBUG: SandeshaGlobalInHandler got a '" @@ -157,8 +167,8 @@ receivedMsgsBean.setValue(receivedMsgStr); seqPropMgr.update(receivedMsgsBean); - //ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor (); - //ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr); + ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor (); + ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr); } } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Nov 14 01:36:19 2005 @@ -41,6 +41,7 @@ import org.apache.sandesha2.SandeshaDynamicProperties; import org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.RMMsgContext; +import org.apache.sandesha2.policy.RMPolicyBean; import org.apache.sandesha2.storage.StorageManager; import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr; import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr; @@ -50,6 +51,7 @@ import org.apache.sandesha2.storage.beans.SequencePropertyBean; import org.apache.sandesha2.util.MsgInitializer; import org.apache.sandesha2.util.RMMsgCreator; +import org.apache.sandesha2.util.RMPolicyManager; import org.apache.sandesha2.util.SOAPAbstractFactory; import org.apache.sandesha2.util.SandeshaUtil; import org.apache.sandesha2.util.SequenceManager; @@ -62,6 +64,7 @@ import org.apache.sandesha2.wsrm.SequenceOffer; import org.apache.wsdl.WSDLConstants; + /** * * @author chamikara @@ -134,6 +137,12 @@ //Strating the sender. SandeshaUtil.startSenderIfStopped(context); + + //Adding the policy bean + RMPolicyBean policyBean = RMPolicyManager.getPolicyBean(rmMsgCtx); + rmMsgCtx.setProperty(Constants.WSP.RM_POLICY_BEAN,policyBean); + + StorageManager storageManager = SandeshaUtil .getSandeshaStorageManager(context); @@ -471,7 +480,7 @@ .getMessageContext()); RetransmitterBean createSeqEntry = new RetransmitterBean(); createSeqEntry.setKey(key); - createSeqEntry.setLastSentTime(0); + createSeqEntry.setTimeToSend(System.currentTimeMillis()); createSeqEntry.setMessageId(createSeqRMMessage.getMessageId()); createSeqEntry.setSend(true); retransmitterMgr.insert(createSeqEntry); @@ -661,7 +670,7 @@ String key = SandeshaUtil .storeMessageContext(rmMsg.getMessageContext()); appMsgEntry.setKey(key); - appMsgEntry.setLastSentTime(0); + appMsgEntry.setTimeToSend(System.currentTimeMillis()); appMsgEntry.setMessageId(rmMsg.getMessageId()); appMsgEntry.setMessageNumber(messageNumber); if (outSequenceBean == null || outSequenceBean.getValue() == null) { Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Nov 14 01:36:19 2005 @@ -41,6 +41,10 @@ public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException { +// boolean b = true; +// if (b) +// return; + SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx .getMessagePart(Constants.MessageParts.SEQ_ACKNOWLEDGEMENT); if (sequenceAck == null) @@ -223,7 +227,7 @@ // some delay. //Otherwise this get send before return of the current request (ack). //TODO verify that the time given is correct - terminateBean.setLastSentTime(System.currentTimeMillis() + terminateBean.setTimeToSend(System.currentTimeMillis() + Constants.TERMINATE_DELAY); terminateBean.setMessageId(terminateRMMessage.getMessageId()); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java Mon Nov 14 01:36:19 2005 @@ -14,21 +14,26 @@ * limitations under the License. */ -package org.apache.sandesha2.policy; /** * @author Sanka Samaranayake (sanka@apache.org) */ +package org.apache.sandesha2.policy; + +import org.apache.sandesha2.Constants; public class RMPolicyBean { - private long inactiveTimeoutInterval = -1l; - private long acknowledgementInterval = -1l; - private long retransmissionInterval = -1l; - private boolean exponentialBackoff = false; + private long inactiveTimeoutInterval = Constants.WSP.INACTIVITY_TIMEOUT_INTERVAL; + private long acknowledgementInterval = Constants.WSP.ACKNOWLEDGEMENT_INTERVAL; + private long retransmissionInterval = Constants.WSP.RETRANSMISSION_INTERVAL; + private boolean exponentialBackoff = Constants.WSP.EXPONENTION_BACKOFF; + public RMPolicyBean () { + loadValuesFromPropertyFile (); + } - public RMPolicyBean() { - + private void loadValuesFromPropertyFile () { + //TODO load policy values from the file. } public long getInactiveTimeoutInterval() { @@ -43,7 +48,7 @@ return retransmissionInterval; } - public boolean getExponentialBackoff() { + public boolean isExponentialBackoff() { return exponentialBackoff; } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java Mon Nov 14 01:36:19 2005 @@ -23,7 +23,7 @@ private String key; - private long LastSentTime; +// private long LastSentTime; private boolean Send; @@ -35,15 +35,18 @@ private boolean reSend = true; + private long timeToSend = 0; + public RetransmitterBean() { } - public RetransmitterBean(String messageId, String key, long lastSentTime, - boolean send, String tempSequenceId, long messageNumber) { + public RetransmitterBean(String messageId, String key, + boolean send,long timeToSend, String tempSequenceId, long messageNumber) { this.messageId = messageId; this.key = key; - this.LastSentTime = lastSentTime; + //this.LastSentTime = lastSentTime; + this.timeToSend = timeToSend; this.Send = send; this.tempSequenceId = tempSequenceId; this.messageNumber = messageNumber; @@ -57,13 +60,13 @@ this.key = key; } - public long getLastSentTime() { - return LastSentTime; - } - - public void setLastSentTime(long lastSentTime) { - LastSentTime = lastSentTime; - } +// public long getLastSentTime() { +// return LastSentTime; +// } +// +// public void setLastSentTime(long lastSentTime) { +// LastSentTime = lastSentTime; +// } public String getMessageId() { return messageId; @@ -112,5 +115,12 @@ public void setReSend(boolean reSend) { this.reSend = reSend; } - + + public long getTimeToSend() { + return timeToSend; + } + + public void setTimeToSend(long timeToSend) { + this.timeToSend = timeToSend; + } } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java Mon Nov 14 01:36:19 2005 @@ -71,6 +71,7 @@ RetransmitterBean temp; while (iterator.hasNext()) { + temp = (RetransmitterBean) iterator.next(); boolean add = true; @@ -78,8 +79,8 @@ if (bean.getKey() != null && !bean.getKey().equals(temp.getKey())) add = false; - if (bean.getLastSentTime() > 0 - && bean.getLastSentTime() != temp.getLastSentTime()) + if (bean.getTimeToSend() > 0 + && bean.getTimeToSend() != temp.getTimeToSend()) add = false; if (bean.getMessageId() != null @@ -110,11 +111,14 @@ while (iterator.hasNext()) { temp = (RetransmitterBean) iterator.next(); if (temp.isSend()) { - long lastSentTime = temp.getLastSentTime(); + + long timeToSend = temp.getTimeToSend(); + int count = temp.getSentCount(); + long timeNow = System.currentTimeMillis(); if (count == 0 - || (timeNow > (lastSentTime + Constants.WSP.RETRANSMISSION_INTERVAL))) { + || (timeNow >= timeToSend)) { beans.add(temp); } } Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=344090&view=auto ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (added) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Mon Nov 14 01:36:19 2005 @@ -0,0 +1,93 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.sandesha2.util; + +import org.apache.axis2.context.MessageContext; +import org.apache.derby.iapi.sql.dictionary.ConsInfo; +import org.apache.sandesha2.Constants; +import org.apache.sandesha2.policy.RMPolicyBean; +import org.apache.sandesha2.storage.beans.RetransmitterBean; + + +/** + * @author chamikara + */ + +public class MessageRetransmissionAdjuster { + + public RetransmitterBean adjustRetransmittion (RetransmitterBean retransmitterBean) { + String storedKey = (String) retransmitterBean.getKey(); + + if (storedKey==null) + return retransmitterBean; + + MessageContext messageContext = SandeshaUtil.getStoredMessageContext(storedKey); + + if (messageContext.getSystemContext()==null) + return retransmitterBean; + + RMPolicyBean policyBean = (RMPolicyBean) messageContext.getProperty(Constants.WSP.RM_POLICY_BEAN); + if (policyBean==null){ + return retransmitterBean; + } + + long oldRetransmissionTime = retransmitterBean.getTimeToSend(); + + retransmitterBean.setSentCount(retransmitterBean.getSentCount()+1); + adjustNextRetransmissionTime (retransmitterBean,policyBean); + + if (retransmitterBean.getSentCount()>=Constants.MAXIMUM_RETRANSMISSION_ATTEMPTS) + stopRetransmission (retransmitterBean); + + return retransmitterBean; + } + + private RetransmitterBean adjustNextRetransmissionTime (RetransmitterBean retransmitterBean,RMPolicyBean policyBean) { + + long lastSentTime = retransmitterBean.getTimeToSend(); + + int count = retransmitterBean.getSentCount(); + + long baseInterval = policyBean.getRetransmissionInterval(); + + long timeToSendNext; + if (policyBean.isExponentialBackoff()) { + long newInterval = generateNextExponentialBackedoffDifference (count,baseInterval); + retransmitterBean.setTimeToSend(lastSentTime+newInterval); + } + + return retransmitterBean; + } + + private void stopRetransmission (RetransmitterBean bean) { + bean.setReSend(false); + } + + + //TODO: Have to change this to be plugable + private long generateNextExponentialBackedoffDifference(int count,long initialInterval) { + long interval = initialInterval; + for (int i=1;i<=count;i++){ + interval = interval*2; + } + + return interval; + } + + +} Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Mon Nov 14 01:36:19 2005 @@ -38,6 +38,7 @@ import org.apache.sandesha2.Constants; import org.apache.sandesha2.RMMsgContext; import org.apache.sandesha2.SandeshaException; +import org.apache.sandesha2.policy.RMPolicyBean; import org.apache.sandesha2.storage.StorageManager; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; import org.apache.sandesha2.storage.beans.SequencePropertyBean; @@ -56,6 +57,14 @@ public class RMMsgCreator { + + private static void setUpMessage (MessageContext rmMsgCtx) { + //Seting RMPolicyBean + RMPolicyBean policyBean = new RMPolicyBean (); + rmMsgCtx.setProperty(Constants.WSP.RM_POLICY_BEAN,policyBean); + + } + public static RMMsgContext createCreateSeqMsg( RMMsgContext applicationRMMsg, String tempSequenceId, String acksTo) throws SandeshaException { @@ -94,6 +103,8 @@ createSeqmsgContext.setServiceContextID(applicationMsgContext .getServiceContextID()); + setUpMessage(createSeqmsgContext); + String createSeqMsgId = SandeshaUtil.getUUID(); try { AxisOperation appMsgOperationDesc = applicationMsgContext @@ -234,6 +245,8 @@ if (terminateMessage == null) throw new SandeshaException("MessageContext is null"); + setUpMessage(terminateMessage); + SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion ( referenceMessage.getEnvelope())); MessageInformationHeaders newMessageInfoHeaders = new MessageInformationHeaders(); @@ -338,11 +351,14 @@ outMessage.setWSAAction(Constants.WSRM.Actions.ACTION_CREATE_SEQUENCE_RESPONSE); outMessage.setSoapAction(Constants.WSRM.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE); + String newMessageId = SandeshaUtil.getUUID(); outMessage.setMessageID(newMessageId); outMessage.setEnvelope(envelope); + setUpMessage(outMessage); + RMMsgContext createSeqResponse = null; try { createSeqResponse = MsgInitializer.initializeMessage(outMessage); @@ -419,6 +435,9 @@ .getAxisOperation()); ackMsgCtx.setOperationContext(ackOpCtx); + + setUpMessage(ackMsgCtx); + ackOpCtx.addMessageContext(ackMsgCtx); Sequence reqSequence = (Sequence) applicationRMMsgCtx Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java?rev=344090&view=auto ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java (added) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java Mon Nov 14 01:36:19 2005 @@ -0,0 +1,30 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.sandesha2.util; + +import org.apache.sandesha2.RMMsgContext; +import org.apache.sandesha2.msgreceivers.RMMessageReceiver; +import org.apache.sandesha2.policy.RMPolicyBean; + +public class RMPolicyManager { + + public static RMPolicyBean getPolicyBean (RMMsgContext msgContext) { + RMPolicyBean policyBean = new RMPolicyBean (); + return policyBean; + } +} Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Nov 14 01:36:19 2005 @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; + +import javax.xml.namespace.QName; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLStreamReader; import org.apache.axis2.AxisFault; @@ -31,6 +33,7 @@ import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.description.TransportOutDescription; import org.apache.axis2.i18n.Messages; +import org.apache.axis2.om.OMElement; import org.apache.axis2.om.impl.llom.builder.StAXBuilder; import org.apache.axis2.om.impl.llom.builder.StAXOMBuilder; import org.apache.axis2.soap.SOAP11Constants; @@ -496,5 +499,24 @@ return Constants.SOAPVersion.v1_2; else throw new SandeshaException ("Unknown SOAP version"); + } + + public static boolean isRMGlobalMessage (MessageContext msgCtx) { + boolean rmGlobalMsg = false; + + String action = msgCtx.getWSAAction(); + SOAPEnvelope env = msgCtx.getEnvelope(); + OMElement sequenceElem = env.getFirstChildWithName(new QName (Constants.WSRM.NS_URI_RM,Constants.WSRM.SEQUENCE)); + + if (sequenceElem!=null) + rmGlobalMsg = true; + + if (Constants.WSRM.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT.equals(action)) + rmGlobalMsg = true; + + if (Constants.WSRM.Actions.ACTION_TERMINATE_SEQUENCE.equals(action)) + rmGlobalMsg = true; + + return rmGlobalMsg; } } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=344090&r1=344089&r2=344090&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon Nov 14 01:36:19 2005 @@ -29,6 +29,7 @@ import org.apache.sandesha2.storage.StorageManager; import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr; import org.apache.sandesha2.storage.beans.RetransmitterBean; +import org.apache.sandesha2.util.MessageRetransmissionAdjuster; import org.apache.sandesha2.util.MsgInitializer; import org.apache.sandesha2.util.SandeshaUtil; @@ -61,7 +62,9 @@ .getRetransmitterBeanMgr(); Collection coll = mgr.findMsgsToSend(); Iterator iter = coll.iterator(); + while (iter.hasNext()) { + RetransmitterBean bean = (RetransmitterBean) iter.next(); String key = (String) bean.getKey(); MessageContext msgCtx = SandeshaUtil @@ -85,7 +88,9 @@ } new AxisEngine(context).send(msgCtx); - + MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster (); + retransmitterAdjuster.adjustRetransmittion(bean); + if (!msgCtx.isServerSide()) checkForSyncResponses(msgCtx); @@ -96,8 +101,8 @@ } //changing the values of the sent bean. - bean.setLastSentTime(System.currentTimeMillis()); - bean.setSentCount(bean.getSentCount() + 1); + //bean.setLastSentTime(System.currentTimeMillis()); + //bean.setSentCount(bean.getSentCount() + 1); //update if resend=true otherwise delete. (reSend=false // means --------------------------------------------------------------------- To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org For additional commands, e-mail: sandesha-dev-help@ws.apache.org