cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andreasm...@apache.org
Subject svn commit: r534829 - in /incubator/cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ systests/src/test/java/org/apache/cxf/systest/ws/rm/
Date Thu, 03 May 2007 12:48:11 GMT
Author: andreasmyth
Date: Thu May  3 05:48:11 2007
New Revision: 534829

URL: http://svn.apache.org/viewvc?view=rev&rev=534829
Log:
[JIRA CXF-479] Server-side resend.
Also fixed the problem that caused the MultiClientOneway test to fail, and made the address
manipulation for server-side-originated out-of-band RM protocol messages in Proxy thread-safe.

Modified:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Thu May  3 05:48:11 2007
@@ -204,7 +204,7 @@
                                   final org.apache.cxf.ws.addressing.EndpointReferenceType address) {
         ConduitSelector cs = new DeferredConduitSelector(conduit) {
             @Override
-            public Conduit selectConduit(Message message) {
+            public synchronized Conduit selectConduit(Message message) {
                 Conduit conduit = null;
                 EndpointInfo endpointInfo = getEndpoint().getEndpointInfo();
                 org.apache.cxf.ws.addressing.EndpointReferenceType original = 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Thu May  3 05:48:11 2007
@@ -105,14 +105,16 @@
         Identifier inSeqId = null;
         BigInteger inMessageNumber = null;
         
-        if (isApplicationMessage && !isPartialResponse) {
-                        
+        if (isApplicationMessage) {
             rmpsIn = (RMProperties)RMContextUtils.retrieveRMProperties(message, false);
-            
             if (null != rmpsIn && null != rmpsIn.getSequence()) {
                 inSeqId = rmpsIn.getSequence().getIdentifier();
                 inMessageNumber = rmpsIn.getSequence().getMessageNumber();
             }
+        }
+        
+        if (isApplicationMessage && !isPartialResponse) {
+      
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
             }
@@ -166,23 +168,30 @@
                              RMProperties rmpsOut, 
                              Identifier inSeqId, 
                              AttributedURI to) {
-
         for (DestinationSequence seq : destination.getAllSequences()) {
-            if (seq.sendAcknowledgement()
-                && ((seq.getAcksTo().getAddress().getValue().equals(RMUtils.getAddressingConstants()
-                    .getAnonymousURI()) && AbstractSequence.identifierEquals(seq.getIdentifier(),
-                                                                                inSeqId))
-                    || to.getValue().equals(seq.getAcksTo().getAddress().getValue()))) {
-                rmpsOut.addAck(seq);
-            } else if (LOG.isLoggable(Level.FINE)) {
-                if (!seq.sendAcknowledgement()) {
+            if (!seq.sendAcknowledgement()) {
+                if (LOG.isLoggable(Level.FINE)) {
                     LOG.fine("no need to add acknowledgements for sequence "
-                             + seq.getIdentifier().getValue());
-                } else {
+                        + seq.getIdentifier().getValue()); 
+                }
+                continue;
+            }
+            if (!to.getValue().equals(seq.getAcksTo().getAddress().getValue())) {
+                if (LOG.isLoggable(Level.FINE)) {
                     LOG.fine("sequences acksTo address (" + seq.getAcksTo().getAddress().getValue()
-                             + ") does not match to address (" + to.getValue() + ")");
+                        + ") does not match to address (" + to.getValue() + ")");
+                }
+                continue;
+            }
+            // there may be multiple sources with anonymous acksTo 
+            if (RMConstants.getAnonymousAddress().equals(seq.getAcksTo().getAddress().getValue())
+                && !AbstractSequence.identifierEquals(seq.getIdentifier(), inSeqId)) {
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.fine("sequence identifier does not match inbound sequence identifier");
                 }
+                continue;
             }
+            rmpsOut.addAck(seq);
         }
 
         if (LOG.isLoggable(Level.FINE)) {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu May  3 05:48:11 2007
@@ -46,6 +46,7 @@
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.policy.AssertionInfo;
@@ -237,7 +238,6 @@
     }
 
     private void serverResend(Message message) {
-        final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint();
         
         // get the message's to address
         
@@ -254,9 +254,11 @@
         final String address = to.getValue();
         LOG.fine("Resending to address: " + address);
         
+        final Endpoint reliableEndpoint = manager.getReliableEndpoint(message).getEndpoint();
+
         ConduitSelector cs = new DeferredConduitSelector() {
             @Override
-            public Conduit selectConduit(Message message) {
+            public synchronized Conduit selectConduit(Message message) {
                 Conduit conduit = null;
                 EndpointInfo endpointInfo = reliableEndpoint.getEndpointInfo();
                 org.apache.cxf.ws.addressing.EndpointReferenceType original = 
@@ -273,8 +275,16 @@
             }
         };
         
-        
-        Conduit c = cs.selectConduit(message);
+        cs.setEndpoint(reliableEndpoint);
+        Conduit c = cs.selectConduit(message);   
+        // REVISIT
+        // use application endpoint message observer instead?
+        c.setMessageObserver(new MessageObserver() {
+            public void onMessage(Message message) {
+                LOG.fine("Ignoring response to resent message.");
+            }
+            
+        });
         resend(c, message);
     }
     

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=534829&r1=534828&r2=534829
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu May  3 05:48:11 2007
@@ -989,7 +989,6 @@
         mf.verifyActions(expectedActions, true);
     }
   
-    @Ignore
     @Test
     public void testMultiClientOneway() throws Exception {
         if (!doTestMultiClientOneway) {
@@ -1164,7 +1163,6 @@
         }        
     }
     
-    @Ignore
     @Test
     public void testServerSideMessageLoss() throws Exception {
         if (!doTestServerSideMessageLoss) {



Mime
View raw message