ws-sandesha-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrew K Gatford <GATF...@uk.ibm.com>
Subject Re: svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java
Date Thu, 27 Sep 2007 08:08:31 GMT
To check the fix, I'd like to understand what response is being returned 
when sending out a CreateSequence message that isn't a 
CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is it 
something else ?

Andrew Gatford

Hursley MP211
Telephone : 
Internal (7) 245743 
External 01962 815743
Internet : gatfora@uk.ibm.com



deepal@apache.org 
26/09/2007 17:20

To
sandesha-cvs@ws.apache.org
cc

Subject
svn commit: r579708 - in 
/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2:

msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java 
workers/SenderWorker.java






Author: deepal
Date: Wed Sep 26 09:20:24 2007
New Revision: 579708

URL: http://svn.apache.org/viewvc?rev=579708&view=rev
Log:
- Fixing issue when we send a CS request and receive non create sequence 
response , in that case we need to stop sending CS req and need to notify 
client abt that
- When the timeout happen it never going to notify to the client 

- This commit fix both of the above , Chamikara or someone who expert 
about the code base please validate the patch 

 

Modified:
 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff

==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java

(original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java

Wed Sep 26 09:20:24 2007
@@ -247,7 +247,6 @@
                                                                 // with 
the same internal sequenceid
                                                                 // Check 
that someone hasn't created the bean
                                                                 rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
- 
                                                                 // if 
first message - setup the sending side sequence - both for the
                                                                 // server 
and the client sides.
                                                                 if 
(rmsBean == null) {
@@ -363,7 +362,8 @@
                                 }
 
                                 // Update the rmsBean
- storageManager.getRMSBeanMgr().update(rmsBean);
+ rmsBean.setApplicationMessageMessageId(msgContext.getMessageID());
+        storageManager.getRMSBeanMgr().update(rmsBean);
 
                                 if(startPolling) {
 
SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), 
rmsBean);

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff

==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java

(original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java

Wed Sep 26 09:20:24 2007
@@ -144,7 +144,10 @@
                  */
                 private boolean avoidAutoTermination = false;
 
-                /**
+    //To store the message id if the outgoing appliction message
+    private String applicationMessageMessageId ;
+
+    /**
                  * Flags that are used to check if the primitive types on 
this bean
                  * have been set. If a primitive type has not been set 
then it will
                  * be ignored within the match method.
@@ -512,4 +515,12 @@
                                 return match;
                 }
 
+
+    public String getApplicationMessageMessageId() {
+        return applicationMessageMessageId;
+    }
+
+    public void setApplicationMessageMessageId(String 
applicationMessageMessageId) {
+        this.applicationMessageMessageId = applicationMessageMessageId;
+    }
 }

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff

==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

(original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Wed Sep 26 09:20:24 2007
@@ -7,6 +7,9 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.client.async.Callback;
+import org.apache.axis2.client.async.AxisCallback;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -17,6 +20,7 @@
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.OutOnlyAxisOperation;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
@@ -45,13 +49,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.sandesha2.wsrm.*;
 
 public class SenderWorker extends SandeshaWorker implements Runnable {
 
@@ -82,7 +80,7 @@
                                 try {
                                                 StorageManager 
storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext, 
configurationContext.getAxisConfiguration());
                                                 SenderBeanMgr 
senderBeanMgr = storageManager.getSenderBeanMgr();
- 
+
                                                 transaction = 
storageManager.getTransaction();
 
                                                 String key = 
senderBean.getMessageContextRefKey();
@@ -195,8 +193,8 @@
                                                 }
 
                                                 //if the message belong 
to the Replay Model, it will be send out only if 
- 
- 
+
+
                                                 boolean continueSending = 
updateMessage(rmMsgCtx,senderBean,storageManager);
                                                 //save changes done @ 
updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion
 storageManager.getSenderBeanMgr().update(senderBean);
@@ -210,7 +208,7 @@
  transaction.commit();
  transaction = null;
                                                                 }
- 
+ invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, 
!continueSending");
                                                                 return;
                                                 }
 
@@ -236,7 +234,7 @@
  senderBeanMgr.update(bean2);
                                                                 }
                                                 }
- 
+
                                                 // have to commit the 
transaction before sending. This may
                                                 // get changed when WS-AT 
is available.
                                                 if(transaction != null) {
@@ -335,10 +333,15 @@
 
                                                 transaction = null;
 
-                                                if 
((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) 

- checkForSyncResponses(msgCtx);
- 
-                                                if 
((rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
+            if ((processResponseForFaults || successfullySent) && 
!msgCtx.isServerSide()) {
+                boolean  validCs = checkForSyncResponses(msgCtx );
+                if (!validCs) {
+                    invokeCallBackObject(storageManager,msgCtx ,
+                            "Sandesha2 sender thread has not received a 
valid CreateSequnceResponse");
+                }
+            }
+
+            if ((rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
  &&
  
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) 
{
                                                                 try {
@@ -505,8 +508,13 @@
                                                 log.debug("Exit: 
SenderWorker::isAckPiggybackableMsgType, " + piggybackable);
                                 return piggybackable;
                 }
- 
-                private void checkForSyncResponses(MessageContext msgCtx) 
{
+
+    /**
+     * return value will be false if the create sequence fails else it 
will be true
+     * @param msgCtx
+     * @return
+     */
+    private boolean checkForSyncResponses(MessageContext msgCtx ) {
                                 if (log.isDebugEnabled())
                                                 log.debug("Enter: 
SenderWorker::checkForSyncResponses, " + 
msgCtx.getEnvelope().getHeader());
 
@@ -522,7 +530,7 @@
                                                 boolean 
transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != 
null);
                                                 if (!transportInPresent 
&& (responseMessageContext==null || 
responseMessageContext.getEnvelope()==null)) {
 if(log.isDebugEnabled()) log.debug("Exit: 
SenderWorker::checkForSyncResponses, no response present");
-                                                                return;
+                                                                return 
true;
                                                 }
 
                                                 //to find out weather the 
response was built by me.
@@ -592,7 +600,7 @@
  log.error ("Caught exception", e);
  }
 
-  return;
+  return true;
                                                                 }
 
                                                                 //If 
addressing is disabled we will be adding this message simply as the 
application response of the request message.
@@ -631,7 +639,7 @@
                                                 //if the syncResponseWas 
not built here and the client was not expecting a sync response. We will 
not try to execute 
                                                 //here. Doing so will 
cause a double invocation for a async message. 
                                                 if 
(msgCtx.getOptions().isUseSeparateListener()==true &&  !syncResponseBuilt) 
{
-                                                                return;
+                                                                return 
true;
                                                 }
 
 
@@ -650,13 +658,22 @@
                                                 }
 
                                 } catch (Exception e) {
-                                                String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
-                                                if (log.isWarnEnabled())
- log.warn(message, e);
+
+            String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);
+            if (msgCtx != null &&! msgCtx.isServerSide() &&
+ 
(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())
+                            || 
Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction()))

){
+                // We have not received a valid createSequnce reponse for 
the request we send so we need to terminate the seunce here
+                return false;
+            } else {
+                if (log.isWarnEnabled())
+                    log.warn(message, e);
+            }
                                 }
                                 if (log.isDebugEnabled())
                                                 log.debug("Exit: 
SenderWorker::checkForSyncResponses");
-                }
+        return true;
+    }
 
                 private void recordError (Exception e, RMMsgContext 
outRMMsg, StorageManager storageManager) throws SandeshaStorageException {
                                 // Store the Exception as a sequence 
property to enable the client to lookup the last 
@@ -702,5 +719,60 @@
                                                 }
                                 }
                 }
- 
+
+    private void invokeCallBackObject(StorageManager storageManager,
+                                      MessageContext msgCtx,
+                                      String message) throws 
SandeshaStorageException {
+        Transaction transaction = null;
+        if (msgCtx.isServerSide()) {
+            return;
+        }
+        try {
+            transaction = storageManager.getTransaction();
+            //terminate message sent using the SandeshaClient. Since the 
terminate message will simply get the
+            //InFlow of the reference message get called which could be 
zero sized (OutOnly operations).
+
+            // terminate sending side if this is the WSRM 1.0 spec.
+            // If the WSRM versoion is 1.1 termination will happen in the 
terminate sequence response message.
+
+            String internalSequenceId = (String) 
msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
+            if (internalSequenceId == null) internalSequenceId = 
senderBean.getInternalSequenceID();
+            if (internalSequenceId != null) {
+                // Create a new Transaction
+                transaction = storageManager.getTransaction();
+                RMSBean bean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
+                TerminateManager.terminateSendingSide(bean, 
storageManager);
+
+                OperationContext opCtx =
+ 
configurationContext.getOperationContext(bean.getApplicationMessageMessageId());
+                if (opCtx != null) {
+                    AxisOperation applicationAxisOperation = 
opCtx.getAxisOperation();
+                    if (applicationAxisOperation != null) {
+                        MessageReceiver msgReceiver = 
applicationAxisOperation.getMessageReceiver();
+                        if ((msgReceiver != null) && (msgReceiver 
instanceof CallbackReceiver)) {
+                            Object callback = ((CallbackReceiver) 
msgReceiver)
+ .lookupCallback(bean.getApplicationMessageMessageId());
+                            if (callback != null) {
+                                AxisCallback axisCallback = 
((AxisCallback) callback);
+                                axisCallback.onError(new 
Exception(message));
+                                axisCallback.onComplete();
+                            }
+                        }
+                    }
+                }
+                if (transaction != null && transaction.isActive()) 
transaction.commit();
+                transaction = null;
+            }
+
+        } catch (Exception e) {
+            if (log.isWarnEnabled())
+                log.warn(e);
+        } finally {
+            if (transaction != null && transaction.isActive()) {
+                transaction.rollback();
+                transaction = null;
+            }
+        }
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org








Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU







---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org


Mime
View raw message