cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject [1/2] git commit: CXF-5636 Add AckRequested to outbound header when messages are outstanding (depending on AckRequestMode)
Date Thu, 01 May 2014 21:38:49 GMT
Repository: cxf
Updated Branches:
  refs/heads/master d78f8c657 -> e4801064a


CXF-5636 Add AckRequested to outbound header when messages are outstanding (depending on AckRequestMode)


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2382e8c9
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2382e8c9
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2382e8c9

Branch: refs/heads/master
Commit: 2382e8c9753b2ec41a4c870e92828420ab4cac48
Parents: d78f8c6
Author: dsosnoski <dsosnoski@apache.org>
Authored: Sat Mar 22 14:03:11 2014 +1300
Committer: dsosnoski <dsosnoski@apache.org>
Committed: Fri May 2 09:14:19 2014 +1200

----------------------------------------------------------------------
 .../org/apache/cxf/ws/rm/RMOutInterceptor.java  |  6 ++---
 .../java/org/apache/cxf/ws/rm/RMProperties.java | 14 ++++++++++-
 .../org/apache/cxf/ws/rm/SourceSequence.java    | 20 +++++++++++++++
 .../cxf/ws/rm/soap/RMSoapOutInterceptor.java    | 26 ++++++++++++++++++++
 4 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/2382e8c9/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
index 344bb21..606db31 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
@@ -86,7 +86,6 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message> {
         }
         
         Identifier inSeqId = null;
-
         if (isApplicationMessage) {
             RMProperties rmpsIn = RMContextUtils.retrieveRMProperties(msg, false);
             if (null != rmpsIn && null != rmpsIn.getSequence()) {
@@ -96,7 +95,8 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message> {
         
         // add Acknowledgements (to application messages or explicitly 
         // created Acknowledgement messages only)
-        if (isApplicationMessage || constants.getSequenceAckAction().equals(action)) {
+        boolean isAck = constants.getSequenceAckAction().equals(action);
+        if (isApplicationMessage || isAck) {
             AttributedURIType to = maps.getTo();
             assert null != to;
             addAcknowledgements(destination, rmpsOut, inSeqId, to);
@@ -105,7 +105,7 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message> {
                 msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
             }
         } 
-        if (constants.getSequenceAckAction().equals(action)
+        if (isAck || constants.getSequenceAckAction().equals(action)
             || (constants.getTerminateSequenceAction().equals(action)
                 && RM10Constants.NAMESPACE_URI.equals(rmNamespace))) {
             maps.setReplyTo(RMUtils.createNoneReference());

http://git-wip-us.apache.org/repos/asf/cxf/blob/2382e8c9/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
index 9740326..448bfc6 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
@@ -29,6 +29,8 @@ import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 import org.apache.cxf.ws.rm.v200702.SequenceType;
 
 public class RMProperties {
+    private SourceSequence sourceSequence;
+    private long messageNumber;
     private SequenceType sequence;
     private Collection<SequenceAcknowledgement> acks;
     private Collection<AckRequestedType> acksRequested;
@@ -52,6 +54,14 @@ public class RMProperties {
         return sequence;
     }
     
+    public long getMessageNumber() {
+        return messageNumber;
+    }
+    
+    public SourceSequence getSourceSequence() {
+        return sourceSequence;
+    }
+    
     public boolean isLastMessage() {
         return lastMessage;
     }
@@ -75,9 +85,11 @@ public class RMProperties {
     }
     
     public void setSequence(SourceSequence seq) {
+        sourceSequence = seq;
+        messageNumber = seq.getCurrentMessageNr();
         SequenceType s = new SequenceType();
         s.setIdentifier(seq.getIdentifier());
-        s.setMessageNumber(seq.getCurrentMessageNr());
+        s.setMessageNumber(messageNumber);
         setSequence(s);
         lastMessage = seq.isLastMessage();
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2382e8c9/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
index 170d78e..9224465 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/SourceSequence.java
@@ -117,6 +117,26 @@ public class SourceSequence extends AbstractSequence {
     }
 
     /**
+     * Returns true if any messages other than the number supplied are waiting for acknowledgment.
+     * 
+     * @param num message number to check
+     * @return true if all messages have been acknowledged.
+     */
+    public boolean needAcknowledge(long num) {
+        if (currentMessageNumber != num) {
+            return true;
+        }
+        if (acknowledgement.getAcknowledgementRange().size() == 0) {
+            return false;
+        }
+        if (acknowledgement.getAcknowledgementRange().size() == 1) {
+            AcknowledgementRange r = acknowledgement.getAcknowledgementRange().get(0);
+            return r.getLower().longValue() != 1 || r.getUpper().longValue() < (num - 1);
+        }
+        return true;
+    }
+
+    /**
      * Returns true if a last message had been sent for this sequence and if all
      * messages for this sequence have been acknowledged.
      * 

http://git-wip-us.apache.org/repos/asf/cxf/blob/2382e8c9/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
index 26342d4..7df0c8c 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
@@ -20,6 +20,8 @@
 package org.apache.cxf.ws.rm.soap;
 
 import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,6 +55,10 @@ import org.apache.cxf.ws.rm.RMConstants;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMProperties;
 import org.apache.cxf.ws.rm.SequenceFault;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.v200702.AckRequestedType;
+import org.apache.cxf.ws.rm.v200702.Identifier;
+import org.apache.cxf.ws.rm.v200702.SequenceType;
 
 /**
  * Protocol Handler responsible for {en|de}coding the RM 
@@ -127,6 +133,26 @@ public class RMSoapOutInterceptor extends AbstractSoapInterceptor {
         if (null == rmps) {
             return;
         }
+        
+        SourceSequence seq = rmps.getSourceSequence();
+        SequenceType sequence = rmps.getSequence();
+        if (seq == null || sequence == null) {
+            LOG.warning("sequence not set for outbound message, skipped acknowledgement request");
+        } else if (seq.needAcknowledge(rmps.getMessageNumber())) {
+            
+            // waiting for prior acknowledgments, add AckRequested
+            Collection<AckRequestedType> reqs = rmps.getAcksRequested();
+            if (reqs == null) {
+                reqs = new ArrayList<AckRequestedType>();
+            }
+            Identifier identifier = new Identifier();
+            identifier.setValue(sequence.getIdentifier().getValue());
+            AckRequestedType ackRequest = new AckRequestedType();
+            ackRequest.setIdentifier(identifier);
+            reqs.add(ackRequest);
+            rmps.setAcksRequested(reqs);
+        }
+        
         LOG.log(Level.FINE, "encoding RMPs in SOAP headers");
         try {
             


Mime
View raw message