ws-sandesha-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dee...@apache.org
Subject svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java
Date Wed, 26 Sep 2007 16:20:27 GMT
Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708

URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing issue when we send a CS request and receive non create sequence response , in that
case we need to stop sending CS req and need to notify client abt that
- When the timeout happen it never going to notify to the client 

- This commit fix both of the above , Chamikara or someone who expert about the code base
please validate the patch 

 

Modified:
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
    webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Wed Sep 26 09:20:24 2007
@@ -247,7 +247,6 @@
 				// with the same internal sequenceid
 				// Check that someone hasn't created the bean
 				rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
-				
 				// if first message - setup the sending side sequence - both for the
 				// server and the client sides.
 				if (rmsBean == null) {
@@ -363,7 +362,8 @@
 		}
 
 		// Update the rmsBean
-		storageManager.getRMSBeanMgr().update(rmsBean);
+        rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+        storageManager.getRMSBeanMgr().update(rmsBean);
 		
 		if(startPolling) {
 			SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
Wed Sep 26 09:20:24 2007
@@ -144,7 +144,10 @@
 	 */
 	private boolean avoidAutoTermination = false;
 
-	/**
+    //To store the message id if the outgoing appliction message
+    private String applicationMessageMessageId ;
+
+    /**
 	 * Flags that are used to check if the primitive types on this bean
 	 * have been set. If a primitive type has not been set then it will
 	 * be ignored within the match method.
@@ -512,4 +515,12 @@
 		return match;
 	}
 
+
+    public String getApplicationMessageMessageId() {
+        return applicationMessageMessageId;
+    }
+
+    public void setApplicationMessageMessageId(String applicationMessageMessageId) {
+        this.applicationMessageMessageId = applicationMessageMessageId;
+    }
 }

Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Wed Sep 26 09:20:24 2007
@@ -7,6 +7,9 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
 
 public class SenderWorker extends SandeshaWorker implements Runnable {
 
@@ -82,7 +80,7 @@
 		try {
 			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
configurationContext.getAxisConfiguration());
 			SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-			
+
 			transaction = storageManager.getTransaction();
 
 			String key = senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
 			}
 			
 			//if the message belong to the Replay Model, it will be send out only if 
-			
-			
+
+
 			boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager);
 			//save changes done @ updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion
 			storageManager.getSenderBeanMgr().update(senderBean);
@@ -210,7 +208,7 @@
 					transaction.commit();
 					transaction = null;
 				}
-				
+				invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, !continueSending");
 				return;
 			}
 
@@ -236,7 +234,7 @@
 					senderBeanMgr.update(bean2);
 				}
 			}
-			
+
 			// have to commit the transaction before sending. This may
 			// get changed when WS-AT is available.
 			if(transaction != null) {
@@ -335,10 +333,15 @@
 			
 			transaction = null;
 
-			if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide())

-				checkForSyncResponses(msgCtx);
-			
-			if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+            if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide())
{
+                boolean  validCs = checkForSyncResponses(msgCtx );
+                if (!validCs) {
+                    invokeCallBackObject(storageManager,msgCtx ,
+                            "Sandesha2 sender thread has not received a valid CreateSequnceResponse");
+                }
+            }
+
+            if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
 					&&
 					 (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) {
 				try {
@@ -505,8 +508,13 @@
 			log.debug("Exit: SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
 		return piggybackable;
 	}
-	
-	private void checkForSyncResponses(MessageContext msgCtx) {
+
+    /**
+     * return value will be false if the create sequence fails else it will be true
+     * @param msgCtx
+     * @return
+     */
+    private boolean checkForSyncResponses(MessageContext msgCtx ) {
 		if (log.isDebugEnabled())
 			log.debug("Enter: SenderWorker::checkForSyncResponses, " + msgCtx.getEnvelope().getHeader());
 
@@ -522,7 +530,7 @@
 			boolean transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null);
 			if (!transportInPresent && (responseMessageContext==null || responseMessageContext.getEnvelope()==null))
{
 				if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses, no response
present");
-				return;
+				return true;
 			}
 			
 			//to find out weather the response was built by me.
@@ -592,7 +600,7 @@
 					log.error ("Caught exception", e);
 					}
 				
-					return;
+					return true;
 				}
 				
 				//If addressing is disabled we will be adding this message simply as the application
response of the request message.
@@ -631,7 +639,7 @@
 			//if the syncResponseWas not built here and the client was not expecting a sync response.
We will not try to execute 
 			//here. Doing so will cause a double invocation for a async message. 
 			if (msgCtx.getOptions().isUseSeparateListener()==true &&  !syncResponseBuilt)
{
-				return;
+				return true;
 			}
 			
 			
@@ -650,13 +658,22 @@
 			}
 
 		} catch (Exception e) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
-			if (log.isWarnEnabled())
-				log.warn(message, e);
+
+            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+            if (msgCtx != null &&! msgCtx.isServerSide() &&
+                    (Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+                            || Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction()))
){
+                // We have not received a valid createSequnce reponse for the request we
send so we need to terminate the seunce here
+                return false;
+            } else {
+                if (log.isWarnEnabled())
+                    log.warn(message, e);
+            }
 		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: SenderWorker::checkForSyncResponses");
-	}
+        return true;
+    }
 	
 	private void recordError (Exception e, RMMsgContext outRMMsg, StorageManager storageManager)
throws SandeshaStorageException {
 		// Store the Exception as a sequence property to enable the client to lookup the last 
@@ -702,5 +719,60 @@
 			}
 		}
 	}
-	
+
+    private void invokeCallBackObject(StorageManager storageManager,
+                                      MessageContext msgCtx,
+                                      String message) throws SandeshaStorageException {
+        Transaction transaction = null;
+        if (msgCtx.isServerSide()) {
+            return;
+        }
+        try {
+            transaction = storageManager.getTransaction();
+            //terminate message sent using the SandeshaClient. Since the terminate message
will simply get the
+            //InFlow of the reference message get called which could be zero sized (OutOnly
operations).
+
+            // terminate sending side if this is the WSRM 1.0 spec.
+            // If the WSRM versoion is 1.1 termination will happen in the terminate sequence
response message.
+
+            String internalSequenceId = (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+            if (internalSequenceId == null) internalSequenceId = senderBean.getInternalSequenceID();
+            if (internalSequenceId != null) {
+                // Create a new Transaction
+                transaction = storageManager.getTransaction();
+                RMSBean bean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
+                TerminateManager.terminateSendingSide(bean, storageManager);
+
+                OperationContext opCtx =
+                        configurationContext.getOperationContext(bean.getApplicationMessageMessageId());
+                if (opCtx != null) {
+                    AxisOperation applicationAxisOperation = opCtx.getAxisOperation();
+                    if (applicationAxisOperation != null) {
+                        MessageReceiver msgReceiver = applicationAxisOperation.getMessageReceiver();
+                        if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver))
{
+                            Object callback = ((CallbackReceiver) msgReceiver)
+                                    .lookupCallback(bean.getApplicationMessageMessageId());
+                            if (callback != null) {
+                                AxisCallback axisCallback = ((AxisCallback) callback);
+                                axisCallback.onError(new Exception(message));
+                                axisCallback.onComplete();
+                            }
+                        }
+                    }
+                }
+                if (transaction != null && transaction.isActive()) transaction.commit();
+                transaction = null;
+            }
+
+        } catch (Exception e) {
+            if (log.isWarnEnabled())
+                log.warn(e);
+        } finally {
+            if (transaction != null && transaction.isActive()) {
+                transaction.rollback();
+                transaction = null;
+            }
+        }
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Mime
View raw message