cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject svn commit: r1326949 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/
Date Tue, 17 Apr 2012 06:54:09 GMT
Author: ay
Date: Tue Apr 17 06:54:08 2012
New Revision: 1326949

URL: http://svn.apache.org/viewvc?rev=1326949&view=rev
Log:
[CXF-4191] RM broken in synchronous Mode

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM10Constants.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Messages.properties Tue Apr 17 06:54:08
2012
@@ -26,6 +26,7 @@ SEQ_TERMINATION_FAILURE = Failed to term
 STANDALONE_ANON_ACKS_NOT_SUPPORTED = It is not possible to send out-of-band acknowledgments
to the anonymous address.\nAn acknowledgement will be piggybacked on the next response. 
 STANDALONE_CLOSE_SEQUENCE_NO_TARGET_MSG = No target address to send out-of-band close sequence
to.
 STANDALONE_CLOSE_SEQUENCE_ANON_TARGET_MSG = It is not possible to send an out-of-band close
sequence to the anonymous address.
+STANDALONE_ANON_TERMINATE_SEQUENCE_MSG = It is not possible to send out-of-band terminate
sequence message to the anonymous address.\nIt will be piggybacked on the next response. 
 
 POLICY_PROVIDER_CREATION_EXC = Failed to create provider for RM assertion.
 POLICY_REFERENCE_RESOLUTION_EXC = Policy reference {0} cannot be resolved.

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM10Constants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM10Constants.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM10Constants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM10Constants.java Tue Apr 17 06:54:08
2012
@@ -64,6 +64,9 @@ public final class RM10Constants extends
     public static final QName TERMINATE_SEQUENCE_QNAME =
         new QName(NAMESPACE_URI, "TerminateSequence");
     
+    public static final QName TERMINATE_SEQUENCE_ANONYMOUS_QNAME =
+        new QName(NAMESPACE_URI, "TerminateSequenceAnonymous");
+    
     public static final QName SEQUENCE_ACKNOWLEDGEMENT_QNAME =
         new QName(NAMESPACE_URI, "SequenceAcknowledgement");
     
@@ -206,6 +209,10 @@ public final class RM10Constants extends
         return TERMINATE_SEQUENCE_QNAME;
     }
     
+    public QName getTerminateSequenceAnonymousOperationName() {
+        return TERMINATE_SEQUENCE_ANONYMOUS_QNAME;
+    }
+    
     public QName getSequenceAckOperationName() {
         return SEQUENCE_ACKNOWLEDGEMENT_QNAME;
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RM11Constants.java Tue Apr 17 06:54:08
2012
@@ -64,6 +64,9 @@ public final class RM11Constants extends
     public static final QName TERMINATE_SEQUENCE_QNAME =
         new QName(NAMESPACE_URI, "TerminateSequence");
     
+    public static final QName TERMINATE_SEQUENCE_ANONYMOUS_QNAME =
+        new QName(NAMESPACE_URI, "TerminateSequenceAnonymous");
+    
     public static final QName SEQUENCE_ACKNOWLEDGEMENT_QNAME =
         new QName(NAMESPACE_URI, "SequenceAcknowledgement");
     
@@ -220,6 +223,10 @@ public final class RM11Constants extends
         return TERMINATE_SEQUENCE_QNAME;
     }
     
+    public QName getTerminateSequenceAnonymousOperationName() {
+        return TERMINATE_SEQUENCE_ANONYMOUS_QNAME;
+    }
+    
     public QName getCloseSequenceOperationName() {
         return CLOSE_SEQUENCE_QNAME;
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMConstants.java Tue Apr 17 06:54:08
2012
@@ -99,6 +99,8 @@ public abstract class RMConstants {
     
     public abstract QName getTerminateSequenceOperationName();
     
+    public abstract QName getTerminateSequenceAnonymousOperationName();
+    
     public abstract QName getSequenceAckOperationName();
     
     public abstract QName getAckRequestedOperationName();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Tue Apr 17 06:54:08
2012
@@ -467,6 +467,17 @@ public class RMEndpoint {
         partInfo.setElementQName(consts.getTerminateSequenceOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(protocol.getCodec().getTerminateSequenceType());
+        
+        // for the TerminateSequence operation to an anonymous endpoint
+        operationInfo = ii.addOperation(consts.getTerminateSequenceAnonymousOperationName());
+        messageInfo = operationInfo.createMessage(consts.getTerminateSequenceAnonymousOperationName(),
+                                                  MessageInfo.Type.OUTPUT);
+        operationInfo.setOutput(messageInfo.getName().getLocalPart(), messageInfo);
+        partInfo = messageInfo.addMessagePart(TERMINATE_PART_NAME);
+        partInfo.setElementQName(consts.getTerminateSequenceOperationName());
+        partInfo.setElement(true);
+        partInfo.setTypeClass(protocol.getCodec().getTerminateSequenceType());
+        
     }
 
     void buildSequenceAckOperationInfo(InterfaceInfo ii, ProtocolVariation protocol) {
@@ -531,6 +542,11 @@ public class RMEndpoint {
             addAction(boi, consts.getTerminateSequenceAction());
             bi.addOperation(boi);
 
+            boi = bi.buildOperation(consts.getTerminateSequenceAnonymousOperationName(),
+                                    null, consts.getTerminateSequenceAnonymousOperationName().getLocalPart());
+            addAction(boi, consts.getTerminateSequenceAction());
+            bi.addOperation(boi);
+
             boi = bi.buildOperation(consts.getSequenceAckOperationName(), null, null);
             addAction(boi, consts.getSequenceAckAction());
             bi.addOperation(boi);
@@ -569,7 +585,9 @@ public class RMEndpoint {
         boi.addExtensor(soi);
 
         MessageInfo info = boi.getOperationInfo().getInput();
-        info.addExtensionAttribute(JAXWSAConstants.WSAW_ACTION_QNAME, action);
+        if (info != null) {
+            info.addExtensionAttribute(JAXWSAConstants.WSAW_ACTION_QNAME, action);
+        }
         
         info = boi.getOperationInfo().getOutput();
         if (info != null) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Tue Apr 17
06:54:08 2012
@@ -20,20 +20,32 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageContentsList;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
+import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.service.model.OperationInfo;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.v200702.TerminateSequenceType;
 
 /**
  * 
@@ -157,6 +169,19 @@ public class RMOutInterceptor extends Ab
             }
         } else if (!MessageUtils.isRequestor(msg) && constants.getCreateSequenceAction().equals(action))
{
             maps.getAction().setValue(constants.getCreateSequenceResponseAction());
+        } else if (isPartialResponse && action == null
+            && isResponseToAction(msg, constants.getSequenceAckAction())) {
+            Collection<SequenceAcknowledgement> acks = rmpsIn.getAcks();
+            if (acks.size() == 1) {
+                SourceSequence ss = source.getSequence(acks.iterator().next().getIdentifier());
+                if (ss != null && ss.allAcknowledged()) {
+                    setAction(maps, constants.getTerminateSequenceAction());
+                    setTerminateSequence(msg, ss.getIdentifier(), protocol);
+                    msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
+                    // removing this sequence now. See the comment in SourceSequence.setAcknowledged()
+                    source.removeSequence(ss);
+                }
+            }
         }
         
         // add Acknowledgements (to application messages or explicitly 
@@ -166,13 +191,10 @@ public class RMOutInterceptor extends Ab
             assert null != to;
             addAcknowledgements(destination, rmpsOut, inSeqId, to);
             if (isPartialResponse && rmpsOut.getAcks() != null && rmpsOut.getAcks().size()
> 0) {
-                AttributedURIType actionURI = new AttributedURIType();
-                actionURI.setValue(constants.getSequenceAckAction());
-                maps.setAction(actionURI);
+                setAction(maps, constants.getSequenceAckAction());
                 msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
             }
         } 
-        
         if (constants.getSequenceAckAction().equals(action)
             || constants.getTerminateSequenceAction().equals(action)) {
             maps.setReplyTo(RMUtils.createNoneReference());
@@ -229,4 +251,53 @@ public class RMOutInterceptor extends Ab
         }
         return FaultMode.CHECKED_APPLICATION_FAULT != mode;
     }
+
+    private boolean isResponseToAction(Message msg, String action) {
+        AddressingProperties inMaps = RMContextUtils.retrieveMAPs(msg, false, false);
+        String inAction = null;
+        if (null != inMaps.getAction()) {
+            inAction = inMaps.getAction().getValue();
+        }
+        return action.equals(inAction);
+    }
+    
+    private void setTerminateSequence(Message msg, Identifier identifier, ProtocolVariation
protocol) 
+        throws RMException {
+        TerminateSequenceType ts = new TerminateSequenceType();
+        ts.setIdentifier(identifier);
+        MessageContentsList contents = 
+            new MessageContentsList(new Object[]{protocol.getCodec().convertToSend(ts)});
+        msg.setContent(List.class, contents);
+
+        // create a new exchange for this output-only exchange
+        Exchange newex = new ExchangeImpl();
+        Exchange oldex = msg.getExchange();
+        
+        newex.put(Bus.class, oldex.getBus());
+        newex.put(Endpoint.class, oldex.getEndpoint());
+        newex.put(Service.class, oldex.getEndpoint().getService());
+        newex.put(Binding.class, oldex.getEndpoint().getBinding());
+        newex.setConduit(oldex.getConduit(msg));
+        newex.setDestination(oldex.getDestination());
+        
+        //Setup the BindingOperationInfo
+        RMEndpoint rmep = getManager().getReliableEndpoint(msg);
+        OperationInfo oi = rmep.getEndpoint(protocol).getEndpointInfo().getService().getInterface()
+            .getOperation(protocol.getConstants().getTerminateSequenceAnonymousOperationName());
+        BindingInfo bi = rmep.getBindingInfo(protocol);
+        BindingOperationInfo boi = bi.getOperation(oi);
+        
+        newex.put(BindingInfo.class, bi);
+        newex.put(BindingOperationInfo.class, boi);
+        newex.put(OperationInfo.class, boi.getOperationInfo());
+        
+        msg.setExchange(newex);
+        newex.setOutMessage(msg);
+    }
+
+    private static void setAction(AddressingProperties maps, String action) {
+        AttributedURIType actionURI = new AttributedURIType();
+        actionURI.setValue(action);
+        maps.setAction(actionURI);
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java Tue Apr 17 06:54:08
2012
@@ -144,10 +144,17 @@ public class SourceSequence extends Abst
         acknowledgement = a;
         source.getManager().getRetransmissionQueue().purgeAcknowledged(this);
         if (allAcknowledged()) {
-            RMEndpoint rme = source.getReliableEndpoint();
-            Proxy proxy = rme.getProxy();
-            proxy.terminate(this);
-            source.removeSequence(this);
+            if (null == target || RMUtils.getAddressingConstants().getAnonymousURI().equals(
+                target.getAddress().getValue())) {
+                LOG.log(Level.WARNING, "STANDALONE_ANON_TERMINATE_SEQUENCE_MSG");
+                // keep the sequence and let RMOutInterceptor remove it after building the
TS message
+                // if we remove the sequence here, RMOutInterceptor should check for a null
sequence
+            } else {
+                RMEndpoint rme = source.getReliableEndpoint();
+                Proxy proxy = rme.getProxy();
+                proxy.terminate(this);
+                source.removeSequence(this);
+            }
         }
     }
 

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Tue Apr 17 06:54:08
2012
@@ -368,7 +368,7 @@ public class RMEndpointTest extends Asse
 
         InterfaceInfo intf = si.getInterface();
 
-        assertEquals(7, intf.getOperations().size());
+        assertEquals(8, intf.getOperations().size());
 
         String ns = RM10Constants.NAMESPACE_URI;
         OperationInfo oi = intf.getOperation(new QName(ns, "CreateSequence"));
@@ -382,6 +382,10 @@ public class RMEndpointTest extends Asse
         assertNotNull("No operation info.", oi);
         assertTrue("Operation is toway.", oi.isOneWay());
 
+        oi = intf.getOperation(new QName(ns, "TerminateSequenceAnonymous"));
+        assertNotNull("No operation info.", oi);
+        assertTrue("Operation is oneway.", !oi.isOneWay());
+
         oi = intf.getOperation(new QName(ns, "SequenceAcknowledgement"));
         assertNotNull("No operation info.", oi);
         assertTrue("Operation is toway.", oi.isOneWay());

Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1326949&r1=1326948&r2=1326949&view=diff
==============================================================================
--- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
(original)
+++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Tue Apr 17 06:54:08 2012
@@ -503,6 +503,44 @@ public class SequenceTest extends Abstra
         verifyTwowayNonAnonymous();
     }
 
+    @Test
+    public void testTwowayAnonymousSequenceLength1() throws Exception {
+        init("org/apache/cxf/systest/ws/rm/seqlength1.xml");
+
+        String v = greeter.greetMe("once");
+        assertEquals("Unexpected response", "ONCE", v);
+        // outbound: CS, greetReq,     TS, SA
+        // inbound: CSR, greetResp+SA,   , TS
+
+        awaitMessages(4, 3);
+        
+        MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(),
+            inRecorder.getInboundMessages(), Names200408.WSA_NAMESPACE_NAME, RM10Constants.NAMESPACE_URI);
+
+        mf.verifyMessages(4, true);
+        String[] expectedActions = new String[] {RM10Constants.CREATE_SEQUENCE_ACTION, 
+                                                 GREETME_ACTION,
+                                                 RM10Constants.TERMINATE_SEQUENCE_ACTION,
+                                                 RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION};
+
+        mf.verifyActions(expectedActions, true);
+        mf.verifyMessageNumbers(new String[] {null, "1", null, null}, true);
+        mf.verifyLastMessage(new boolean[] {false, true, false, false}, true);
+        mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, true);
+
+        mf.verifyMessages(3, false);
+
+        expectedActions = new String[] {RM10Constants.CREATE_SEQUENCE_RESPONSE_ACTION, 
+                                        GREETME_RESPONSE_ACTION,
+                                        RM10Constants.TERMINATE_SEQUENCE_ACTION};
+
+        mf.verifyActions(expectedActions, false);
+        mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
+        mf.verifyLastMessage(new boolean[] {false, true, false}, false);
+        mf.verifyAcknowledgements(new boolean[] {false, true, false}, false);
+        
+    }
+    
     private void verifyTwowayNonAnonymous() throws Exception {
     
         // CreateSequence and three greetMe messages



Mime
View raw message