ws-sandesha-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mckie...@apache.org
Subject svn commit: r599016 - in /webservices/sandesha/trunk/java/modules/core/src/main: java/org/apache/sandesha2/client/ java/org/apache/sandesha2/i18n/ java/org/apache/sandesha2/msgprocessors/ java/org/apache/sandesha2/util/ java/org/apache/sandesha2/worker...
Date Wed, 28 Nov 2007 14:48:30 GMT
Author: mckierna
Date: Wed Nov 28 06:48:29 2007
New Revision: 599016

URL: http://svn.apache.org/viewvc?rev=599016&view=rev
Log:
Allow sequences to be auto-restarted after being torn down

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClientConstants.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClientConstants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClientConstants.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClientConstants.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClientConstants.java	Wed Nov 28 06:48:29 2007
@@ -37,4 +37,5 @@
 	public static final String USE_REPLY_TO_AS_ACKS_TO = "UseReplyToAsAcksTo";
 	public static final String OFFERED_ENDPOINT = "OfferedEndpoint";
 	public static final String AVOID_AUTO_TERMINATION = "AviodAutoTermination";
+	public static String AUTO_START_NEW_SEQUENCE = "AutoStartNewSequence";
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java	Wed Nov 28 06:48:29 2007
@@ -76,6 +76,7 @@
 	public static final String propertyInvalidValue="propertyInvalidValue";
 	public static final String invalidRange="invalidRange";
 	public static final String workAlreadyAssigned="workAlreadyAssigned";
+	public static final String reallocationFailed="reallocationFailed"; 
 
 
 	public static final String rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java	Wed Nov 28 06:48:29 2007
@@ -87,6 +87,46 @@
 		return false;
 	}
 	
+	private String getSequenceID(RMMsgContext rmMsgCtx, boolean serverSide, boolean forceNewSequence)throws SandeshaException{
+		MessageContext msgContext = rmMsgCtx.getMessageContext();
+		ConfigurationContext configContext = msgContext.getConfigurationContext();
+		
+		String internalSequenceId = null;
+		if (serverSide) {
+			if (inboundSequence == null || "".equals(inboundSequence)) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.incomingSequenceNotValidID, inboundSequence);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			internalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(inboundSequence);
+		} else {
+			// set the internal sequence id for the client side.
+			EndpointReference toEPR = msgContext.getTo();
+			if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			String to = toEPR.getAddress();
+			String sequenceKey = null;
+			if(forceNewSequence){
+				sequenceKey = SandeshaUtil.getUUID();
+				msgContext.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey);
+			}
+			else{
+				sequenceKey = (String) msgContext.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+				if (sequenceKey == null)
+					sequenceKey = (String)configContext.getAxisConfiguration().getParameterValue(SandeshaClientConstants.SEQUENCE_KEY);
+			}
+
+			
+			internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+		}
+		return internalSequenceId;
+	}
+	
 	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction tran) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
@@ -119,9 +159,6 @@
 		if (msgContext.getMessageID() == null)
 			msgContext.setMessageID(SandeshaUtil.getUUID());
 
-		// find internal sequence id
-		String internalSequenceId = null;
-
 		String storageKey = SandeshaUtil.getUUID(); // the key which will be
 													// used to store this
 													// message.
@@ -132,32 +169,10 @@
 		 * side - a derivation of the sequenceId of the incoming sequence client
 		 * side - a derivation of wsaTo & SeequenceKey
 		 */
-
+		String internalSequenceId = getSequenceID(rmMsgCtx, serverSide, false); //get a sequenceID, possibly pre-existing
+		
 		boolean lastMessage = false;
-		if (serverSide) {
-			if (inboundSequence == null || "".equals(inboundSequence)) {
-				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.incomingSequenceNotValidID, inboundSequence);
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			internalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(inboundSequence);
-		} else {
-			// set the internal sequence id for the client side.
-			EndpointReference toEPR = msgContext.getTo();
-			if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
-				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
-				log.debug(message);
-				throw new SandeshaException(message);
-			}
-
-			String to = toEPR.getAddress();
-			String sequenceKey = (String) msgContext.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-			if (sequenceKey == null)
-				sequenceKey = (String)configContext.getAxisConfiguration().getParameterValue(SandeshaClientConstants.SEQUENCE_KEY);
-			
-			internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
-
+		if(!serverSide){
 			String lastAppMessage = (String) msgContext.getProperty(SandeshaClientConstants.LAST_MESSAGE);
 			if (lastAppMessage != null && "true".equals(lastAppMessage))
 				lastMessage = true;
@@ -195,18 +210,21 @@
 		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
 
 		//see if the sequence is closed
-		if(rmsBean != null && rmsBean.isSequenceClosedClient()){
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed, internalSequenceId));
-		}
-
-		//see if the sequence is terminated
-		if(rmsBean != null && rmsBean.isTerminateAdded()) {
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated, internalSequenceId));
-		}
-
-		//see if the sequence is timed out
-		if(rmsBean != null && rmsBean.isTimedOut()){
-			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout, internalSequenceId));
+		if(rmsBean != null && 
+			(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut())){
+			if(SandeshaUtil.isAutoStartNewSequence(msgContext)){
+				internalSequenceId = getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence
+				rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+			}
+			else if(rmsBean.isSequenceClosedClient()){
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed, internalSequenceId));
+			}
+			else if(rmsBean.isTerminateAdded()){
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated, internalSequenceId));
+			}
+			else if(rmsBean.isTimedOut()){
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout, internalSequenceId));
+			}
 		}
 		
 		// If the call application is a 2-way MEP, and uses a anonymous replyTo, and the

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java	Wed Nov 28 06:48:29 2007
@@ -74,7 +74,7 @@
 			}
 		}
 
-		TerminateManager.terminateSendingSide (rmsBean, storageManager);
+		TerminateManager.terminateSendingSide (rmsBean, storageManager, false);
 		
 		// Stop this message travelling further through the Axis runtime
 		terminateResRMMsg.pause();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java	Wed Nov 28 06:48:29 2007
@@ -794,7 +794,7 @@
 		// Cleanup sending side.
 		if (log.isDebugEnabled())
 			log.debug("Terminating sending sequence " + rmsBean);
-		TerminateManager.terminateSendingSide(rmsBean, storageManager);
+		TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: FaultManager::processCreateSequenceRefusedFault");
@@ -820,16 +820,16 @@
 		// Find the rmsBean
 		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
 		if (rmsBean != null) {
-		
-			// Notify the clients of a failure
-			notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
-			
+					
 			rmMsgCtx.pause();
 			
 			// Cleanup sending side.
 			if (log.isDebugEnabled())
 				log.debug("Terminating sending sequence " + rmsBean);
-			TerminateManager.terminateSendingSide(rmsBean, storageManager);
+			if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true)){
+				// We did not reallocate so we notify the clients of a failure
+				notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
+			}
 			
 			// Update the last activated time.
 			rmsBean.setLastActivatedTime(System.currentTimeMillis());

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java	Wed Nov 28 06:48:29 2007
@@ -24,6 +24,7 @@
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import javax.xml.namespace.QName;
@@ -41,6 +42,7 @@
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -1062,6 +1064,40 @@
 		            
 		return targetEnv;
 	}
+	
+	public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean, List msgsToSend)throws AxisFault{
+	    if (log.isDebugEnabled())
+	        log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence");
+	    
+		ConfigurationContext ctx = storageManager.getContext();
+		ServiceClient client = new ServiceClient(ctx,  null);
+		
+		//populate the client options
+		Options options = client.getOptions();
+		options.setTo(oldRMSBean.getToEndpointReference());
+		options.setReplyTo(oldRMSBean.getReplyToEndpointReference());
+		
+        //internal sequence ID is different
+        String internalSequenceID = oldRMSBean.getInternalSequenceID();
+        //we also need to obtain the sequenceKey from the internalSequenceID.
+        String sequenceKey = 
+          SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, oldRMSBean.getToEndpointReference().getAddress());
+        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, sequenceKey); 
+        options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+        options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion());
+      	options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE);
+      	
+        //send the msgs
+      	Iterator it = msgsToSend.iterator();
+      	while(it.hasNext()){
+      		MessageContext msgCtx = (MessageContext)it.next();
+      		client.getOptions().setAction(msgCtx.getWSAAction());
+      		client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
+      	}
+      	
+	    if (log.isDebugEnabled())
+	        log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence");
+	}
 
   /**
    * Remove the MustUnderstand header blocks.
@@ -1117,6 +1153,32 @@
 		
 		return newEPR;
 	}	
+	
+	public static boolean isAutoStartNewSequence(MessageContext mc){
+		if(log.isDebugEnabled()) log.debug("Entry: SandeshaUtil::isAutoStartNewSequence");
+		boolean result = false;
+
+		//look at the msg ctx first
+		String auto = (String) mc.getProperty(SandeshaClientConstants.AUTO_START_NEW_SEQUENCE);
+		if ("true".equals(auto)) {
+			if (log.isDebugEnabled()) log.debug("Autostart message context");
+			result = true;
+		}			
+		
+		if(!result) {
+			//look at the operation
+			if (mc.getAxisOperation() != null) {
+				Parameter autoParam = mc.getAxisOperation().getParameter(SandeshaClientConstants.AUTO_START_NEW_SEQUENCE);
+				if (null != autoParam && "true".equals(autoParam.getValue())) {
+					if (log.isDebugEnabled()) log.debug("autostart operation");
+					result = true;
+				}
+			}
+		}
+		
+		if(log.isDebugEnabled()) log.debug("Exit: SandeshaUtil::isAutoStartNewSequence, " + result);
+		return result;		
+	}
 	
 	public static boolean isMessageUnreliable(MessageContext mc) {
 		if(log.isDebugEnabled()) log.debug("Entry: SandeshaUtil::isMessageUnreliable");

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java	Wed Nov 28 06:48:29 2007
@@ -22,11 +22,14 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
@@ -34,6 +37,8 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClient;
+import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
@@ -46,6 +51,8 @@
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 
+import com.ibm.xslt4j.bcel.generic.RETURN;
+
 /**
  * Contains logic to remove all the storad data of a sequence. Methods of this
  * are called by sending side and the receiving side when appropriate
@@ -225,16 +232,18 @@
 	 * @param configContext
 	 * @param sequenceID
 	 * @throws SandeshaException
+	 * 
+	 * @return true if the reallocation happened sucessfully
 	 */
-	public static void terminateSendingSide(RMSBean rmsBean, 
-			StorageManager storageManager) throws SandeshaException {
+	public static boolean terminateSendingSide(RMSBean rmsBean, 
+			StorageManager storageManager, boolean reallocate) throws SandeshaException {
 
 		// Indicate that the sequence is terminated
 		rmsBean.setTerminated(true);
 		rmsBean.setTerminateAdded(true);
 		storageManager.getRMSBeanMgr().update(rmsBean);
 		
-		cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean);
+		return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate);
 	}
 
 	public static void timeOutSendingSideSequence(String internalSequenceId,
@@ -245,26 +254,72 @@
 		rmsBean.setLastActivatedTime(System.currentTimeMillis());
 		storageManager.getRMSBeanMgr().update(rmsBean);
 
-		cleanSendingSideData(internalSequenceId, storageManager, rmsBean);
+		cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false);
 	}
 
-	private static void cleanSendingSideData(String internalSequenceId, StorageManager storageManager, RMSBean rmsBean) throws SandeshaException {
+	private static boolean cleanSendingSideData(String internalSequenceId, StorageManager storageManager,
+			RMSBean rmsBean, boolean reallocateIfPossible) throws SandeshaException {
 
+		if(log.isDebugEnabled())
+			log.debug("Enter: TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + reallocateIfPossible);
+		
+		boolean reallocatedOK = false;
 		SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
 
 		// removing retransmitterMgr entries and corresponding message contexts.
 		Collection collection = retransmitterBeanMgr.find(internalSequenceId);
 		Iterator iterator = collection.iterator();
+		List msgsToReallocate = null;
+		if(reallocateIfPossible){
+			msgsToReallocate = new LinkedList();
+		}
+		Range[] ranges = rmsBean.getClientCompletedMessages().getRanges();
+		long lastAckedMsg = -1;
+		
+		if(ranges.length==1){
+			//a single contiguous acked range
+			lastAckedMsg = ranges[0].upperValue;
+		}
+		else{
+			//cannot reallocate as there are gaps
+			reallocateIfPossible=false;
+			if(log.isDebugEnabled())
+				log.debug("cannot reallocate sequence as there are gaps");
+		}
+		
 		while (iterator.hasNext()) {
 			SenderBean retransmitterBean = (SenderBean) iterator.next();
 			if(retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ || rmsBean.isTerminated()){
 				//remove all but terminate sequence messages
-				retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
-
 				String messageStoreKey = retransmitterBean.getMessageContextRefKey();
+				if(reallocateIfPossible
+					&& retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.APPLICATION
+					&& retransmitterBean.getMessageNumber()==lastAckedMsg+1){
+					
+					//try to reallocate application msgs
+					msgsToReallocate.add(storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
+					lastAckedMsg++;
+				}
+				retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
 				storageManager.removeMessageContext(messageStoreKey);				
 			}
 		}
+		
+		if(reallocateIfPossible && msgsToReallocate.size()>0){
+			try{
+			      SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate);
+			      reallocatedOK = true;
+			}
+			catch(Exception e){
+				//want that the reallocation failed
+				if(log.isDebugEnabled())
+					log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(), e.toString()));
+			}			
+		}
+		
+		if(log.isDebugEnabled())
+			log.debug("Exit: TerminateManager::cleanSendingSideData " + reallocatedOK);
+		return reallocatedOK;
 	}
 
 	public static void addTerminateSequenceMessage(RMMsgContext referenceMessage, String internalSequenceID, String outSequenceId, StorageManager storageManager) throws AxisFault {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java	Wed Nov 28 06:48:29 2007
@@ -387,7 +387,7 @@
 					String sequenceID = terminateSequence.getIdentifier().getIdentifier();
 	
 					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
-					TerminateManager.terminateSendingSide(rmsBean, storageManager);
+					TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
 					
 					if(transaction != null && transaction.isActive()) transaction.commit();
 					transaction = null;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=599016&r1=599015&r2=599016&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties	(original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties	Wed Nov 28 06:48:29 2007
@@ -62,6 +62,7 @@
 msgContextNotSet=Sandesha2 Internal Error: ''MessageContext'' is null.
 transportOutNotPresent=Sandesha2 Internal Error: original transport sender is not present.
 workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. Will try the next one.
+reallocationFailed=The sequence ''{0}'' could not be reallocated due to the error ''{1}''.
 couldNotFindOperation=Could not find operation for message type {0} and spec level {1}.
 cannotChooseAcksTo=Could not find an appropriate acksTo for the reply sequence, given inbound sequence {0} and bean info {1}.
 cannotChooseSpecLevel=Could not find an appropriate specification level for the reply sequence, given inbound sequence {0} and bean info {1}.



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Mime
View raw message