cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From egl...@apache.org
Subject svn commit: r746306 - in /cxf/trunk/rt/ws: addr/src/main/java/org/apache/cxf/ws/addressing/ addr/src/main/java/org/apache/cxf/ws/addressing/soap/ rm/src/main/java/org/apache/cxf/ws/rm/ rm/src/test/java/org/apache/cxf/ws/rm/
Date Fri, 20 Feb 2009 17:49:23 GMT
Author: eglynn
Date: Fri Feb 20 17:49:22 2009
New Revision: 746306

URL: http://svn.apache.org/viewvc?rev=746306&view=rev
Log:
Allow WS-A to pass spuriously resent messages through to WS-RM in certain circumstances,
so that the additional ACKs are not lost.

Modified:
    cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
    cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java

Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java?rev=746306&r1=746305&r2=746306&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/ContextUtils.java Fri Feb 20 17:49:22 2009
@@ -569,6 +569,58 @@
     }
 
     /**
+     * Store indication that a deferred uncorrelated message abort is
+     * supported
+     *
+     * @param message the current message
+     */
+    public static void storeDeferUncorrelatedMessageAbort(Message message) {
+        if (message.getExchange() != null) { 
+            message.getExchange().put("defer.uncorrelated.message.abort", Boolean.TRUE);
+        }
+    }
+
+    /**
+     * Retrieve indication that a deferred uncorrelated message abort is
+     * supported
+     *
+     * @param message the current message
+     * @return the retrieved indication
+     */
+    public static boolean retrieveDeferUncorrelatedMessageAbort(Message message) {
+        Boolean ret = message.getExchange() != null 
+                      ? (Boolean)message.getExchange().get("defer.uncorrelated.message.abort")
+                      : null;
+        return ret != null && ret.booleanValue();
+    }
+
+    /**
+     * Store indication that a deferred uncorrelated message abort should
+     * occur
+     *
+     * @param message the current message
+     */
+    public static void storeDeferredUncorrelatedMessageAbort(Message message) {
+        if (message.getExchange() != null) { 
+            message.getExchange().put("deferred.uncorrelated.message.abort", Boolean.TRUE);
+        }
+    }
+
+    /**
+     * Retrieve indication that a deferred uncorrelated message abort should
+     * occur.
+     *
+     * @param message the current message
+     * @return the retrieved indication
+     */
+    public static boolean retrieveDeferredUncorrelatedMessageAbort(Message message) {
+        Boolean ret = message.getExchange() != null 
+                      ? (Boolean)message.getExchange().get("deferred.uncorrelated.message.abort")
+                      : null;
+        return ret != null && ret.booleanValue();
+    }
+
+    /**
      * Retrieve indication that an async post-response service invocation
      * is required.
      * 

Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=746306&r1=746305&r2=746306&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java Fri Feb 20 17:49:22 2009
@@ -46,7 +46,6 @@
 import org.apache.cxf.binding.soap.interceptor.AbstractSoapInterceptor;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
-import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.headers.Header;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.helpers.DOMUtils;
@@ -733,20 +732,16 @@
                 uncorrelatedExchanges.remove(maps.getRelatesTo().getValue());
             if (correlatedExchange != null) {
                 synchronized (correlatedExchange) {
-                    Exchange tmpExchange = message.getExchange();
                     message.setExchange(correlatedExchange);
-                    Endpoint endpoint = correlatedExchange.get(Endpoint.class);
-                    if (Boolean.TRUE.equals(tmpExchange.get("deferred.fault.observer.notification"))
-                        && endpoint != null) {
-                        message.getInterceptorChain().abort();
-                        if (endpoint.getInFaultObserver() != null) {
-                            endpoint.getInFaultObserver().onMessage(message);           
                
-                        }
-                    }
                 }
             } else {
-                LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
-                message.getInterceptorChain().abort();
+                if (ContextUtils.retrieveDeferUncorrelatedMessageAbort(message)) {
+                    LOG.fine("deferring uncorrelated message abort");
+                    ContextUtils.storeDeferredUncorrelatedMessageAbort(message);
+                } else {
+                    LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
+                    message.getInterceptorChain().abort();
+                }
             }
         } else if (maps == null && isRequestor(message)) {
             Message m = message.getExchange().getOutMessage();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=746306&r1=746305&r2=746306&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Fri Feb 20 17:49:22 2009
@@ -26,6 +26,7 @@
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.addressing.MAPAggregator;
 
 /**
@@ -88,7 +89,12 @@
                 processSequence(destination, message);
                 processDeliveryAssurance(rmps);
             }
-            rme.receivedApplicationMessage();
+            if (ContextUtils.retrieveDeferredUncorrelatedMessageAbort(message)) {
+                LOG.info("deferred uncorrelated message abort");
+                message.getInterceptorChain().abort();
+            } else {
+                rme.receivedApplicationMessage();
+            }
         } else {
             rme.receivedControlMessage();
             if (RMConstants.getSequenceAckAction().equals(action)) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=746306&r1=746305&r2=746306&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Fri Feb 20 17:49:22 2009
@@ -31,6 +31,7 @@
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
+import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.addressing.VersionTransformer;
 import org.apache.cxf.ws.addressing.v200408.AttributedURI;
@@ -110,6 +111,7 @@
                 inSeqId = rmpsIn.getSequence().getIdentifier();
                 inMessageNumber = rmpsIn.getSequence().getMessageNumber();
             }
+            ContextUtils.storeDeferUncorrelatedMessageAbort(message);
         }
         
         if ((isApplicationMessage || isLastMessage)

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java?rev=746306&r1=746305&r2=746306&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMInInterceptorTest.java Fri Feb 20 17:49:22 2009
@@ -26,6 +26,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.cxf.interceptor.InterceptorChain;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.Phase;
@@ -170,8 +171,19 @@
     public void testAppResponse() throws SequenceFault, RMException, NoSuchMethodException {
         testAppMessage(false);
     }
+
+    @Test
+    public void testDefferedAbort() throws SequenceFault, RMException, NoSuchMethodException {
+        testAppMessage(false, true);
+    }
     
-    private void testAppMessage(boolean onServer) throws SequenceFault, RMException, NoSuchMethodException {
+    private void testAppMessage(boolean onServer) 
+        throws SequenceFault, RMException, NoSuchMethodException {
+        testAppMessage(onServer, false);
+    }
+
+    private void testAppMessage(boolean onServer, boolean deferredAbort) 
+        throws SequenceFault, RMException, NoSuchMethodException {
         Method m1 = RMInInterceptor.class.getDeclaredMethod("processAcknowledgments",
                                                             new Class[] {Source.class, RMProperties.class});
         Method m2 = RMInInterceptor.class.getDeclaredMethod("processAcknowledgmentRequests",
@@ -196,7 +208,16 @@
         interceptor.processDeliveryAssurance(rmps);
         EasyMock.expectLastCall();
         EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);
-        rme.receivedApplicationMessage();
+               
+        Exchange ex = control.createMock(Exchange.class);
+        message.getExchange();
+        EasyMock.expectLastCall().andReturn(ex).anyTimes();
+        ex.get("deferred.uncorrelated.message.abort");
+        EasyMock.expectLastCall().andReturn(Boolean.TRUE);
+        InterceptorChain chain = control.createMock(InterceptorChain.class);
+        message.getInterceptorChain();
+        EasyMock.expectLastCall().andReturn(chain);
+        chain.abort();
         EasyMock.expectLastCall();
 
         control.replay();

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java?rev=746306&r1=746305&r2=746306&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMOutInterceptorTest.java Fri Feb 20 17:49:22 2009
@@ -114,8 +114,9 @@
         Message message = control.createMock(Message.class);
         EasyMock.expect(interceptor.isRuntimeFault(message)).andReturn(false);
         Exchange ex = control.createMock(Exchange.class);
-        EasyMock.expect(message.getExchange()).andReturn(ex).times(3);
-        EasyMock.expect(ex.getOutMessage()).andReturn(message).times(1);       
+        EasyMock.expect(message.getExchange()).andReturn(ex).anyTimes();
+        EasyMock.expect(ex.getOutMessage()).andReturn(message).times(1);
+        EasyMock.expect(ex.put("defer.uncorrelated.message.abort", Boolean.TRUE)).andReturn(null);
      
         EasyMock.expect(message.get(Message.REQUESTOR_ROLE)).andReturn(Boolean.TRUE).anyTimes();
       
         EasyMock.expect(message.get(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND))
             .andReturn(maps).anyTimes();



Mime
View raw message