cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject svn commit: r1566555 [1/2] - in /cxf/trunk: core/src/main/java/org/apache/cxf/interceptor/ core/src/main/java/org/apache/cxf/io/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/ rt/ws/rm/src/main/java/or...
Date Mon, 10 Feb 2014 10:08:35 GMT
Author: dsosnoski
Date: Mon Feb 10 10:08:34 2014
New Revision: 1566555

URL: http://svn.apache.org/r1566555
Log:
CXF-4866, CXF-352 Restructure WS-RM so that updated WS-RM headers are
generated for retransmission and security is re-applied to the message.

Added:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
      - copied, changed from r1566482, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
      - copied, changed from r1566482, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptorTest.java
Removed:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java
Modified:
    cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java
    cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java Mon Feb 10 10:08:34 2014
@@ -52,7 +52,7 @@ public class MessageSenderInterceptor ex
         message.getInterceptorChain().add(ending);
     }
     
-    public class MessageSenderEndingInterceptor extends AbstractPhaseInterceptor<Message> {
+    public static class MessageSenderEndingInterceptor extends AbstractPhaseInterceptor<Message> {
         public MessageSenderEndingInterceptor() {
             super(Phase.PREPARE_SEND_ENDING);
         }
@@ -66,7 +66,7 @@ public class MessageSenderInterceptor ex
         }
     }
     
-    private Conduit getConduit(Message message) {
+    public static Conduit getConduit(Message message) {
         Exchange exchange = message.getExchange();
         Conduit conduit = exchange.getConduit(message);
         if (conduit == null

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java Mon Feb 10 10:08:34 2014
@@ -575,6 +575,10 @@ public class CachedOutputStream extends 
     public void setOutputDir(File outputDir) throws IOException {
         this.outputDir = outputDir;
     }
+    
+    public long getThreshold() {
+        return threshold;
+    }
     public void setThreshold(long threshold) {
         this.threshold = threshold;
     }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Mon Feb 10 10:08:34 2014
@@ -107,7 +107,7 @@ public abstract class AbstractRMIntercep
     
     /**
      * Asserts all RMAssertion assertions for the current message, regardless their attributes
-     * (if there is more thsn one we have ensured that they are all supported by considering
+     * (if there is more than one we have ensured that they are all supported by considering
      * e.g. the minimum acknowledgment interval).
      * @param message the current message
      */

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Mon Feb 10 10:08:34 2014
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -33,7 +34,6 @@ import org.apache.cxf.common.logging.Log
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
-import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -167,7 +167,7 @@ public class DestinationSequence extends
             RMMessage msg = null;
             if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
                 msg = new RMMessage();
-                msg.setContent((CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT));
+                msg.setContent((InputStream)message.get(RMMessageConstants.SAVED_CONTENT));
                 msg.setMessageNumber(st.getMessageNumber());
             }
             store.persistIncoming(this, msg);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java Mon Feb 10 10:08:34 2014
@@ -19,10 +19,17 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.util.Collection;
+
+import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
 import javax.xml.namespace.QName;
 
+import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
 
 import org.apache.cxf.ws.rm.v200702.AckRequestedType;
 import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
@@ -34,70 +41,131 @@ import org.apache.cxf.ws.rm.v200702.Sequ
 import org.apache.cxf.ws.rm.v200702.TerminateSequenceType;
 
 /**
- * Interface for converting WS-ReliableMessaging structures to and from XML. Implementations of this interface
- * provide version-specific encoding and decoding.
+ * Base class for converting WS-ReliableMessaging structures to and from XML. Subclasses provide version-specific
+ * encoding and decoding.
  */
-public interface EncoderDecoder {
+public abstract class EncoderDecoder {
+
+    /**
+     * Get context for JAXB marshalling/unmarshalling.
+     * 
+     * @return context
+     * @throws JAXBException
+     */
+    protected abstract JAXBContext getContext() throws JAXBException;
+    
+    /**
+     * Add WS-RM namespace declaration to element.
+     * 
+     * @param element
+     */
+    protected abstract void addNamespaceDecl(Element element);
     
     /**
      * Get the WS-ReliableMessaging namespace used by this encoder/decoder.
      * 
      * @return URI
      */
-    String getWSRMNamespace();
+    public abstract String getWSRMNamespace();
     
     /**
      * Get the WS-Addressing namespace used by this encoder/decoder.
      * 
      * @return URI
      */
-    String getWSANamespace();
+    public abstract String getWSANamespace();
     
     /**
      * Get the WS-ReliableMessaging constants used by this encoder/decoder.
      * 
      * @return
      */
-    RMConstants getConstants();
+    public abstract RMConstants getConstants();
     
     /**
      * Get the class used for the CreateSequenceType.
      * 
      * @return class
      */
-    Class<?> getCreateSequenceType();
+    public abstract Class<?> getCreateSequenceType();
     
     /**
      * Get the class used for the CreateSequenceResponseType.
      * 
      * @return class
      */
-    Class<?> getCreateSequenceResponseType();
+    public abstract Class<?> getCreateSequenceResponseType();
     
     /**
      * Get the class used for the TerminateSequenceType.
      * 
      * @return class
      */
-    Class<?> getTerminateSequenceType();
+    public abstract Class<?> getTerminateSequenceType();
     
     /**
      * Get the class used for the TerminateSequenceResponseType.
      * 
      * @return class
      */
-    Class<?> getTerminateSequenceResponseType();
+    public abstract Class<?> getTerminateSequenceResponseType();
     
     /**
-     * Builds an element containing WS-RM headers. This adds the appropriate WS-RM and WS-A namespace
-     * declarations to the element, and then adds any WS-RM headers set in the supplied properties as child
-     * elements.
+     * Insert WS-RM headers into a SOAP message. This adds the appropriate WS-RM namespace declaration to the
+     * SOAP:Header element (which must be present), and then adds any WS-RM headers set in the supplied properties as
+     * child elements.
      * 
      * @param rmps
-     * @param qname constructed element name
-     * @return element
+     * @param doc
+     * @return <code>true</code> if headers added, <code>false</code> if not
+     */
+    public boolean insertHeaders(RMProperties rmps, Document doc) throws JAXBException {
+        
+        // check if there's anything to insert
+        SequenceType seq = rmps.getSequence();
+        Collection<SequenceAcknowledgement> acks = rmps.getAcks();
+        Collection<AckRequestedType> reqs = rmps.getAcksRequested();
+        if (seq == null && acks == null && reqs == null) {
+            return false;
+        }
+        
+        // get the SOAP:Header element
+        NodeList nodes = doc.getDocumentElement().getChildNodes();
+        Element header = null;
+        for (int i = 0; i < nodes.getLength(); i++) {
+            Node node = nodes.item(i);
+            if (node.getNodeType() == Node.ELEMENT_NODE && "Header".equals(node.getLocalName())) {
+                header = (Element)node;
+                break;
+            }
+        }
+        if (header == null) {
+            throw new JAXBException("No SOAP:Header element in message");
+        }
+        
+        // add WSRM namespace declaration to header, instead of repeating in each individual child node
+        addNamespaceDecl(header);
+        
+        // build individual headers
+        Marshaller marshaller = getContext().createMarshaller();
+        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
+        buildHeaders(seq, acks, reqs, rmps.isLastMessage(), header, marshaller);
+        return true;
+    }
+
+    /**
+     * Build all required headers, using the correct protocol variation.
+     * 
+     * @param seq
+     * @param acks
+     * @param reqs
+     * @param last
+     * @param header
+     * @param marshaller
+     * @throws JAXBException
      */
-    Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException;
+    protected abstract void buildHeaders(SequenceType seq, Collection<SequenceAcknowledgement> acks,
+        Collection<AckRequestedType> reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException;
     
     /**
      * Builds an element containing a WS-RM Fault. This adds the appropriate WS-RM namespace declaration to
@@ -107,7 +175,7 @@ public interface EncoderDecoder {
      * @param qname constructed element name
      * @return element
      */
-    Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException;
+    public abstract Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException;
     
     /**
      * Marshals a SequenceAcknowledgement to the appropriate external form.
@@ -116,7 +184,7 @@ public interface EncoderDecoder {
      * @return element
      * @throws JAXBException
      */
-    Element encodeSequenceAcknowledgement(SequenceAcknowledgement ack) throws JAXBException;
+    public abstract Element encodeSequenceAcknowledgement(SequenceAcknowledgement ack) throws JAXBException;
     
     /**
      * Marshals an Identifier to the appropriate external form.
@@ -125,7 +193,7 @@ public interface EncoderDecoder {
      * @return element
      * @throws JAXBException
      */
-    Element encodeIdentifier(Identifier id) throws JAXBException;
+    public abstract Element encodeIdentifier(Identifier id) throws JAXBException;
     
     /**
      * Unmarshals a SequenceType, converting it if necessary to the internal form.
@@ -134,7 +202,7 @@ public interface EncoderDecoder {
      * @return
      * @throws JAXBException
      */
-    SequenceType decodeSequenceType(Element elem) throws JAXBException;
+    public abstract SequenceType decodeSequenceType(Element elem) throws JAXBException;
     
     /**
      * Generates a CloseSequenceType if a SequenceType represents a last message state.
@@ -143,7 +211,7 @@ public interface EncoderDecoder {
      * @return CloseSequenceType if last message state, else <code>null</code>
      * @throws JAXBException
      */
-    CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException;
+    public abstract CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException;
     
     /**
      * Unmarshals a SequenceAcknowledgement, converting it if necessary to the internal form.
@@ -152,7 +220,7 @@ public interface EncoderDecoder {
      * @return
      * @throws JAXBException
      */
-    SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException;
+    public abstract SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException;
     
     /**
      * Unmarshals a AckRequestedType, converting it if necessary to the internal form.
@@ -161,7 +229,7 @@ public interface EncoderDecoder {
      * @return
      * @throws JAXBException
      */
-    AckRequestedType decodeAckRequestedType(Element elem) throws JAXBException;
+    public abstract AckRequestedType decodeAckRequestedType(Element elem) throws JAXBException;
     
     /**
      * Convert a CreateSequence message to the correct format for transmission.
@@ -169,7 +237,7 @@ public interface EncoderDecoder {
      * @param create
      * @return converted
      */
-    Object convertToSend(CreateSequenceType create);
+    public abstract Object convertToSend(CreateSequenceType create);
     
     /**
      * Convert a CreateSequenceResponse message to the correct format for transmission.
@@ -177,7 +245,7 @@ public interface EncoderDecoder {
      * @param create
      * @return converted
      */
-    Object convertToSend(CreateSequenceResponseType create);
+    public abstract Object convertToSend(CreateSequenceResponseType create);
     
     /**
      * Convert a TerminateSequence message to the correct format for transmission.
@@ -185,7 +253,7 @@ public interface EncoderDecoder {
      * @param term
      * @return converted
      */
-    Object convertToSend(TerminateSequenceType term);
+    public abstract Object convertToSend(TerminateSequenceType term);
     
     /**
      * Convert a received TerminateSequence message to internal form.
@@ -193,7 +261,7 @@ public interface EncoderDecoder {
      * @param term
      * @return converted
      */
-    TerminateSequenceType convertReceivedTerminateSequence(Object term);
+    public abstract TerminateSequenceType convertReceivedTerminateSequence(Object term);
     
     /**
      * Convert a received CreateSequence message to internal form.
@@ -201,7 +269,7 @@ public interface EncoderDecoder {
      * @param create
      * @return converted
      */
-    CreateSequenceType convertReceivedCreateSequence(Object create);
+    public abstract CreateSequenceType convertReceivedCreateSequence(Object create);
     
     /**
      * Convert a received CreateSequenceResponse message to internal form.
@@ -209,5 +277,5 @@ public interface EncoderDecoder {
      * @param create
      * @return converted
      */
-    CreateSequenceResponseType convertReceivedCreateSequenceResponse(Object create);
+    public abstract CreateSequenceResponseType convertReceivedCreateSequenceResponse(Object create);
 }
\ No newline at end of file

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java Mon Feb 10 10:08:34 2014
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -51,11 +52,11 @@ import org.apache.cxf.ws.rm.v200702.Term
  * WS-ReliableMessaging 1.0 encoding and decoding. This converts between the standard WS-RM objects and the
  * 1.0 representation using the WS-Addressing recommendation 200508 namespace.
  */
-public final class EncoderDecoder10AImpl implements EncoderDecoder {
+public final class EncoderDecoder10AImpl extends EncoderDecoder {
     
     public static final EncoderDecoder10AImpl INSTANCE = new EncoderDecoder10AImpl();
 
-    private static JAXBContext jaxbContext;
+    private static AtomicReference<JAXBContext> jaxbContextReference = new AtomicReference<JAXBContext>();
 
     private static final Logger LOG = LogUtils.getL7dLogger(EncoderDecoder10AImpl.class);
     
@@ -90,50 +91,47 @@ public final class EncoderDecoder10AImpl
         return null;
     }
 
-    private static JAXBContext getContext() throws JAXBException {
-        synchronized (EncoderDecoder10AImpl.class) {
-            if (jaxbContext == null) {
-                Class<?> clas = RMUtils.getWSRM200502WSA200508Factory().getClass();
-                jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas),
-                    clas.getClassLoader());
+    protected JAXBContext getContext() throws JAXBException {
+        JAXBContext jaxbContext = jaxbContextReference.get();
+        if (jaxbContext == null) {
+            synchronized (EncoderDecoder10AImpl.class) {
+                jaxbContext = jaxbContextReference.get();
+                if (jaxbContext == null) {
+                    Class<?> clas = RMUtils.getWSRM200502WSA200508Factory().getClass();
+                    jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas),
+                                                          clas.getClassLoader());
+                    jaxbContextReference.set(jaxbContext);
+                }
             }
         }
         return jaxbContext;
     }
     
-    public Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException {
-        
-        Document doc = DOMUtils.createDocument();
-        Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart());
-        // add WSRM namespace declaration to header, instead of
-        // repeating in each individual child node
-        Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", 
+    protected void addNamespaceDecl(Element element) {
+        Attr attr = element.getOwnerDocument().createAttributeNS("http://www.w3.org/2000/xmlns/", 
             "xmlns:" + RMConstants.NAMESPACE_PREFIX);
         attr.setValue(RM10Constants.NAMESPACE_URI);
-        header.setAttributeNodeNS(attr);
+        element.setAttributeNodeNS(attr);
+    }
 
-        Marshaller marshaller = getContext().createMarshaller();
-        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
-       
-        SequenceType seq = rmps.getSequence();
+    protected void buildHeaders(SequenceType seq, Collection<SequenceAcknowledgement> acks,
+        Collection<AckRequestedType> reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException {
         if (null != seq) {
             LOG.log(Level.FINE, "encoding sequence into RM header");
             org.apache.cxf.ws.rm.v200502wsa15.SequenceType toseq = VersionTransformer.convert200502wsa15(seq);
-            if (rmps.isLastMessage()) {
+            if (last) {
                 toseq.setLastMessage(new org.apache.cxf.ws.rm.v200502wsa15.SequenceType.LastMessage());
             }
             JAXBElement<org.apache.cxf.ws.rm.v200502wsa15.SequenceType> element 
                 = RMUtils.getWSRM200502WSA200508Factory().createSequence(toseq);
             marshaller.marshal(element, header);
         } 
-        Collection<SequenceAcknowledgement> acks = rmps.getAcks();
         if (null != acks) {
             LOG.log(Level.FINE, "encoding sequence acknowledgement(s) into RM header");
             for (SequenceAcknowledgement ack : acks) {
                 marshaller.marshal(VersionTransformer.convert200502wsa15(ack), header);
             }
         }
-        Collection<AckRequestedType> reqs = rmps.getAcksRequested();
         if (null != reqs) {
             LOG.log(Level.FINE, "encoding acknowledgement request(s) into RM header");
             for (AckRequestedType req : reqs) {
@@ -141,7 +139,6 @@ public final class EncoderDecoder10AImpl
                     .createAckRequested(VersionTransformer.convert200502wsa15(req)), header);
             }
         }
-        return header;
     }
 
     public Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java Mon Feb 10 10:08:34 2014
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -51,11 +52,11 @@ import org.apache.cxf.ws.rm.v200702.Term
  * WS-ReliableMessaging 1.0 encoding and decoding. This converts between the standard WS-RM objects and the
  * 1.0 representation using the WS-Addressing 200408 namespace specified in the WS-RM 1.0 recommendation.
  */
-public final class EncoderDecoder10Impl implements EncoderDecoder {
+public final class EncoderDecoder10Impl extends EncoderDecoder {
     
     public static final EncoderDecoder10Impl INSTANCE = new EncoderDecoder10Impl();
 
-    private static JAXBContext jaxbContext;
+    private static AtomicReference<JAXBContext> jaxbContextReference = new AtomicReference<JAXBContext>();
 
     private static final Logger LOG = LogUtils.getL7dLogger(EncoderDecoder10Impl.class);
     
@@ -90,50 +91,46 @@ public final class EncoderDecoder10Impl 
         return null;
     }
 
-    private static JAXBContext getContext() throws JAXBException {
-        synchronized (EncoderDecoder10Impl.class) {
-            if (jaxbContext == null) {
-                Class<?> clas = RMUtils.getWSRM200502Factory().getClass();
-                jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas),
-                    clas.getClassLoader());
+    protected JAXBContext getContext() throws JAXBException {
+        JAXBContext jaxbContext = jaxbContextReference.get();
+        if (jaxbContext == null) {
+            synchronized (EncoderDecoder10Impl.class) {
+                jaxbContext = jaxbContextReference.get();
+                if (jaxbContext == null) {
+                    Class<?> clas = RMUtils.getWSRM200502Factory().getClass();
+                    jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas),
+                                                          clas.getClassLoader());
+                    jaxbContextReference.set(jaxbContext);
+                }
             }
         }
         return jaxbContext;
     }
     
-    public Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException {
-        
-        Document doc = DOMUtils.createDocument();
-        Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart());
-        // add WSRM namespace declaration to header, instead of
-        // repeating in each individual child node
-        Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", 
+    protected void addNamespaceDecl(Element element) {
+        Attr attr = element.getOwnerDocument().createAttributeNS("http://www.w3.org/2000/xmlns/", 
             "xmlns:" + RMConstants.NAMESPACE_PREFIX);
         attr.setValue(RM10Constants.NAMESPACE_URI);
-        header.setAttributeNodeNS(attr);
+        element.setAttributeNodeNS(attr);
+    }
 
-        Marshaller marshaller = getContext().createMarshaller();
-        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
-       
-        SequenceType seq = rmps.getSequence();
+    protected void buildHeaders(SequenceType seq, Collection<SequenceAcknowledgement> acks,
+        Collection<AckRequestedType> reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException {
         if (null != seq) {
             LOG.log(Level.FINE, "encoding sequence into RM header");
             org.apache.cxf.ws.rm.v200502.SequenceType toseq = VersionTransformer.convert200502(seq);
-            if (rmps.isLastMessage()) {
+            if (last) {
                 toseq.setLastMessage(new org.apache.cxf.ws.rm.v200502.SequenceType.LastMessage());
             }
-            JAXBElement<org.apache.cxf.ws.rm.v200502.SequenceType> element 
-                = RMUtils.getWSRM200502Factory().createSequence(toseq);
+            JAXBElement<?> element = RMUtils.getWSRM200502Factory().createSequence(toseq);
             marshaller.marshal(element, header);
         } 
-        Collection<SequenceAcknowledgement> acks = rmps.getAcks();
         if (null != acks) {
             LOG.log(Level.FINE, "encoding sequence acknowledgement(s) into RM header");
             for (SequenceAcknowledgement ack : acks) {
                 marshaller.marshal(VersionTransformer.convert200502(ack), header);
             }
         }
-        Collection<AckRequestedType> reqs = rmps.getAcksRequested();
         if (null != reqs) {
             LOG.log(Level.FINE, "encoding acknowledgement request(s) into RM header");
             for (AckRequestedType req : reqs) {
@@ -141,7 +138,6 @@ public final class EncoderDecoder10Impl 
                     .createAckRequested(VersionTransformer.convert200502(req)), header);
             }
         }
-        return header;
     }
 
     public Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java Mon Feb 10 10:08:34 2014
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -49,14 +50,14 @@ import org.apache.cxf.ws.rm.v200702.Sequ
 import org.apache.cxf.ws.rm.v200702.TerminateSequenceType;
 
 /**
- * WS-ReliableMessaging 1.1 encoding and decoding. This just works with the standard internal form of the
+ * WS-ReliableMessaging 1.1/1.2 encoding and decoding. This just works with the standard internal form of the
  * WS-RM data structures.
  */
-public final class EncoderDecoder11Impl implements EncoderDecoder {
+public final class EncoderDecoder11Impl extends EncoderDecoder {
     
     public static final EncoderDecoder11Impl INSTANCE = new EncoderDecoder11Impl();
 
-    private static JAXBContext jaxbContext;
+    private static AtomicReference<JAXBContext> jaxbContextReference = new AtomicReference<JAXBContext>();
 
     private static final Logger LOG = LogUtils.getL7dLogger(EncoderDecoder11Impl.class);
     
@@ -91,65 +92,58 @@ public final class EncoderDecoder11Impl 
         return org.apache.cxf.ws.rm.v200702.TerminateSequenceResponseType.class;
     }
 
-    private static JAXBContext getContext() throws JAXBException {
-        synchronized (EncoderDecoder11Impl.class) {
-            if (jaxbContext == null) {
-                Class<?> clas = RMUtils.getWSRMFactory().getClass();
-                jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas),
-                    clas.getClassLoader());
+    protected JAXBContext getContext() throws JAXBException {
+        JAXBContext jaxbContext = jaxbContextReference.get();
+        if (jaxbContext == null) {
+            synchronized (EncoderDecoder11Impl.class) {
+                jaxbContext = jaxbContextReference.get();
+                if (jaxbContext == null) {
+                    Class<?> clas = RMUtils.getWSRMFactory().getClass();
+                    jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas),
+                                                          clas.getClassLoader());
+                    jaxbContextReference.set(jaxbContext);
+                }
             }
         }
         return jaxbContext;
     }
-    
-    public Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException {
-        
-        Document doc = DOMUtils.createDocument();
-        Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart());
-        // add WSRM namespace declaration to header, instead of
-        // repeating in each individual child node
-        Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", 
-            "xmlns:" + RMConstants.NAMESPACE_PREFIX);
-        attr.setValue(RM10Constants.NAMESPACE_URI);
-        header.setAttributeNodeNS(attr);
 
-        Marshaller marshaller = getContext().createMarshaller();
-        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
+    protected void buildHeaders(SequenceType seq, Collection<SequenceAcknowledgement> acks,
+        Collection<AckRequestedType> reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException {
        
-        SequenceType seq = rmps.getSequence();
         if (null != seq) {
             LOG.log(Level.FINE, "encoding sequence into RM header");
             JAXBElement<SequenceType> element = RMUtils.getWSRMFactory().createSequence(seq);
             marshaller.marshal(element, header);
         } 
-        Collection<SequenceAcknowledgement> acks = rmps.getAcks();
         if (null != acks) {
             LOG.log(Level.FINE, "encoding sequence acknowledgement(s) into RM header");
             for (SequenceAcknowledgement ack : acks) {
                 marshaller.marshal(ack, header);
             }
         }
-        Collection<AckRequestedType> reqs = rmps.getAcksRequested();
         if (null != reqs) {
             LOG.log(Level.FINE, "encoding acknowledgement request(s) into RM header");
             for (AckRequestedType req : reqs) {
                 marshaller.marshal(RMUtils.getWSRMFactory().createAckRequested(req), header);
             }
         }
-        return header;
+    }
+
+    protected void addNamespaceDecl(Element element) {
+        Attr attr = element.getOwnerDocument().createAttributeNS("http://www.w3.org/2000/xmlns/", 
+            "xmlns:" + RMConstants.NAMESPACE_PREFIX);
+        attr.setValue(RM10Constants.NAMESPACE_URI);
+        element.setAttributeNodeNS(attr);
     }
 
     public Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException {
         
         Document doc = DOMUtils.createDocument();
         Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart());
-        // add WSRM namespace declaration to header, instead of
-        // repeating in each individual child node
         
-        Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", 
-            "xmlns:" + RMConstants.NAMESPACE_PREFIX);
-        attr.setValue(RM11Constants.NAMESPACE_URI);
-        header.setAttributeNodeNS(attr);
+        // add WSRM namespace declaration to header, instead of repeating in each individual child node
+        addNamespaceDecl(header);
 
         Marshaller marshaller = getContext().createMarshaller();
         marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java Mon Feb 10 10:08:34 2014
@@ -58,7 +58,7 @@ public class RMCaptureInInterceptor exte
 
                     message.setContent(InputStream.class, saved.getInputStream());
                     LOG.fine("Capturing the original RM message");
-                    message.put(RMMessageConstants.SAVED_CONTENT, saved);
+                    message.put(RMMessageConstants.SAVED_CONTENT, saved.getInputStream());
                 } catch (Exception e) {
                     throw new Fault(e);
                 }

Copied: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java (from r1566482, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java)
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java?p2=cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java&p1=cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java&r1=1566482&r2=1566555&rev=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java Mon Feb 10 10:08:34 2014
@@ -19,22 +19,29 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.soap.SOAPException;
+import javax.xml.soap.SOAPMessage;
+
 import org.apache.cxf.Bus;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageContentsList;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.Phase;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.BindingOperationInfo;
@@ -42,7 +49,8 @@ import org.apache.cxf.service.model.Oper
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.ContextUtils;
-import org.apache.cxf.ws.addressing.MAPAggregator;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
+import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 import org.apache.cxf.ws.rm.v200702.TerminateSequenceType;
@@ -50,12 +58,13 @@ import org.apache.cxf.ws.rm.v200702.Term
 /**
  * 
  */
-public class RMOutInterceptor extends AbstractRMInterceptor<Message>  {
+public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
     
-    private static final Logger LOG = LogUtils.getL7dLogger(RMOutInterceptor.class);
+    private static final Logger LOG = LogUtils.getL7dLogger(RMCaptureOutInterceptor.class);
  
-    public RMOutInterceptor() {
-        addAfter(MAPAggregator.class.getName());
+    public RMCaptureOutInterceptor() {
+        super(Phase.POST_PROTOCOL);
+        addBefore(RMOutInterceptor.class.getName());
     }
     
     protected void handle(Message msg) throws SequenceFault, RMException {  
@@ -84,7 +93,6 @@ public class RMOutInterceptor extends Ab
         ProtocolVariation protocol = ProtocolVariation.findVariant(rmNamespace, wsaNamespace);
         RMContextUtils.setProtocolVariation(msg, protocol);
         maps.exposeAs(wsaNamespace);
-        Destination destination = getManager().getDestination(msg);
 
         String action = null;
         if (null != maps.getAction()) {
@@ -100,10 +108,6 @@ public class RMOutInterceptor extends Ab
         RMConstants constants = protocol.getConstants();
         boolean isLastMessage = constants.getCloseSequenceAction().equals(action);
         
-        if (isApplicationMessage && !isPartialResponse) {
-            addRetransmissionInterceptor(msg);
-        }
-        
         RMProperties rmpsOut = RMContextUtils.retrieveRMProperties(msg, true);
         if (null == rmpsOut) {
             rmpsOut = new RMProperties();
@@ -182,37 +186,57 @@ public class RMOutInterceptor extends Ab
             }
         }
         
-        // add Acknowledgements (to application messages or explicitly 
-        // created Acknowledgement messages only)
-        if (isApplicationMessage || constants.getSequenceAckAction().equals(action)) {
-            AttributedURIType to = maps.getTo();
-            assert null != to;
-            addAcknowledgements(destination, rmpsOut, inSeqId, to);
-            if (isPartialResponse && rmpsOut.getAcks() != null && rmpsOut.getAcks().size() > 0) {
-                setAction(maps, constants.getSequenceAckAction());
-                msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
-            }
-        } 
-        if (constants.getSequenceAckAction().equals(action)
-            || constants.getTerminateSequenceAction().equals(action)) {
-            maps.setReplyTo(RMUtils.createNoneReference());
+        // capture message if retranmission possible
+        if (isApplicationMessage && !isPartialResponse) {
+            captureMessage(msg);
+            getManager().initializeInterceptorChain(msg);
         }
-        
-        assertReliability(msg);
     }
 
-    private void addRetransmissionInterceptor(Message msg) {
-        RetransmissionInterceptor ri = new RetransmissionInterceptor();
-        ri.setManager(getManager());
-        // TODO:
-        // On the server side: If a fault occurs after this interceptor we will switch 
-        // interceptor chains (if this is not already a fault message) and therefore need to 
-        // make sure the retransmission interceptor is added to the fault chain
-        // 
-        msg.getInterceptorChain().add(ri);
-        LOG.fine("Added RetransmissionInterceptor to chain.");
-        
-        getManager().getRetransmissionQueue().start();
+    private void captureMessage(Message message) {
+        SOAPMessage content = message.getContent(SOAPMessage.class);
+        try {
+            LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream();
+            content.writeTo(bos);
+            bos.close();
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Captured message: " + bos.toString("UTF-8"));
+            }
+            ByteArrayInputStream bis = bos.createInputStream();
+            message.put(RMMessageConstants.SAVED_CONTENT, bis);
+            RMManager manager = getManager();
+            manager.getRetransmissionQueue().start();
+            manager.getRetransmissionQueue().addUnacknowledged(message);
+            
+            RMStore store = manager.getStore();
+            if (null != store) {
+                try {
+                    Source s = manager.getSource(message);
+                    RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
+                    Identifier sid = rmps.getSequence().getIdentifier();
+                    SourceSequence ss = s.getSequence(sid);
+                    RMMessage msg = new RMMessage();
+                    msg.setMessageNumber(rmps.getSequence().getMessageNumber());
+                    if (!MessageUtils.isRequestor(message)) {
+                        AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
+                        if (null != maps && null != maps.getTo()) {
+                            msg.setTo(maps.getTo().getValue());
+                        }
+                    }
+                    msg.setContent(bis);
+                    store.persistOutgoing(ss, msg);
+                } catch (RMException e) {
+                    // ignore
+                } 
+            }
+            
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (SOAPException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
     }
 
     private String getAddressingNamespace(AddressingProperties maps) {
@@ -223,47 +247,6 @@ public class RMOutInterceptor extends Ab
         return wsaNamespace;
     }
     
-    void addAcknowledgements(Destination destination, 
-                             RMProperties rmpsOut, 
-                             Identifier inSeqId, 
-                             AttributedURIType to) {
-        for (DestinationSequence seq : destination.getAllSequences()) {
-            if (!seq.sendAcknowledgement()) {
-                if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("no need to add acknowledgements for sequence "
-                        + seq.getIdentifier().getValue()); 
-                }
-                continue;
-            }
-            String address = seq.getAcksTo().getAddress().getValue();
-            if (!to.getValue().equals(address)) {
-                if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("sequences acksTo address (" + address
-                        + ") does not match to address (" + to.getValue() + ")");
-                }
-                continue;
-            }
-            // there may be multiple sources with anonymous acksTo 
-            if (RMUtils.getAddressingConstants().getAnonymousURI().equals(address)
-                && !AbstractSequence.identifierEquals(seq.getIdentifier(), inSeqId)) {                
-                if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("sequence identifier does not match inbound sequence identifier");
-                }
-                continue;
-            }
-            rmpsOut.addAck(seq);
-        }
-
-        if (LOG.isLoggable(Level.FINE)) {
-            Collection<SequenceAcknowledgement> acks = rmpsOut.getAcks();
-            if (null == acks) {
-                LOG.fine("No acknowledgements added.");
-            } else {
-                LOG.fine("Added " + acks.size() + " acknowledgements.");
-            }
-        }
-    }
-    
     boolean isRuntimeFault(Message message) {
         FaultMode mode = MessageUtils.getFaultMode(message);
         if (null == mode) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon Feb 10 10:08:34 2014
@@ -21,6 +21,7 @@ package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -48,6 +49,7 @@ import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.InterfaceInfo;
@@ -116,6 +118,7 @@ public class RMManager {
     private DestinationPolicyType destinationPolicy;
     private InstrumentationManager instrumentationManager;
     private ManagedRMManager managedManager;
+    private PhaseInterceptorChain retransmitChain;
     
     // ServerLifeCycleListener
     
@@ -605,7 +608,7 @@ public class RMManager {
                 RMContextUtils.storeMAPs(maps, message, true, false);
             }
                                     
-            message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
+//            message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream());
             RMContextUtils.setProtocolVariation(message, ss.getProtocol());
             
             retransmissionQueue.addUnacknowledged(message);
@@ -710,4 +713,35 @@ public class RMManager {
             return sid;
         }
     }
+
+    /**
+     * Clones and saves the interceptor chain the first time this is called, so that it can be used for retransmission.
+     * Calls after the first are ignored.
+     * 
+     * @param msg
+     */
+    public void initializeInterceptorChain(Message msg) {
+        if (retransmitChain == null) {
+            LOG.info("Setting retransmit chain from message");
+            Set<Class<?>> formats = msg.getContentFormats();
+            int i = 0;
+            for (Class<?> clas: formats) {
+                Object content = msg.getContent(clas);
+                LOG.info("Found content " + content + " of type " + clas.getName());
+                i++;
+            }
+            LOG.info("Total " + i);
+            PhaseInterceptorChain chain = (PhaseInterceptorChain)msg.getInterceptorChain();
+            retransmitChain = chain.cloneChain();
+        }
+    }
+    
+    /**
+     * Get interceptor chain for retransmitting a message.
+     * 
+     * @return chain
+     */
+    public PhaseInterceptorChain getRetransmitChain() {
+        return retransmitChain.cloneChain();
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Mon Feb 10 10:08:34 2014
@@ -20,32 +20,20 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.cxf.Bus;
-import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.message.MessageContentsList;
 import org.apache.cxf.message.MessageUtils;
-import org.apache.cxf.service.Service;
-import org.apache.cxf.service.model.BindingInfo;
-import org.apache.cxf.service.model.BindingOperationInfo;
-import org.apache.cxf.service.model.OperationInfo;
+import org.apache.cxf.phase.Phase;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.ContextUtils;
-import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
-import org.apache.cxf.ws.rm.v200702.TerminateSequenceType;
+import org.apache.cxf.ws.security.wss4j.PolicyBasedWSS4JOutInterceptor;
 
 /**
  * 
@@ -55,7 +43,8 @@ public class RMOutInterceptor extends Ab
     private static final Logger LOG = LogUtils.getL7dLogger(RMOutInterceptor.class);
  
     public RMOutInterceptor() {
-        addAfter(MAPAggregator.class.getName());
+        super(Phase.POST_PROTOCOL);
+        addBefore(PolicyBasedWSS4JOutInterceptor.PolicyBasedWSS4JOutInterceptorInternal.class.getName());
     }
     
     protected void handle(Message msg) throws SequenceFault, RMException {  
@@ -66,24 +55,14 @@ public class RMOutInterceptor extends Ab
         }
         
         if (isRuntimeFault(msg)) {
-            LogUtils.log(LOG, Level.WARNING, "RUNTIME_FAULT_MSG");
-            // in case of a SequenceFault or other WS-RM related fault, set action appropriately.
-            // the received inbound maps is available to extract some values in case if needed.
-            Throwable cause = msg.getContent(Exception.class).getCause();
-            if (cause instanceof SequenceFault || cause instanceof RMException) {
-                maps.getAction().setValue(getAddressingNamespace(maps) + "/fault");
-            }
             return;
         }
-
-        Source source = getManager().getSource(msg);
         
         RMConfiguration config = getManager().getEffectiveConfiguration(msg);
         String wsaNamespace = config.getAddressingNamespace();
         String rmNamespace = config.getRMNamespace();
         ProtocolVariation protocol = ProtocolVariation.findVariant(rmNamespace, wsaNamespace);
         RMContextUtils.setProtocolVariation(msg, protocol);
-        maps.exposeAs(wsaNamespace);
         Destination destination = getManager().getDestination(msg);
 
         String action = null;
@@ -98,87 +77,14 @@ public class RMOutInterceptor extends Ab
         boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action);
         boolean isPartialResponse = MessageUtils.isPartialResponse(msg);
         RMConstants constants = protocol.getConstants();
-        boolean isLastMessage = constants.getCloseSequenceAction().equals(action);
-        
-        if (isApplicationMessage && !isPartialResponse) {
-            addRetransmissionInterceptor(msg);
-        }
-        
         RMProperties rmpsOut = RMContextUtils.retrieveRMProperties(msg, true);
-        if (null == rmpsOut) {
-            rmpsOut = new RMProperties();
-            rmpsOut.exposeAs(protocol.getWSRMNamespace());
-            RMContextUtils.storeRMProperties(msg, rmpsOut, true);
-        }
-        
-        // Activate process response for oneWay
-        if (msg.getExchange().isOneWay()) {
-            msg.getExchange().put(Message.PROCESS_ONEWAY_RESPONSE, true);
-        }
         
-        RMProperties rmpsIn = null;
         Identifier inSeqId = null;
-        long inMessageNumber = 0;
-        
+
         if (isApplicationMessage) {
-            rmpsIn = RMContextUtils.retrieveRMProperties(msg, false);
+            RMProperties rmpsIn = RMContextUtils.retrieveRMProperties(msg, false);
             if (null != rmpsIn && null != rmpsIn.getSequence()) {
                 inSeqId = rmpsIn.getSequence().getIdentifier();
-                inMessageNumber = rmpsIn.getSequence().getMessageNumber();
-            }
-            ContextUtils.storeDeferUncorrelatedMessageAbort(msg);
-        }
-        
-        Map<?, ?> invocationContext = (Map<?, ?>)msg.get(Message.INVOCATION_CONTEXT);
-        if ((isApplicationMessage || (isLastMessage && invocationContext != null)) && !isPartialResponse) {
-            if (LOG.isLoggable(Level.FINE)) {
-                LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
-            }
-            
-            // get the current sequence, requesting the creation of a new one if necessary
-            
-            synchronized (source) {
-                SourceSequence seq = null;
-                if (isLastMessage) {
-                    seq = (SourceSequence)invocationContext.get(SourceSequence.class.getName());
-                } else {
-                    seq = getManager().getSequence(inSeqId, msg, maps);
-                }
-                assert null != seq;
-
-                // increase message number and store a sequence type object in
-                // context
-                seq.nextMessageNumber(inSeqId, inMessageNumber, isLastMessage);
-                
-                if (Boolean.TRUE.equals(msg.getContextualProperty(RMManager.WSRM_LAST_MESSAGE_PROPERTY))) {
-                    // mark the message as the last one
-                    seq.setLastMessage(true);
-                }
-                
-                rmpsOut.setSequence(seq);
-
-                // if this was the last message in the sequence, reset the
-                // current sequence so that a new one will be created next
-                // time the handler is invoked
-
-                if (seq.isLastMessage()) {
-                    source.setCurrent(null);
-                }
-            }
-        } else if (!MessageUtils.isRequestor(msg) && constants.getCreateSequenceAction().equals(action)) {
-            maps.getAction().setValue(constants.getCreateSequenceResponseAction());
-        } else if (isPartialResponse && action == null
-            && isResponseToAction(msg, constants.getSequenceAckAction())) {
-            Collection<SequenceAcknowledgement> acks = rmpsIn.getAcks();
-            if (acks.size() == 1) {
-                SourceSequence ss = source.getSequence(acks.iterator().next().getIdentifier());
-                if (ss != null && ss.allAcknowledged()) {
-                    setAction(maps, constants.getTerminateSequenceAction());
-                    setTerminateSequence(msg, ss.getIdentifier(), protocol);
-                    msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
-                    // removing this sequence now. See the comment in SourceSequence.setAcknowledged()
-                    source.removeSequence(ss);
-                }
             }
         }
         
@@ -200,28 +106,6 @@ public class RMOutInterceptor extends Ab
         
         assertReliability(msg);
     }
-
-    private void addRetransmissionInterceptor(Message msg) {
-        RetransmissionInterceptor ri = new RetransmissionInterceptor();
-        ri.setManager(getManager());
-        // TODO:
-        // On the server side: If a fault occurs after this interceptor we will switch 
-        // interceptor chains (if this is not already a fault message) and therefore need to 
-        // make sure the retransmission interceptor is added to the fault chain
-        // 
-        msg.getInterceptorChain().add(ri);
-        LOG.fine("Added RetransmissionInterceptor to chain.");
-        
-        getManager().getRetransmissionQueue().start();
-    }
-
-    private String getAddressingNamespace(AddressingProperties maps) {
-        String wsaNamespace = maps.getNamespaceURI();
-        if (wsaNamespace == null) {
-            getManager().getConfiguration().getAddressingNamespace();
-        }
-        return wsaNamespace;
-    }
     
     void addAcknowledgements(Destination destination, 
                              RMProperties rmpsOut, 
@@ -272,49 +156,6 @@ public class RMOutInterceptor extends Ab
         return FaultMode.CHECKED_APPLICATION_FAULT != mode;
     }
 
-    private boolean isResponseToAction(Message msg, String action) {
-        AddressingProperties inMaps = RMContextUtils.retrieveMAPs(msg, false, false);
-        String inAction = null;
-        if (null != inMaps.getAction()) {
-            inAction = inMaps.getAction().getValue();
-        }
-        return action.equals(inAction);
-    }
-    
-    private void setTerminateSequence(Message msg, Identifier identifier, ProtocolVariation protocol) 
-        throws RMException {
-        TerminateSequenceType ts = new TerminateSequenceType();
-        ts.setIdentifier(identifier);
-        MessageContentsList contents = 
-            new MessageContentsList(new Object[]{protocol.getCodec().convertToSend(ts)});
-        msg.setContent(List.class, contents);
-
-        // create a new exchange for this output-only exchange
-        Exchange newex = new ExchangeImpl();
-        Exchange oldex = msg.getExchange();
-        
-        newex.put(Bus.class, oldex.getBus());
-        newex.put(Endpoint.class, oldex.getEndpoint());
-        newex.put(Service.class, oldex.getEndpoint().getService());
-        newex.put(Binding.class, oldex.getEndpoint().getBinding());
-        newex.setConduit(oldex.getConduit(msg));
-        newex.setDestination(oldex.getDestination());
-        
-        //Setup the BindingOperationInfo
-        RMEndpoint rmep = getManager().getReliableEndpoint(msg);
-        OperationInfo oi = rmep.getEndpoint(protocol).getEndpointInfo().getService().getInterface()
-            .getOperation(protocol.getConstants().getTerminateSequenceAnonymousOperationName());
-        BindingInfo bi = rmep.getBindingInfo(protocol);
-        BindingOperationInfo boi = bi.getOperation(oi);
-        
-        newex.put(BindingInfo.class, bi);
-        newex.put(BindingOperationInfo.class, boi);
-        newex.put(OperationInfo.class, boi.getOperationInfo());
-        
-        msg.setExchange(newex);
-        newex.setOutMessage(msg);
-    }
-
     private static void setAction(AddressingProperties maps, String action) {
         AttributedURIType actionURI = new AttributedURIType();
         actionURI.setValue(action);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java Mon Feb 10 10:08:34 2014
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.io.OutputStream;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -28,9 +27,7 @@ import javax.management.ObjectName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.io.WriteOnCloseOutputStream;
 import org.apache.cxf.management.ManagementConstants;
-import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.AddressingConstants;
 
 public final class RMUtils {
@@ -136,17 +133,6 @@ public final class RMUtils {
             + endpoint.getEndpointInfo().getName() + "@" + busId;
     }
     
-    public static WriteOnCloseOutputStream createCachedStream(Message message, OutputStream os) {
-        // We need to ensure that we have an output stream which won't start writing the 
-        // message until we have a chance to send a createsequence
-        if (!(os instanceof WriteOnCloseOutputStream)) {
-            WriteOnCloseOutputStream cached = new WriteOnCloseOutputStream(os);
-            message.setContent(OutputStream.class, cached);
-            os = cached;
-        }
-        return (WriteOnCloseOutputStream) os;
-    }
-    
     public static ObjectName getManagedObjectName(RMManager manager) throws JMException {
         StringBuilder buffer = new StringBuilder();
         writeTypeProperty(buffer, manager.getBus(), "WSRM.Manager");

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Mon Feb 10 10:08:34 2014
@@ -24,6 +24,7 @@ import org.apache.cxf.common.injection.N
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.interceptor.InterceptorProvider;
 import org.apache.cxf.ws.rm.RMCaptureInInterceptor;
+import org.apache.cxf.ws.rm.RMCaptureOutInterceptor;
 import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMManager;
@@ -33,7 +34,8 @@ import org.apache.cxf.ws.rm.manager.Dest
 import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
 import org.apache.cxf.ws.rm.persistence.RMStore;
-import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
+import org.apache.cxf.ws.rm.soap.RMSoapInInterceptor;
+import org.apache.cxf.ws.rm.soap.RMSoapOutInterceptor;
 import org.apache.cxf.ws.rmp.v200502.RMAssertion;
 
 /**
@@ -53,8 +55,10 @@ public class RMFeature extends AbstractF
     private RMInInterceptor rmLogicalIn = new RMInInterceptor();
     private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
     private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
-    private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
+    private RMSoapOutInterceptor rmOutCodec = new RMSoapOutInterceptor();
+    private RMSoapInInterceptor rmInCodec = new RMSoapInInterceptor();
     private RMCaptureInInterceptor rmCaptureIn = new RMCaptureInInterceptor();
+    private RMCaptureOutInterceptor rmCaptureOut = new RMCaptureOutInterceptor();
 
     public void setDeliveryAssurance(DeliveryAssuranceType da) {
         deliveryAssurance = da;
@@ -114,23 +118,26 @@ public class RMFeature extends AbstractF
         rmLogicalOut.setBus(bus);
         rmDelivery.setBus(bus);
         rmCaptureIn.setBus(bus);
+        rmCaptureOut.setBus(bus);
 
         provider.getInInterceptors().add(rmLogicalIn);
-        provider.getInInterceptors().add(rmCodec);
+        provider.getInInterceptors().add(rmInCodec);
         provider.getInInterceptors().add(rmDelivery);
         if (null != store) {
             provider.getInInterceptors().add(rmCaptureIn);
         }
 
         provider.getOutInterceptors().add(rmLogicalOut);
-        provider.getOutInterceptors().add(rmCodec);
+        provider.getOutInterceptors().add(rmOutCodec);
+        provider.getOutInterceptors().add(rmCaptureOut);
 
         provider.getInFaultInterceptors().add(rmLogicalIn);
-        provider.getInFaultInterceptors().add(rmCodec);
+        provider.getInFaultInterceptors().add(rmInCodec);
         provider.getInFaultInterceptors().add(rmDelivery);
 
         provider.getOutFaultInterceptors().add(rmLogicalOut);
-        provider.getOutFaultInterceptors().add(rmCodec);
+        provider.getOutFaultInterceptors().add(rmOutCodec);
+        provider.getOutFaultInterceptors().add(rmCaptureOut);
 
     }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java Mon Feb 10 10:08:34 2014
@@ -18,15 +18,14 @@
  */
 package org.apache.cxf.ws.rm.persistence;
 
-import java.io.IOException;
 import java.io.InputStream;
-
-import org.apache.cxf.helpers.IOUtils;
-import org.apache.cxf.io.CachedOutputStream;
+import java.util.Collections;
+import java.util.List;
 
 public class RMMessage {
     
-    private CachedOutputStream content;
+    private InputStream content;
+    private List<InputStream> attachments = Collections.emptyList();
     private long messageNumber;
     private String to;
     
@@ -46,55 +45,12 @@ public class RMMessage {
         messageNumber = mn;
     }
     
-
-    /**
-     * Returns the content of the message as an input stream.
-     * @return the content
-     * @deprecated
-     */
-    public byte[] getContent() {
-        byte[] bytes = null;
-        try {
-            bytes = content != null ? content.getBytes() : null;
-        } catch (IOException e) {
-            // ignore and treat it as null
-        }
-        return bytes;
-    }
-
-
-    /**
-     * Sets the message content as an input stream.
-     * @param content the message content
-     * @deprecated
-     */
-    public void setContent(byte[] c) {
-        content = new CachedOutputStream();
-        content.holdTempFile();
-        try {
-            content.write(c);
-        } catch (IOException e) {
-            // ignore
-        }
-    }
-    
     /**
      * Sets the message content using the input stream.
      * @param in
-     * @throws IOException
-     */
-    public void setContent(InputStream in) throws IOException {
-        content = new CachedOutputStream();
-        content.holdTempFile();
-        IOUtils.copy(in, content);
-    }
-
-    /**
-     * Sets the message content using the cached output stream.
-     * @param c
      */
-    public void setContent(CachedOutputStream c) {
-        content = c;
+    public void setContent(InputStream in) {
+        content = in;
     }
     
     /**
@@ -119,24 +75,24 @@ public class RMMessage {
      * @return
      * @throws IOException
      */
-    public InputStream getInputStream() throws IOException {
-        return content != null ? content.getInputStream() : null;
+    public InputStream getContent() {
+        return content;
     }
-    
+
     /**
-     * Returns the associated cached output stream.
-     * @return
+     * Returns the list of attachments.
+     * @return list (non-null)
      */
-    public CachedOutputStream getCachedOutputStream() {
-        return content;
+    public List<InputStream> getAttachments() {
+        return attachments;
     }
-    
+
     /**
-     * Returns the length of the message content in bytes.
-     * 
-     * @return
+     * Set the list of attachments.
+     * @param attaches (non-null)
      */
-    public long getSize() {
-        return content != null ? content.size() : -1L;
+    public void setAttachments(List<InputStream> attaches) {
+        assert attaches != null;
+        attachments = attaches;
     }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Mon Feb 10 10:08:34 2014
@@ -90,12 +90,19 @@ public class RMTxStore implements RMStor
            {"SEND_TO", "VARCHAR(256)"},
            {"CONTENT", "BLOB"}};
     private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"};
-    
+    private static final String[][] ATTACHMENTS_TABLE_COLS
+        = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+           {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
+           {"ATTACHMENT_NO", "DECIMAL(19, 0) NOT NULL"},
+           {"DATA", "BLOB"}};
+    private static final String[] ATTACHMENTS_TABLE_KEYS = {"SEQ_ID", "MSG_NO", "ATTACHMENT_NO"};
 
     private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES"; 
     private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES";
     private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
     private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";    
+    private static final String INBOUND_ATTS_TABLE_NAME = "CXF_RM_INBOUND_ATTACHMENTS";
+    private static final String OUTBOUND_ATTS_TABLE_NAME = "CXF_RM_OUTBOUND_ATTACHMENTS";    
     
     private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = 
         buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME, 
@@ -105,8 +112,9 @@ public class RMTxStore implements RMStor
         buildCreateTableStatement(SRC_SEQUENCES_TABLE_NAME, 
                                   SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS);
     private static final String CREATE_MESSAGES_TABLE_STMT =
-        buildCreateTableStatement("{0}", 
-                                  MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS);
+        buildCreateTableStatement("{0}", MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS);
+    private static final String CREATE_ATTACHMENTS_TABLE_STMT =
+        buildCreateTableStatement("{0}", ATTACHMENTS_TABLE_COLS, ATTACHMENTS_TABLE_KEYS);
 
     private static final String CREATE_DEST_SEQUENCE_STMT_STR 
         = "INSERT INTO CXF_RM_DEST_SEQUENCES "
@@ -128,6 +136,10 @@ public class RMTxStore implements RMStor
         = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT) VALUES(?, ?, ?, ?)";
     private static final String DELETE_MESSAGE_STMT_STR =
         "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
+    private static final String CREATE_ATTACHMENT_STMT_STR 
+        = "INSERT INTO {0} (SEQ_ID, MSG_NO, ATTACHMENT_NO, DATA) VALUES(?, ?, ?, ?)";
+    private static final String DELETE_ATTACHMENTS_STMT_STR =
+        "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
     private static final String SELECT_DEST_SEQUENCE_STMT_STR =
         "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE SEQ_ID = ?";
@@ -142,6 +154,8 @@ public class RMTxStore implements RMStor
         + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
     private static final String SELECT_MESSAGES_STMT_STR =
         "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
+    private static final String SELECT_ATTACHMENTS_STMT_STR =
+        "SELECT ATTACHMENT_NO, DATA FROM {0} WHERE SEQ_ID = ?, MSG_ID = ?";
     private static final String ALTER_TABLE_STMT_STR =
         "ALTER TABLE {0} ADD {1} {2}";
     private static final String CREATE_INBOUND_MESSAGE_STMT_STR = 
@@ -156,6 +170,18 @@ public class RMTxStore implements RMStor
         MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME);
     private static final String SELECT_OUTBOUND_MESSAGES_STMT_STR =
         MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME);
+    private static final String CREATE_INBOUND_ATTACHMENT_STMT_STR = 
+        MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, INBOUND_ATTS_TABLE_NAME);
+    private static final String CREATE_OUTBOUND_ATTACHMENT_STMT_STR = 
+        MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
+    private static final String DELETE_INBOUND_ATTACHMENTS_STMT_STR =
+        MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME);
+    private static final String DELETE_OUTBOUND_ATTACHMENTS_STMT_STR =
+        MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
+    private static final String SELECT_INBOUND_ATTACHMENTS_STMT_STR =
+        MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME);
+    private static final String SELECT_OUTBOUND_ATTACHMENTS_STMT_STR =
+        MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME);
     
     // create_schema may not work for several reasons, if so, create one manually
     private static final String CREATE_SCHEMA_STMT_STR = "CREATE SCHEMA {0}";
@@ -587,34 +613,44 @@ public class RMTxStore implements RMStor
     
     public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) {
         Connection con = verifyConnection();
-        PreparedStatement stmt = null;
+        PreparedStatement stmt1 = null;
+        PreparedStatement stmt2 = null;
         SQLException conex = null;
         Collection<RMMessage> msgs = new ArrayList<RMMessage>();
-        ResultSet res = null;
+        ResultSet res1 = null;
+        ResultSet res2 = null;
         try {
-            stmt = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
+            stmt1 = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR);
 
-            stmt.setString(1, sid.getValue());
-            res = stmt.executeQuery();
-            while (res.next()) {
-                long mn = res.getLong(1);
-                String to = res.getString(2);
-                Blob blob = res.getBlob(3);
+            stmt1.setString(1, sid.getValue());
+            res1 = stmt1.executeQuery();
+            while (res1.next()) {
+                long mn = res1.getLong(1);
+                String to = res1.getString(2);
+                Blob blob = res1.getBlob(3);
                 RMMessage msg = new RMMessage();
                 msg.setMessageNumber(mn);
                 msg.setTo(to);
                 msg.setContent(blob.getBinaryStream());
                 msgs.add(msg);
+                stmt2 = getStatement(con, outbound
+                     ? SELECT_OUTBOUND_ATTACHMENTS_STMT_STR : SELECT_INBOUND_ATTACHMENTS_STMT_STR);
+                stmt2.setString(1, sid.getValue());
+                stmt2.setLong(2, mn);
+                res2 = stmt2.executeQuery();
+                List<InputStream> attaches = new ArrayList<InputStream>();
+                while (res2.next()) {
+                    attaches.add(res2.getBinaryStream(1));
+                }
+                msg.setAttachments(attaches);
             }
         } catch (SQLException ex) {
             conex = ex;
             LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
                 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
-        } catch (IOException ex) {
-            LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
-                : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex);
         } finally {
-            releaseResources(stmt, res);
+            releaseResources(stmt2, res2);
+            releaseResources(stmt1, res1);
             updateConnectionState(con, conex);
         }
         return msgs;
@@ -628,7 +664,7 @@ public class RMTxStore implements RMStor
             
             updateDestinationSequence(con, seq);
             
-            if (msg != null && msg.getCachedOutputStream() != null) {
+            if (msg != null && msg.getContent() != null) {
                 storeMessage(con, seq.getIdentifier(), msg, false);
             }
             
@@ -653,7 +689,7 @@ public class RMTxStore implements RMStor
             
             updateSourceSequence(con, seq);
             
-            if (msg != null && msg.getCachedOutputStream() != null) {
+            if (msg != null && msg.getContent() != null) {
                 storeMessage(con, seq.getIdentifier(), msg, true);
             }
             
@@ -673,18 +709,24 @@ public class RMTxStore implements RMStor
     
     public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
         Connection con = verifyConnection();
-        PreparedStatement stmt = null;
+        PreparedStatement stmt1 = null;
+        PreparedStatement stmt2 = null;
         SQLException conex = null;
         try {
-            stmt = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
+            stmt1 = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR);
+            stmt2 = getStatement(con, outbound
+                ? DELETE_OUTBOUND_ATTACHMENTS_STMT_STR : DELETE_INBOUND_ATTACHMENTS_STMT_STR);
 
             beginTransaction();
 
-            stmt.setString(1, sid.getValue());
+            stmt1.setString(1, sid.getValue());
+            stmt2.setString(1, sid.getValue());
                         
             for (Long messageNr : messageNrs) {
-                stmt.setLong(2, messageNr);
-                stmt.execute();
+                stmt2.setLong(2, messageNr);
+                stmt2.execute();
+                stmt1.setLong(2, messageNr);
+                stmt1.execute();
             }
             
             commit(con);
@@ -694,7 +736,8 @@ public class RMTxStore implements RMStor
             abort(con);
             throw new RMStoreException(ex);
         } finally {
-            releaseResources(stmt, null);
+            releaseResources(stmt2, null);
+            releaseResources(stmt1, null);
             updateConnectionState(con, conex);
         }
     }
@@ -740,31 +783,43 @@ public class RMTxStore implements RMStor
             LOG.log(Level.FINE, "Storing {0} message number {1} for sequence {2}, to = {3}",
                     new Object[] {outbound ? "outbound" : "inbound", nr, id, to});
         }
-        InputStream msgin = null;
-        PreparedStatement stmt = null;
+        PreparedStatement stmt1 = null;
+        PreparedStatement stmt2 = null;
         try {
-            msgin = msg.getInputStream();
-            stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
+            InputStream msgin = msg.getContent();
+            stmt1 = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR);
 
-            int i = 1;
-            stmt.setString(i++, id);  
-            stmt.setLong(i++, nr);
-            stmt.setString(i++, to); 
-            stmt.setBinaryStream(i++, msgin, (int)msg.getSize());
-            stmt.execute();
+            stmt1.setString(1, id);  
+            stmt1.setLong(2, nr);
+            stmt1.setString(3, to); 
+            stmt1.setBinaryStream(4, msgin);
+            stmt1.execute();
+            
+            List<InputStream> attachments = msg.getAttachments();
+            if (attachments.size() > 0) {
+                stmt2 = getStatement(con, outbound
+                     ? CREATE_OUTBOUND_ATTACHMENT_STMT_STR : CREATE_INBOUND_ATTACHMENT_STMT_STR);
+                stmt2.setString(1, id);
+                stmt2.setLong(2, nr);
+                for (int i = 0; i < attachments.size(); i++) {
+                    stmt2.setLong(3, i);
+                    stmt2.setBinaryStream(4, attachments.get(i));
+                    stmt2.execute();
+                    if (LOG.isLoggable(Level.FINE)) {
+                        LOG.log(Level.FINE,
+                            "Successfully stored {0} attachment {1} for message number {2} in sequence {3}",
+                            new Object[] {outbound ? "outbound" : "inbound", i, nr, id});
+                    }
+                }
+            }
+            
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
                         new Object[] {outbound ? "outbound" : "inbound", nr, id});
             }
         } finally  {
-            if (msgin != null) {
-                try {
-                    msgin.close();
-                } catch (IOException e) {
-                    // ignore
-                }
-            }
-            releaseResources(stmt, null);
+            releaseResources(stmt1, null);
+            releaseResources(stmt2, null);
         }
     }
     
@@ -806,10 +861,9 @@ public class RMTxStore implements RMStor
 
             long lastMessageNr = seq.getLastMessageNumber();
             stmt.setLong(1, lastMessageNr); 
-            InputStream is = PersistenceUtils.getInstance()
-                .serialiseAcknowledgment(seq.getAcknowledgment());
+            InputStream is = PersistenceUtils.getInstance().serialiseAcknowledgment(seq.getAcknowledgment());
             stmt.setBinaryStream(2, is, is.available()); 
-            stmt.setString(3, seq.getIdentifier() .getValue());
+            stmt.setString(3, seq.getIdentifier().getValue());
             stmt.execute();
         } finally {
             releaseResources(stmt, null);
@@ -876,6 +930,24 @@ public class RMTxStore implements RMStor
                     stmt.close();
                 }
             }
+
+            for (String tableName : new String[] {OUTBOUND_ATTS_TABLE_NAME, INBOUND_ATTS_TABLE_NAME}) {
+                stmt = con.createStatement();
+                try {
+                    stmt.executeUpdate(MessageFormat.format(CREATE_ATTACHMENTS_TABLE_STMT, tableName));
+                } catch (SQLException ex) {
+                    if (!isTableExistsError(ex)) {
+                        throw ex;
+                    } else {
+                        if (LOG.isLoggable(Level.FINE)) {
+                            LOG.fine("Table " + tableName + " already exists.");
+                        }
+                        verifyTable(con, tableName, ATTACHMENTS_TABLE_COLS);
+                    }
+                } finally {
+                    stmt.close();
+                }
+            }
         } finally {
             if (connection == null && con != null) {
                 con.close();

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java?rev=1566555&r1=1566554&r2=1566555&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Mon Feb 10 10:08:34 2014
@@ -25,10 +25,12 @@ import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider;
+import org.apache.cxf.ws.rm.RMCaptureOutInterceptor;
 import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMOutInterceptor;
-import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
+import org.apache.cxf.ws.rm.soap.RMSoapInInterceptor;
+import org.apache.cxf.ws.rm.soap.RMSoapOutInterceptor;
 
 public class RMPolicyInterceptorProvider extends AbstractPolicyInterceptorProvider {
 
@@ -36,7 +38,9 @@ public class RMPolicyInterceptorProvider
     private static final Collection<QName> ASSERTION_TYPES;
     private RMInInterceptor rmIn = new RMInInterceptor();
     private RMOutInterceptor rmOut = new RMOutInterceptor();
-    private RMSoapInterceptor rmSoap = new RMSoapInterceptor();
+    private RMCaptureOutInterceptor rmCaptureOut = new RMCaptureOutInterceptor();
+    private RMSoapOutInterceptor rmOutSoap = new RMSoapOutInterceptor();
+    private RMSoapInInterceptor rmInSoap = new RMSoapInInterceptor();
     private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
 
     static {
@@ -51,20 +55,23 @@ public class RMPolicyInterceptorProvider
         super(ASSERTION_TYPES);
         rmIn.setBus(bus);
         rmOut.setBus(bus);
+        rmCaptureOut.setBus(bus);
         rmDelivery.setBus(bus);
         
         getInInterceptors().add(rmIn);
-        getInInterceptors().add(rmSoap);
+        getInInterceptors().add(rmInSoap);
         getInInterceptors().add(rmDelivery);
 
         getOutInterceptors().add(rmOut);
-        getOutInterceptors().add(rmSoap);
+        getOutInterceptors().add(rmCaptureOut);
+        getOutInterceptors().add(rmOutSoap);
 
         getInFaultInterceptors().add(rmIn);
-        getInFaultInterceptors().add(rmSoap);
+        getInFaultInterceptors().add(rmInSoap);
         getInFaultInterceptors().add(rmDelivery);
 
         getOutFaultInterceptors().add(rmOut);
-        getOutFaultInterceptors().add(rmSoap);
+        getOutFaultInterceptors().add(rmCaptureOut);
+        getOutFaultInterceptors().add(rmOutSoap);
     }
 }



Mime
View raw message