cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject svn commit: r1142428 - in /cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm: ./ soap/
Date Sun, 03 Jul 2011 13:07:17 GMT
Author: dsosnoski
Date: Sun Jul  3 13:07:16 2011
New Revision: 1142428

URL: http://svn.apache.org/viewvc?rev=1142428&view=rev
Log:
LastMessage handling and other cleanup

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Sun Jul  3 13:07:16 2011
@@ -197,8 +197,9 @@ public class Destination extends Abstrac
         DestinationSequence seq = getSequence(sequenceType.getIdentifier());
 
         if (null != seq) {
-            seq.processingComplete(sequenceType.getMessageNumber());
-            seq.purgeAcknowledged(sequenceType.getMessageNumber());
+            long mn = sequenceType.getMessageNumber().longValue();
+            seq.processingComplete(mn);
+            seq.purgeAcknowledged(mn);
         }
     }
     

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Sun Jul  3 13:07:16 2011
@@ -20,7 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.TimerTask;
@@ -144,14 +144,21 @@ public class DestinationSequence extends
             }
 
             if (!done) {
+                
+                // need new acknowledgement range
                 AcknowledgementRange range = RMUtils.getWSRMFactory()
                     .createSequenceAcknowledgementAcknowledgementRange();
                 range.setLower(messageNumber);
                 range.setUpper(messageNumber);
                 acknowledgement.getAcknowledgementRange().add(i, range);
+                if (acknowledgement.getAcknowledgementRange().size() > 1) {
+                    
+                    // acknowledge out-of-order at first opportunity
+                    scheduleImmediateAcknowledgement();
+                    
+                }
             }
             mergeRanges();
-            wakeupAll();
         }
         
         RMAssertion rma = RM10PolicyUtils.getRMAssertion(destination.getManager().getRMAssertion(), message);
@@ -231,19 +238,22 @@ public class DestinationSequence extends
     boolean applyDeliveryAssurance(long mn, Message message) throws RMException {
         Continuation cont = getContinuation(message);
         DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
+        boolean canSkip = !da.isSetAtLeastOnce() && !da.isSetExactlyOnce();
         if (cont != null && da.isSetInOrder() && !cont.isNew()) {
-            return waitInQueue(mn, da.isSetAtLeastOnce() && da.isSetExactlyOnce(),
-                               message, cont);
+            return waitInQueue(mn, canSkip, message, cont);
         }
         if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {
+            
+            // acknowledge at first opportunity following duplicate message
+            scheduleImmediateAcknowledgement();
             org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
                 "MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
             LOG.log(Level.INFO, msg.toString());
             throw new RMException(msg);
+            
         } 
         if (da.isSetInOrder()) {
-            return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
-                               message, cont);
+            return waitInQueue(mn, canSkip, message, cont);
         }
         return true;
     }
@@ -262,7 +272,7 @@ public class DestinationSequence extends
             // can process now if no other in process and this one is next
             if (inProcessNumber == 0) {
                 long diff = mn - highNumberCompleted;
-                if (diff == 1 || canSkip && diff > 0) {
+                if (diff == 1 || (canSkip && diff > 0)) {
                     inProcessNumber = mn;
                     return true;
                 }
@@ -318,9 +328,7 @@ public class DestinationSequence extends
         if (null == store) {
             return;
         }
-        Collection<Long> messageNrs = new ArrayList<Long>();
-        messageNrs.add(messageNr);
-        store.removeMessages(getIdentifier(), messageNrs, false);
+        store.removeMessages(getIdentifier(), Collections.singleton(new Long(messageNr)), false);
     }
 
     /**

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java Sun Jul  3 13:07:16 2011
@@ -25,6 +25,7 @@ import javax.xml.namespace.QName;
 import org.w3c.dom.Element;
 
 import org.apache.cxf.ws.rm.v200702.AckRequestedType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -129,6 +130,15 @@ public interface EncoderDecoder {
     SequenceType decodeSequenceType(Element elem) throws JAXBException;
     
     /**
+     * Generates a CloseSequenceType if a SequenceType represents a last message state.
+     * 
+     * @param elem
+     * @return CloseSequenceType if last message state, else <code>null</code>
+     * @throws JAXBException
+     */
+    CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException;
+    
+    /**
      * Unmarshals a SequenceAcknowledgement, converting it if necessary to the internal form.
      * 
      * @param elem

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java Sun Jul  3 13:07:16 2011
@@ -39,6 +39,7 @@ import org.apache.cxf.common.util.Packag
 import org.apache.cxf.helpers.DOMUtils;
 import org.apache.cxf.ws.addressing.Names;
 import org.apache.cxf.ws.rm.v200702.AckRequestedType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -114,6 +115,9 @@ public final class EncoderDecoder10AImpl
         if (null != seq) {
             LOG.log(Level.FINE, "encoding sequence into RM header");
             org.apache.cxf.ws.rm.v200502wsa15.SequenceType toseq = VersionTransformer.convert200502wsa15(seq);
+            if (rmps.isLastMessage()) {
+                toseq.setLastMessage(new org.apache.cxf.ws.rm.v200502wsa15.SequenceType.LastMessage());
+            }
             JAXBElement element = RMUtils.getWSRM200502WSA200508Factory().createSequence(toseq);
             marshaller.marshal(element, header);
         } 
@@ -191,6 +195,21 @@ public final class EncoderDecoder10AImpl
             = unmarshaller.unmarshal(elem, org.apache.cxf.ws.rm.v200502wsa15.SequenceType.class);
         return VersionTransformer.convert(jaxbElement.getValue());
     }
+    
+    public CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException {
+        Unmarshaller unmarshaller = getContext().createUnmarshaller();
+        JAXBElement<org.apache.cxf.ws.rm.v200502wsa15.SequenceType> jaxbElement
+            = unmarshaller.unmarshal(elem, org.apache.cxf.ws.rm.v200502wsa15.SequenceType.class);
+        org.apache.cxf.ws.rm.v200502wsa15.SequenceType seq = jaxbElement.getValue();
+        if (seq.isSetLastMessage()) {
+            CloseSequenceType close = new CloseSequenceType();
+            close.setIdentifier(VersionTransformer.convert(seq.getIdentifier()));
+            close.setLastMsgNumber(seq.getMessageNumber());
+            return close;
+        } else {
+            return null;
+        }
+    }
 
     public SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException {
         Unmarshaller unmarshaller = getContext().createUnmarshaller();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java Sun Jul  3 13:07:16 2011
@@ -39,6 +39,7 @@ import org.apache.cxf.common.util.Packag
 import org.apache.cxf.helpers.DOMUtils;
 import org.apache.cxf.ws.addressing.VersionTransformer.Names200408;
 import org.apache.cxf.ws.rm.v200702.AckRequestedType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -114,6 +115,9 @@ public final class EncoderDecoder10Impl 
         if (null != seq) {
             LOG.log(Level.FINE, "encoding sequence into RM header");
             org.apache.cxf.ws.rm.v200502.SequenceType toseq = VersionTransformer.convert200502(seq);
+            if (rmps.isLastMessage()) {
+                toseq.setLastMessage(new org.apache.cxf.ws.rm.v200502.SequenceType.LastMessage());
+            }
             JAXBElement element = RMUtils.getWSRM200502Factory().createSequence(toseq);
             marshaller.marshal(element, header);
         } 
@@ -190,6 +194,21 @@ public final class EncoderDecoder10Impl 
             = unmarshaller.unmarshal(elem, org.apache.cxf.ws.rm.v200502.SequenceType.class);
         return VersionTransformer.convert(jaxbElement.getValue());
     }
+    
+    public CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException {
+        Unmarshaller unmarshaller = getContext().createUnmarshaller();
+        JAXBElement<org.apache.cxf.ws.rm.v200502.SequenceType> jaxbElement
+            = unmarshaller.unmarshal(elem, org.apache.cxf.ws.rm.v200502.SequenceType.class);
+        org.apache.cxf.ws.rm.v200502.SequenceType seq = jaxbElement.getValue();
+        if (seq.isSetLastMessage()) {
+            CloseSequenceType close = new CloseSequenceType();
+            close.setIdentifier(VersionTransformer.convert(seq.getIdentifier()));
+            close.setLastMsgNumber(seq.getMessageNumber());
+            return close;
+        } else {
+            return null;
+        }
+    }
 
     public SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException {
         Unmarshaller unmarshaller = getContext().createUnmarshaller();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java Sun Jul  3 13:07:16 2011
@@ -39,6 +39,7 @@ import org.apache.cxf.common.util.Packag
 import org.apache.cxf.helpers.DOMUtils;
 import org.apache.cxf.ws.addressing.Names;
 import org.apache.cxf.ws.rm.v200702.AckRequestedType;
+import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
 import org.apache.cxf.ws.rm.v200702.CreateSequenceType;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -190,6 +191,10 @@ public final class EncoderDecoder11Impl 
         JAXBElement<SequenceType> jaxbElement = unmarshaller.unmarshal(elem, SequenceType.class);
         return jaxbElement.getValue();
     }
+    
+    public CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException {
+        return null;
+    }
 
     public SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException {
         Unmarshaller unmarshaller = getContext().createUnmarshaller();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Sun Jul  3 13:07:16 2011
@@ -90,7 +90,7 @@ public class Proxy {
         OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
             .getOperation(constants.getTerminateSequenceOperationName());
         
-        TerminateSequenceType ts = RMUtils.getWSRMFactory().createTerminateSequenceType();
+        TerminateSequenceType ts = new TerminateSequenceType();
         ts.setIdentifier(ss.getIdentifier());
         EncoderDecoder codec = reliableEndpoint.getEncoderDecoder();
         invoke(oi, new Object[] {codec.convertToSend(ts)}, null);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Sun Jul  3 13:07:16 2011
@@ -25,9 +25,13 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.wsdl.extensions.ExtensibilityElement;
+import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.namespace.QName;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
 
 import org.apache.cxf.binding.soap.SoapVersion;
 import org.apache.cxf.binding.soap.model.SoapBindingInfo;
@@ -74,6 +78,8 @@ public class RMEndpoint {
     private static final String CREATE_PART_NAME = "create";
     private static final String CREATE_RESPONSE_PART_NAME = "createResponse";
     private static final String TERMINATE_PART_NAME = "terminate";
+    
+    private static Schema rmSchema;
 
     private RMManager manager;
     private Endpoint applicationEndpoint;
@@ -274,6 +280,7 @@ public class RMEndpoint {
 
     void createService() {
         ServiceInfo si = new ServiceInfo();
+        si.setProperty(Schema.class.getName(), getSchema());
         serviceQName = new QName(encoderDecoder.getWSRMNamespace(), SERVICE_NAME);
         si.setName(serviceQName);
         
@@ -294,6 +301,27 @@ public class RMEndpoint {
         service.setInvoker(servant);
     }
 
+    private static synchronized Schema getSchema() {
+        if (rmSchema == null) {
+            try {
+                SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                javax.xml.transform.Source ad = new StreamSource(RMEndpoint.class
+                                             .getResource("/schemas/wsdl/addressing.xsd")
+                                             .openStream(),
+                                             "http://schemas.xmlsoap.org/ws/2004/08/addressing");
+                javax.xml.transform.Source rm = new StreamSource(RMEndpoint.class
+                                                                 .getResource("/schemas/wsdl/wsrm.xsd")
+                                                                 .openStream());
+                
+                javax.xml.transform.Source schemas[] = new javax.xml.transform.Source[] {ad, rm};
+                rmSchema = factory.newSchema(schemas);
+            } catch (Exception ex) {
+                //ignore
+            }
+        }
+        return rmSchema;
+    }
+
     void createEndpoint(org.apache.cxf.transport.Destination d) {
         ServiceInfo si = service.getServiceInfo();
         buildBindingInfo(si);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Sun Jul  3 13:07:16 2011
@@ -461,7 +461,8 @@ public class RMManager implements Server
         
         LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
                 new Object[] {null == conduit ? "client" : "server", id});
-        RMEndpoint rme = createReliableEndpoint(endpoint, null);
+        EncoderDecoder codec = VersionTransformer.getEncoderDecoder(rmNamespace, rmAddressingNamespace);
+        RMEndpoint rme = createReliableEndpoint(endpoint, codec);
         rme.initialise(conduit, null, null);
         reliableEndpoints.put(endpoint, rme);
         SourceSequence css = null;

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Sun Jul  3 13:07:16 2011
@@ -55,8 +55,7 @@ public class RMOutInterceptor extends Ab
             return;
         }
        
-        AddressingPropertiesImpl maps =
-            RMContextUtils.retrieveMAPs(msg, false, true);
+        AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(msg, false, true);
         if (null == maps) {
             LogUtils.log(LOG, Level.WARNING, "MAPS_RETRIEVAL_FAILURE_MSG");
             return;
@@ -149,7 +148,6 @@ public class RMOutInterceptor extends Ab
 
                 // increase message number and store a sequence type object in
                 // context
-
                 seq.nextMessageNumber(inSeqId, inMessageNumber, isLastMessage);
                 
                 rmpsOut.setSequence(seq);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java Sun Jul  3 13:07:16 2011
@@ -34,6 +34,7 @@ public class RMProperties {
     private Collection<AckRequestedType> acksRequested;
     private CloseSequenceType closeSequence;
     private String namespaceURI;
+    private boolean lastMessage;
     
     public Collection<SequenceAcknowledgement> getAcks() {
         return acks;
@@ -51,6 +52,10 @@ public class RMProperties {
         return sequence;
     }
     
+    public boolean isLastMessage() {
+        return lastMessage;
+    }
+    
     public void setAcks(Collection<SequenceAcknowledgement> a) {
         // use threadsafe implementation for working copy, to avoid concurrent modifications
         acks = new CopyOnWriteArrayList<SequenceAcknowledgement>(a);
@@ -70,10 +75,11 @@ public class RMProperties {
     }
     
     public void setSequence(SourceSequence seq) {
-        SequenceType s = RMUtils.getWSRMFactory().createSequenceType();
+        SequenceType s = new SequenceType();
         s.setIdentifier(seq.getIdentifier());
-        s.setMessageNumber(seq.getCurrentMessageNr());   
+        s.setMessageNumber(seq.getCurrentMessageNr());
         setSequence(s);
+        lastMessage = seq.isLastMessage();
     }
     
     public void addAck(DestinationSequence seq) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Sun Jul  3 13:07:16 2011
@@ -57,6 +57,7 @@ public class Servant implements Invoker 
         reliableEndpoint = rme;
     }
     
+    
     public Object invoke(Exchange exchange, Object o) {
         LOG.fine("Invoking on RM Endpoint");
         OperationInfo oi = exchange.get(OperationInfo.class);
@@ -197,9 +198,7 @@ public class Servant implements Invoker 
         LOG.fine("Terminating sequence");
         
         EncoderDecoder codec = reliableEndpoint.getEncoderDecoder();
-        org.apache.cxf.ws.rm.v200502.TerminateSequenceType external =
-            (org.apache.cxf.ws.rm.v200502.TerminateSequenceType)getParameter(message);
-        TerminateSequenceType terminate = codec.convertReceivedTerminateSequence(external);
+        TerminateSequenceType terminate = codec.convertReceivedTerminateSequence(getParameter(message));
         
         // check if the terminated sequence was created in response to a a createSequence
         // request

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java?rev=1142428&r1=1142427&r2=1142428&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Sun Jul  3 13:07:16 2011
@@ -291,6 +291,7 @@ public class RMSoapInterceptor extends A
                         LOG.log(Level.FINE, "decoding RM header {0}", localName);
                         if (RMConstants.SEQUENCE_NAME.equals(localName)) {
                             rmps.setSequence(codec.decodeSequenceType(elem));
+                            rmps.setCloseSequence(codec.decodeSequenceTypeCloseSequence(elem));
                         } else if (RMConstants.SEQUENCE_ACK_NAME.equals(localName)) {
                             acks.add(codec.decodeSequenceAcknowledgement(elem));
                         } else if (RMConstants.ACK_REQUESTED_NAME.equals(localName)) {



Mime
View raw message