cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject [2/2] git commit: [CXF-5636] Move AckRequested code to RMOutInterceptor, use Source configuration option as default to control with override in message property.
Date Thu, 01 May 2014 21:38:50 GMT
[CXF-5636] Move AckRequested code to RMOutInterceptor, use Source
configuration option as default to control with override in message
property.

Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e4801064
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e4801064
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e4801064

Branch: refs/heads/master
Commit: e4801064af6316d7632dccad4e4b7b121777488d
Parents: 2382e8c
Author: dsosnoski <dsosnoski@apache.org>
Authored: Fri May 2 09:19:24 2014 +1200
Committer: dsosnoski <dsosnoski@apache.org>
Committed: Fri May 2 09:19:24 2014 +1200

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/RMMessageConstants.java    |  4 ++
 .../org/apache/cxf/ws/rm/RMOutInterceptor.java  | 51 +++++++++++++++++++-
 .../cxf/ws/rm/soap/RMSoapOutInterceptor.java    | 26 ----------
 .../configuration/wsrm-manager-types.xsd        | 22 +++++++++
 .../org/apache/cxf/systest/ws/rm/atmostonce.xml |  2 +-
 5 files changed, 76 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/e4801064/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
index 4012c5b..eb6789c 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
@@ -48,6 +48,10 @@ public final class RMMessageConstants {
     /** Client callback (must be instance of {@link MessageCallback}). */
     public static final String RM_CLIENT_CALLBACK = "org.apache.cxf.rm.clientCallback";
     
+    /** Mode for requesting acknowledgements ({@link org.apache.cxf.ws.rm.managerAckRequestMode}
value,
+     *  overrides SourcePolicy configuration). */
+    public static final String ACK_REQUEST_MODE = "org.apache.cxf.rm.ackRequestMode";
+    
     static final String RM_PROTOCOL_VARIATION = "org.apache.cxf.ws.rm.protocol";
 
     // keep this constant in the ws-rm package until it finds a general use outside of ws-rm

http://git-wip-us.apache.org/repos/asf/cxf/blob/e4801064/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
index 606db31..95eb200 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,8 +32,12 @@ import org.apache.cxf.phase.Phase;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.rm.manager.AckRequestModeType;
+import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.v200702.AckRequestedType;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.v200702.SequenceType;
 
 /**
  * 
@@ -90,11 +95,18 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message>
 {
             RMProperties rmpsIn = RMContextUtils.retrieveRMProperties(msg, false);
             if (null != rmpsIn && null != rmpsIn.getSequence()) {
                 inSeqId = rmpsIn.getSequence().getIdentifier();
+                
+                SourceSequence seq = rmpsIn.getSourceSequence();
+                SequenceType sequence = rmpsIn.getSequence();
+                if (seq == null || sequence == null) {
+                    LOG.warning("sequence not set for outbound message, skipped acknowledgement
request"); 
+                } else {
+                    addAckRequest(msg, rmpsIn, seq, sequence);
+                }
             }
         }
         
-        // add Acknowledgements (to application messages or explicitly 
-        // created Acknowledgement messages only)
+        // add Acknowledgements (to application messages or explicitly created Acknowledgement
messages only)
         boolean isAck = constants.getSequenceAckAction().equals(action);
         if (isApplicationMessage || isAck) {
             AttributedURIType to = maps.getTo();
@@ -113,6 +125,41 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message>
 {
         
         assertReliability(msg);
     }
+
+    /**
+     * Add AcknowledgementRequested to message if needed. The AckRequest mode set either
in the message
+     * properties or in the source policy is used to determine whether AcknowledgementRequested
is always
+     * added, never added, or added only when we're waiting for the acknowledgement to a
previously-sent
+     * message.
+     *  
+     * @param msg
+     * @param rmpsIn
+     * @param seq
+     * @param sequence
+     */
+    protected void addAckRequest(Message msg, RMProperties rmpsIn, SourceSequence seq, SequenceType
sequence) {
+        AckRequestModeType mode = (AckRequestModeType)msg.get(RMMessageConstants.ACK_REQUEST_MODE);
+        if (mode == null) {
+            mode = AckRequestModeType.PENDING;
+            SourcePolicyType policy = getManager().getSourcePolicy();
+            if (policy.isSetAckRequestMode()) {
+                mode = policy.getAckRequestMode();
+            }
+        }
+        if (AckRequestModeType.ALWAYS == mode
+            || (mode == AckRequestModeType.PENDING && seq.needAcknowledge(rmpsIn.getMessageNumber())))
{
+            Collection<AckRequestedType> reqs = rmpsIn.getAcksRequested();
+            if (reqs == null) {
+                reqs = new ArrayList<AckRequestedType>();
+            }
+            Identifier identifier = new Identifier();
+            identifier.setValue(sequence.getIdentifier().getValue());
+            AckRequestedType ackRequest = new AckRequestedType();
+            ackRequest.setIdentifier(identifier);
+            reqs.add(ackRequest);
+            rmpsIn.setAcksRequested(reqs);
+        }
+    }
     
     void addAcknowledgements(Destination destination, RMProperties rmpsOut, Identifier inSeqId,
 AttributedURIType to) {
         for (DestinationSequence seq : destination.getAllSequences()) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/e4801064/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
index 7df0c8c..26342d4 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
@@ -20,8 +20,6 @@
 package org.apache.cxf.ws.rm.soap;
 
 import java.net.HttpURLConnection;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -55,10 +53,6 @@ import org.apache.cxf.ws.rm.RMConstants;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMProperties;
 import org.apache.cxf.ws.rm.SequenceFault;
-import org.apache.cxf.ws.rm.SourceSequence;
-import org.apache.cxf.ws.rm.v200702.AckRequestedType;
-import org.apache.cxf.ws.rm.v200702.Identifier;
-import org.apache.cxf.ws.rm.v200702.SequenceType;
 
 /**
  * Protocol Handler responsible for {en|de}coding the RM 
@@ -133,26 +127,6 @@ public class RMSoapOutInterceptor extends AbstractSoapInterceptor {
         if (null == rmps) {
             return;
         }
-        
-        SourceSequence seq = rmps.getSourceSequence();
-        SequenceType sequence = rmps.getSequence();
-        if (seq == null || sequence == null) {
-            LOG.warning("sequence not set for outbound message, skipped acknowledgement request");

-        } else if (seq.needAcknowledge(rmps.getMessageNumber())) {
-            
-            // waiting for prior acknowledgments, add AckRequested
-            Collection<AckRequestedType> reqs = rmps.getAcksRequested();
-            if (reqs == null) {
-                reqs = new ArrayList<AckRequestedType>();
-            }
-            Identifier identifier = new Identifier();
-            identifier.setValue(sequence.getIdentifier().getValue());
-            AckRequestedType ackRequest = new AckRequestedType();
-            ackRequest.setIdentifier(identifier);
-            reqs.add(ackRequest);
-            rmps.setAcksRequested(reqs);
-        }
-        
         LOG.log(Level.FINE, "encoding RMPs in SOAP headers");
         try {
             

http://git-wip-us.apache.org/repos/asf/cxf/blob/e4801064/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
index 841135b..99b60c4 100644
--- a/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
+++ b/rt/ws/rm/src/main/resources/schemas/configuration/wsrm-manager-types.xsd
@@ -86,6 +86,15 @@
                 </xs:documentation>
             </xs:annotation>      
         </xs:attribute>
+        <xs:attribute name="ackRequestMode" type="tns:AckRequestModeType" use="optional"
default="pending">
+            <xs:annotation>
+                <xs:documentation>
+                    When AckRequested will be included in a message being sent. The default
value "pending"
+                    means AckRequested will only be included when a previously-send message
is waiting for
+                    acknowledgement.
+                </xs:documentation>
+            </xs:annotation>      
+        </xs:attribute>
     </xs:complexType>
     
     <xs:complexType name="DestinationPolicyType">
@@ -250,6 +259,19 @@
         </xs:simpleType>
       </xs:attribute>
     </xs:complexType>
+    
+    <xs:simpleType name="AckRequestModeType">
+      <xs:annotation>
+        <xs:documentation>
+          Mode controlling when AckRequested will be included in a message being sent.
+        </xs:documentation>
+      </xs:annotation>
+      <xs:restriction base="xs:NMTOKEN">
+        <xs:enumeration value="never" />
+        <xs:enumeration value="always" />
+        <xs:enumeration value="pending" />
+      </xs:restriction>
+    </xs:simpleType>
 
  
     <xs:element name="deliveryAssurance" type="tns:DeliveryAssuranceType"/>

http://git-wip-us.apache.org/repos/asf/cxf/blob/e4801064/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce.xml
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce.xml b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce.xml
index f725fa1..9b2308c 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce.xml
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce.xml
@@ -20,7 +20,7 @@
 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager" xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
xsi:schemaLocation=" http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
     <import resource="rminterceptors.xml"/>
     <wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
-        <wsrm-policy:RMAssertion>
+        <wsrm-policy:RMAssertion ackRequestMode="never">
             <wsrm-policy:BaseRetransmissionInterval Milliseconds="60000"/>
             <wsrm-policy:AcknowledgementInterval Milliseconds="10000"/>
         </wsrm-policy:RMAssertion>


Mime
View raw message