ws-sandesha-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject svn commit: r394865 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: client/ handlers/ msgprocessors/ util/ workers/
Date Tue, 18 Apr 2006 06:57:28 GMT
Author: chamikara
Date: Mon Apr 17 23:57:27 2006
New Revision: 394865

URL: http://svn.apache.org/viewcvs?rev=394865&view=rev
Log:
Added the SandeshaListener interface.
Users can register an object of this type to be notified of special events (e.g. RM faults,
sequence timeouts).
Added additional methods to SandeshaClient.
Corrected the timeout logic. A sequence can now time out either when the maximum number of retransmissions
is exceeded or based on its last activated time. On a timeout, a registered SandeshaListener
object will be invoked.


Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java
Removed:
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaFaultCallback.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java Mon Apr
17 23:57:27 2006
@@ -588,7 +588,7 @@
 		return getOutgoingSequenceReport(internalSequenceID,configurationContext);
 	}
 	
-	private static SequenceReport getOutgoingSequenceReport (String internalSequenceID,ConfigurationContext
configurationContext) throws SandeshaException {
+	public static SequenceReport getOutgoingSequenceReport (String internalSequenceID,ConfigurationContext
configurationContext) throws SandeshaException {
 		
 		SequenceReport sequenceReport = new SequenceReport ();
 		sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
@@ -599,12 +599,16 @@
 		
 		Transaction reportTransaction = storageManager.getTransaction();
 		
+		sequenceReport.setInternalSequenceID(internalSequenceID);
+		
 		CreateSeqBean createSeqFindBean = new CreateSeqBean ();
 		createSeqFindBean.setInternalSequenceID(internalSequenceID);
 		
 		CreateSeqBean createSeqBean = createSeqMgr.findUnique(createSeqFindBean);
 		
+		//if data not is available sequence has to be terminated or timedOut.
 		if (createSeqBean==null) {
+			
 			//check weather this is an terminated sequence.
 			if (isSequenceTerminated(internalSequenceID,seqPropMgr)) {
 				fillTerminatedOutgoingSequenceInfo (sequenceReport,internalSequenceID,seqPropMgr);
@@ -617,10 +621,16 @@
 				
 				return sequenceReport;
 			}
+		
+			//sequence must hv been timed out before establiching. No other posibility I can think
of.
+			//this does not get recorded since there is no key (which is normally the sequenceID)
to store it.
+			//(properties with key as the internalSequenceID get deleted in timing out)
 			
-			String message = "Unrecorder internalSequenceID";
-			log.debug(message);
-			return null;
+			//so, setting the sequence status to INITIAL
+			sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
+			
+			//returning the current sequence report.
+			return sequenceReport;
 		}
 		
 		String outSequenceID = createSeqBean.getSequenceID();
@@ -633,7 +643,6 @@
 		}
 		
 		sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
-		
 		fillOutgoingSequenceInfo(sequenceReport,outSequenceID,seqPropMgr);
 		
 		reportTransaction.commit();
@@ -678,7 +687,6 @@
 		}
 		
 		String outSequenceID = internalSequenceBean.getSequenceID();
-	
 		SequencePropertyBean sequenceTerminatedBean = seqPropMgr.retrieve(outSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
 		if (sequenceTerminatedBean!=null && Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue()))
{
 			return true;
@@ -798,7 +806,7 @@
 		return sequenceReport;
 	}
 	
-	private static String getInternalSequenceID (String to, String sequenceKey) {
+	public static String getInternalSequenceID (String to, String sequenceKey) {
 		return SandeshaUtil.getInternalSequenceID(to,sequenceKey);
 	}
 	

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
Mon Apr 17 23:57:27 2006
@@ -25,5 +25,5 @@
 	public static String MESSAGE_NUMBER = "Sandesha2MessageNumber";
 	public static String RM_SPEC_VERSION = "Sandesha2RMSpecVersion";
 	public static String DUMMY_MESSAGE = "Sandesha2DummyMessage"; //If this property is set,
even though this message will invoke the RM handlers, this will not be sent as an actual application
message
-	public static String RM_FAULT_CALLBACK = "Sandesha2RMFaultCallback";
+	public static String SANDESHA_LISTENER = "Sandesha2Listener";
 }

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java?rev=394865&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java Mon Apr
17 23:57:27 2006
@@ -0,0 +1,43 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.client;
+
+import org.apache.axis2.AxisFault;
+
+/**
+ * By implementing this interface and registering an object with
+ * Sandesha2, users will be invoked in some events.
+ * 
+ * @author Chamikara Jayalath <chamikaramj@gmail.com>
+ */
+
+public interface SandeshaListener {
+
+	/**
+	 * This sill be invoked when Sandesha2 receive a fault message
+	 * in response to a RM control message that was sent by it.
+	 */
+	public void onError(AxisFault fault);
+	
+	/**
+	 * This will be invoked when a specific sequence time out.
+	 * The timing out method depends on policies.
+	 */
+	public void onTimeOut(SequenceReport report);
+	
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Mon Apr 17 23:57:27 2006
@@ -40,7 +40,7 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaFaultCallback;
+import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
 import org.apache.sandesha2.storage.StorageManager;
@@ -90,7 +90,7 @@
 					if (requestMessage!=null) {
 						if(SandeshaUtil.isRetriableOnFaults(requestMessage)){
 							
-							SandeshaFaultCallback faultCallback = (SandeshaFaultCallback) operationContext.getProperty(SandeshaClientConstants.RM_FAULT_CALLBACK);
+							SandeshaListener faultCallback = (SandeshaListener) operationContext.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
 							if (faultCallback!=null) {
 								
 								

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon
Apr 17 23:57:27 2006
@@ -114,10 +114,7 @@
 		if (dummyMessageString!=null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
 			dummyMessage = true;
 		
-		StorageManager storageManager = SandeshaUtil
-				.getSandeshaStorageManager(context);
-
-
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 
 		MsgProcessor msgProcessor = null;
 		int messageType = rmMsgCtx.getMessageType();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Mon Apr 17 23:57:27 2006
@@ -131,10 +131,12 @@
 			return;
 		}
 		
-		//updating the last activated time of the sequence.
-//		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
-//		SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
-//		lastUpdatedTimeTransaction.commit();
+        String internalSequenceID = SandeshaUtil.getSequenceProperty(outSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configCtx);
+		
+        //updating the last activated time of the sequence.
+		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
+		SequenceManager.updateLastActivatedTime(internalSequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
+		lastUpdatedTimeTransaction.commit();
 		
 		//Starting transaction
 		Transaction ackTransaction = storageManager.getTransaction();

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Mon Apr 17 23:57:27 2006
@@ -42,7 +42,7 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaFaultCallback;
+import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
@@ -438,11 +438,11 @@
 		ConfigurationContext configContext = msgContext .getConfigurationContext();
 		
 		//setting the Fault callback		
-		SandeshaFaultCallback faultCallback = (SandeshaFaultCallback) msgContext.getOptions().getProperty(SandeshaClientConstants.RM_FAULT_CALLBACK);
+		SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
 		if (faultCallback!=null) {
 			OperationContext operationContext = msgContext.getOperationContext();
 			if (operationContext!=null) {
-				operationContext.setProperty(SandeshaClientConstants.RM_FAULT_CALLBACK,faultCallback);
+				operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER,faultCallback);
 			}
 		}
 		
@@ -848,7 +848,7 @@
 		createSeqEntry.setMessageContextRefKey(key);
 		createSeqEntry.setTimeToSend(System.currentTimeMillis());
 		createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
-
+		createSeqEntry.setInternalSequenceID(internalSequenceId);
 		// this will be set to true in the sender
 		createSeqEntry.setSend(true);
 

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Mon Apr 17 23:57:27 2006
@@ -32,7 +32,7 @@
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaFaultCallback;
+import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.client.SandeshaClient;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
@@ -230,12 +230,12 @@
 		
 		MessageContext msgCtx = rmMsgCtx.getMessageContext();
 		
-		//adding the RM_FAULT_CALLBACK
-		SandeshaFaultCallback faultCallback = (SandeshaFaultCallback) msgCtx.getOptions().getProperty(SandeshaClientConstants.RM_FAULT_CALLBACK);
+		//adding the SANDESHA_LISTENER
+		SandeshaListener faultCallback = (SandeshaListener) msgCtx.getOptions().getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
 		if (faultCallback!=null) {
 			OperationContext operationContext = msgCtx.getOperationContext();
 			if (operationContext!=null) {
-				operationContext.setProperty(SandeshaClientConstants.RM_FAULT_CALLBACK,faultCallback);
+				operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER,faultCallback);
 			}
 		}
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Mon Apr 17 23:57:27 2006
@@ -288,9 +288,8 @@
 		updateAppMessagesTransaction.commit();
 		
 		Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
-		SequenceManager.updateLastActivatedTime(newOutSequenceId,configCtx);
+		SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
 		lastUpdatedTimeTransaction.commit();
-		
 		
 		createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
 				.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java Mon
Apr 17 23:57:27 2006
@@ -80,7 +80,7 @@
 		SenderBean findBean = new SenderBean();
 
 		String sequnceID = SandeshaUtil.getSequenceIDFromRMMessage (rmMessageContext);
-		
+
 		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
 		findBean.setSend(true);
 		findBean.setReSend(false);

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Mon Apr 17 23:57:27 2006
@@ -22,8 +22,13 @@
 import org.apache.axis2.description.Parameter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClient;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.client.SequenceReport;
 import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -38,7 +43,7 @@
 
 	Log log = LogFactory.getLog( getClass());
 	
-	public SenderBean adjustRetransmittion(
+	public boolean adjustRetransmittion(
 			SenderBean retransmitterBean,ConfigurationContext configContext) throws SandeshaException
{
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		
@@ -48,18 +53,36 @@
 			throw new SandeshaException ("Stored Key not present in the retransmittable message");
 
 		MessageContext messageContext = storageManager.retrieveMessageContext(storedKey,configContext);
-
-
+		RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(messageContext);
+		
+		String internalSequenceID = retransmitterBean.getInternalSequenceID();
+		String sequenceID = retransmitterBean.getSequenceID();
+		
 		SandeshaPropertyBean propertyBean = SandeshaUtil.getPropretyBean(messageContext);
 		
 		retransmitterBean.setSentCount(retransmitterBean.getSentCount() + 1);
 		adjustNextRetransmissionTime(retransmitterBean, propertyBean);
 
 		int maxRetransmissionAttempts = propertyBean.getMaximumRetransmissionCount();
+		
+		boolean timeOutSequence = false;
 		if (maxRetransmissionAttempts>=0 && retransmitterBean.getSentCount() > maxRetransmissionAttempts)
+			timeOutSequence = true;		
+		
+		boolean sequenceTimedOut = SequenceManager.hasSequenceTimedOut(internalSequenceID, rmMsgCtx);
+		if (sequenceTimedOut)
+			timeOutSequence = true;
+	
+		boolean continueSending = true;
+		if (timeOutSequence) {
 			stopRetransmission(retransmitterBean);
-
-		return retransmitterBean;
+			
+			//Only messages of outgoing sequences get retransmitted. So named following method according
to that.
+			finalizeTimedOutSequence (internalSequenceID,sequenceID, messageContext);
+			continueSending = false;
+		}
+		
+		return continueSending;
 	}
 
 	/**
@@ -108,6 +131,17 @@
 		}
 
 		return interval;
+	}
+	
+	private void finalizeTimedOutSequence (String internalSequenceID, String sequenceID ,MessageContext
messageContext) throws SandeshaException {
+		ConfigurationContext configurationContext = messageContext.getConfigurationContext();
+		SequenceReport report = SandeshaClient.getOutgoingSequenceReport(internalSequenceID ,configurationContext);
+		TerminateManager.timeOutSendingSideSequence(configurationContext,internalSequenceID, false);
+		
+		SandeshaListener listener = (SandeshaListener) messageContext.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+		if (listener!=null) {
+			listener.onTimeOut(report);
+		}
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Mon Apr
17 23:57:27 2006
@@ -311,6 +311,9 @@
 		specVerionBean.setValue(specVersion);
 		seqPropMgr.insert(specVerionBean);
 		
+		//updating the last activated time.
+		updateLastActivatedTime(internalSequenceId,configurationContext);
+		
 		SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
 		
 		updateClientSideListnerIfNeeded (firstAplicationMsgCtx,anonymousURI);
@@ -357,19 +360,19 @@
 	 * @param configContext
 	 * @throws SandeshaException
 	 */
-	public static void updateLastActivatedTime (String sequenceID, ConfigurationContext configContext)
throws SandeshaException {
+	public static void updateLastActivatedTime (String propertyKey, ConfigurationContext configContext)
throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		//Transaction lastActivatedTransaction = storageManager.getTransaction();
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
-		SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(propertyKey,
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
 		
 		boolean added = false;
 		
 		if (lastActivatedBean==null) {
 			added = true;
 			lastActivatedBean = new SequencePropertyBean ();
-			lastActivatedBean.setSequenceID(sequenceID);
+			lastActivatedBean.setSequenceID(propertyKey);
 			lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
 		}
 		
@@ -385,12 +388,12 @@
 	}
 	
 	
-	public static long getLastActivatedTime (String sequenceID, ConfigurationContext configContext)
throws SandeshaException {
+	public static long getLastActivatedTime (String propertyKey, ConfigurationContext configContext)
throws SandeshaException {
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
-		SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+		SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(propertyKey,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
 		
 		long lastActivatedTime = -1;
 		
@@ -401,7 +404,7 @@
 		return lastActivatedTime;
 	}
 		
-	public static boolean hasSequenceTimedOut (String sequenceID, RMMsgContext rmMsgCtx) throws
SandeshaException {
+	public static boolean hasSequenceTimedOut (String propertyKey, RMMsgContext rmMsgCtx) throws
SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
 		SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		
@@ -422,7 +425,7 @@
 		
 		//SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
 		//if (lastActivatedBean!=null) {
-		long lastActivatedTime = getLastActivatedTime(sequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
+		long lastActivatedTime = getLastActivatedTime(propertyKey,rmMsgCtx.getMessageContext().getConfigurationContext());
 		long timeNow = System.currentTimeMillis();
 		if (lastActivatedTime>0 && (lastActivatedTime+policyBean.getInactiveTimeoutInterval()<timeNow))
 			sequenceTimedOut = true;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java Mon Apr
17 23:57:27 2006
@@ -184,15 +184,16 @@
 	 * @param sequenceID
 	 * @throws SandeshaException
 	 */
-	public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID,boolean
serverSide) throws SandeshaException {
+	public static void terminateSendingSide (ConfigurationContext configContext, String internalSequenceID,boolean
serverSide) throws SandeshaException {
 		
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
-		SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
+		
+		SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
 		seqPropMgr.insert(seqTerminatedBean);
 		
-		cleanSendingSideData(configContext,sequenceID,serverSide);
+		cleanSendingSideData(configContext,internalSequenceID,serverSide);
 		
 		
 	}
@@ -200,8 +201,26 @@
 	
 	
 	private static void doUpdatesIfNeeded (String sequenceID, SequencePropertyBean propertyBean,
SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+		
+		boolean addEntryWithSequenceID = false;
+		
 		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES))
{
-			
+			addEntryWithSequenceID = true;
+		}
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED))
{
+			addEntryWithSequenceID = true;
+		}
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED))
{
+			addEntryWithSequenceID = true;
+		}
+		
+		if (propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT))
{
+			addEntryWithSequenceID = true;
+		}
+		
+		if (addEntryWithSequenceID && sequenceID!=null) {
 			//this value cannot be completely deleted since this data will be needed by SequenceReports
 			//so saving it with the sequenceID value being the out sequenceID.
 			
@@ -213,8 +232,10 @@
 			seqPropMgr.insert(newBean);
 			//TODO amazingly this property does not seem to get deleted without following - in the
hibernate impl 
 			//(even though the lines efter current methodcall do this).
-			seqPropMgr.delete (propertyBean.getSequenceID(),propertyBean.getName());			
+			seqPropMgr.delete (propertyBean.getSequenceID(),propertyBean.getName());
 		}
+		
+		
 	}
 	
 	private static boolean isProportyDeletable (String name) {
@@ -244,60 +265,45 @@
 		return deleatable;
 	}
 	
-	public static void timeOutSendingSideSequence (ConfigurationContext context,String sequenceID,
boolean serverside) throws SandeshaException {
+	public static void timeOutSendingSideSequence (ConfigurationContext context,String internalSequenceID,
boolean serverside) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context);
 		
 		SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
-		SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE);
+		SequencePropertyBean seqTerminatedBean = new SequencePropertyBean (internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE);
 		seqPropMgr.insert(seqTerminatedBean);
 		
 		
-		cleanSendingSideData(context,sequenceID,serverside);
+		cleanSendingSideData(context,internalSequenceID,serverside);
 	}
 	
-	private static void cleanSendingSideData (ConfigurationContext configContext,String sequenceID,
boolean serverSide) throws SandeshaException {
+	private static void cleanSendingSideData (ConfigurationContext configContext,String internalSequenceID,
boolean serverSide) throws SandeshaException {
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
 		
 		SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
 		SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
 		CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
 		
-		SequencePropertyBean sequenceTerminatedBean = new SequencePropertyBean (sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
-		sequencePropertyBeanMgr.insert(sequenceTerminatedBean);
+		String outSequenceID = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,configContext);
 		
 		if (!serverSide) {
-			//stpoing the listner for the client side.	
-			
-			//SequencePropertyBean outGoingAcksToBean  = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQ_ACKSTO);
-			
 			boolean stopListnerForAsyncAcks = false;
-			SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-			if (internalSequenceBean!=null) {
-				String internalSequenceID = internalSequenceBean.getValue();
-				SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+			SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
 				
-				String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configContext);
-				String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
+			String addressingNamespace = SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configContext);
+			String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
 				
-				if (acksToBean!=null) {
-					String acksTo = acksToBean.getValue();
-					if (acksTo!=null && !anonymousURI.equals(acksTo)) {
-						stopListnerForAsyncAcks = true;
-					}
+			if (acksToBean!=null) {
+				String acksTo = acksToBean.getValue();
+				if (acksTo!=null && !anonymousURI.equals(acksTo)) {
+					stopListnerForAsyncAcks = true;
 				}
 			}
 		}
 		
-		SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-		if (internalSequenceBean==null)
-			throw new SandeshaException ("TempSequence entry not found");
-		
-		String internalSequenceId = (String) internalSequenceBean.getValue();
-		
 		//removing retransmitterMgr entries
 		//SenderBean findRetransmitterBean = new SenderBean ();
 		//findRetransmitterBean.setInternalSequenceID(internalSequenceId);
-		Collection collection = retransmitterBeanMgr.find(internalSequenceId);
+		Collection collection = retransmitterBeanMgr.find(internalSequenceID);
 		Iterator iterator = collection.iterator();
 		while (iterator.hasNext()) {
 			SenderBean retransmitterBean = (SenderBean) iterator.next();
@@ -306,7 +312,7 @@
 		
 		//removing the createSeqMgrEntry
 		CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
-		findCreateSequenceBean.setInternalSequenceID(internalSequenceId);
+		findCreateSequenceBean.setInternalSequenceID(internalSequenceID);
 		collection = createSeqBeanMgr.find(findCreateSequenceBean);
 		iterator = collection.iterator();
 		while (iterator.hasNext()) {
@@ -316,19 +322,19 @@
 		
 		//removing sequence properties
 		SequencePropertyBean findSequencePropertyBean1 = new SequencePropertyBean ();
-		findSequencePropertyBean1.setSequenceID(internalSequenceId);
+		findSequencePropertyBean1.setSequenceID(internalSequenceID);
 		collection = sequencePropertyBeanMgr.find(findSequencePropertyBean1);
 		iterator = collection.iterator();
 		while (iterator.hasNext()) {
 			SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next();
-			doUpdatesIfNeeded (sequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
+			doUpdatesIfNeeded (outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
 			
 			if (isProportyDeletable(sequencePropertyBean.getName())) {
 				sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
 			}
 		}
 		
-		SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
+		SandeshaUtil.stopSenderForTheSequence(internalSequenceID);
 	}
 	
 	public static void addTerminateSequenceMessage(RMMsgContext referenceMessage,

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon Apr 17 23:57:27 2006
@@ -124,7 +124,9 @@
 				
 
 				MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
-				retransmitterAdjuster.adjustRetransmittion(senderBean, context);
+				boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context);
+				if (!continueSending)
+					continue;
 				
 				pickMessagesToSendTransaction.commit();
 				
@@ -169,17 +171,7 @@
 				if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
 					Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
 					String sequenceID = sequence.getIdentifier().getIdentifier();
-					// checking weather the sequence has been timed out.
-					boolean sequenceTimedOut = SequenceManager.hasSequenceTimedOut(sequenceID, rmMsgCtx);
-					if (sequenceTimedOut) {
-						// sequence has been timed out.
-						// do time out processing.
-						// TODO uncomment below line
-						TerminateManager.timeOutSendingSideSequence(context,sequenceID, msgCtx.isServerSide());
-						String message = "Sequence timed out";
-						log.debug(message);
-						throw new SandeshaException(message);
-					}
+	
 				}
 				
 				//checking weather this message can carry piggybacked acks
@@ -235,7 +227,9 @@
 					TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
 					String sequenceID = terminateSequence.getIdentifier().getIdentifier();
 					ConfigurationContext configContext = msgCtx.getConfigurationContext();
-					TerminateManager.terminateSendingSide(configContext,sequenceID, msgCtx.isServerSide());
+					
+					String internalSequenceID = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configContext);
+					TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide());
 				}
 
 				terminateCleaningTransaction.commit();



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Mime
View raw message