Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D14E91083F for ; Mon, 10 Feb 2014 10:09:13 +0000 (UTC) Received: (qmail 40255 invoked by uid 500); 10 Feb 2014 10:09:11 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 40206 invoked by uid 500); 10 Feb 2014 10:09:09 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 40197 invoked by uid 99); 10 Feb 2014 10:09:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Feb 2014 10:09:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Feb 2014 10:08:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2C69123888E4; Mon, 10 Feb 2014 10:08:36 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1566555 [1/2] - in /cxf/trunk: core/src/main/java/org/apache/cxf/interceptor/ core/src/main/java/org/apache/cxf/io/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/ rt/ws/rm/src/main/java/or... Date: Mon, 10 Feb 2014 10:08:35 -0000 To: commits@cxf.apache.org From: dsosnoski@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140210100836.2C69123888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dsosnoski Date: Mon Feb 10 10:08:34 2014 New Revision: 1566555 URL: http://svn.apache.org/r1566555 Log: CXF-4866, CXF-352 Restructure WS-RM so that updated WS-RM headers are generated for retransmission and security is re-applied to the message. Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java - copied, changed from r1566482, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java - copied, changed from r1566482, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptor.java cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RMSoapOutInterceptorTest.java Removed: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionCallback.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionInterceptor.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInterceptor.java Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java (original) +++ cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/MessageSenderInterceptor.java Mon Feb 10 10:08:34 2014 @@ -52,7 +52,7 @@ public class MessageSenderInterceptor ex message.getInterceptorChain().add(ending); } - public class MessageSenderEndingInterceptor extends AbstractPhaseInterceptor { + public static class MessageSenderEndingInterceptor extends AbstractPhaseInterceptor { public MessageSenderEndingInterceptor() { super(Phase.PREPARE_SEND_ENDING); } @@ -66,7 +66,7 @@ public class MessageSenderInterceptor ex } } - private Conduit getConduit(Message message) { + public static Conduit getConduit(Message message) { Exchange exchange = message.getExchange(); Conduit conduit = exchange.getConduit(message); if (conduit == null Modified: cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java (original) +++ cxf/trunk/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java Mon Feb 10 10:08:34 2014 @@ -575,6 +575,10 @@ public class CachedOutputStream extends public void setOutputDir(File outputDir) throws IOException { this.outputDir = outputDir; } + + public long getThreshold() { + return threshold; + } public void setThreshold(long threshold) { this.threshold = threshold; } Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Mon Feb 10 10:08:34 2014 @@ -107,7 +107,7 @@ public abstract class AbstractRMIntercep /** * Asserts all RMAssertion assertions for the current message, regardless their attributes - * (if there is more thsn one we have ensured that they are all supported by considering + * (if there is more than one we have ensured that they are all supported by considering * e.g. the minimum acknowledgment interval). * @param message the current message */ Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Mon Feb 10 10:08:34 2014 @@ -19,6 +19,7 @@ package org.apache.cxf.ws.rm; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -33,7 +34,6 @@ import org.apache.cxf.common.logging.Log import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.continuations.SuspendedInvocationException; -import org.apache.cxf.io.CachedOutputStream; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; import org.apache.cxf.ws.addressing.EndpointReferenceType; @@ -167,7 +167,7 @@ public class DestinationSequence extends RMMessage msg = null; if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) { msg = new RMMessage(); - msg.setContent((CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT)); + msg.setContent((InputStream)message.get(RMMessageConstants.SAVED_CONTENT)); msg.setMessageNumber(st.getMessageNumber()); } store.persistIncoming(this, msg); Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder.java Mon Feb 10 10:08:34 2014 @@ -19,10 +19,17 @@ package org.apache.cxf.ws.rm; +import java.util.Collection; + +import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; import javax.xml.namespace.QName; +import org.w3c.dom.Document; import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; import org.apache.cxf.ws.rm.v200702.AckRequestedType; import org.apache.cxf.ws.rm.v200702.CloseSequenceType; @@ -34,70 +41,131 @@ import org.apache.cxf.ws.rm.v200702.Sequ import org.apache.cxf.ws.rm.v200702.TerminateSequenceType; /** - * Interface for converting WS-ReliableMessaging structures to and from XML. Implementations of this interface - * provide version-specific encoding and decoding. + * Base class for converting WS-ReliableMessaging structures to and from XML. Subclasses provide version-specific + * encoding and decoding. */ -public interface EncoderDecoder { +public abstract class EncoderDecoder { + + /** + * Get context for JAXB marshalling/unmarshalling. + * + * @return context + * @throws JAXBException + */ + protected abstract JAXBContext getContext() throws JAXBException; + + /** + * Add WS-RM namespace declaration to element. + * + * @param element + */ + protected abstract void addNamespaceDecl(Element element); /** * Get the WS-ReliableMessaging namespace used by this encoder/decoder. * * @return URI */ - String getWSRMNamespace(); + public abstract String getWSRMNamespace(); /** * Get the WS-Addressing namespace used by this encoder/decoder. * * @return URI */ - String getWSANamespace(); + public abstract String getWSANamespace(); /** * Get the WS-ReliableMessaging constants used by this encoder/decoder. * * @return */ - RMConstants getConstants(); + public abstract RMConstants getConstants(); /** * Get the class used for the CreateSequenceType. * * @return class */ - Class getCreateSequenceType(); + public abstract Class getCreateSequenceType(); /** * Get the class used for the CreateSequenceResponseType. * * @return class */ - Class getCreateSequenceResponseType(); + public abstract Class getCreateSequenceResponseType(); /** * Get the class used for the TerminateSequenceType. * * @return class */ - Class getTerminateSequenceType(); + public abstract Class getTerminateSequenceType(); /** * Get the class used for the TerminateSequenceResponseType. * * @return class */ - Class getTerminateSequenceResponseType(); + public abstract Class getTerminateSequenceResponseType(); /** - * Builds an element containing WS-RM headers. This adds the appropriate WS-RM and WS-A namespace - * declarations to the element, and then adds any WS-RM headers set in the supplied properties as child - * elements. + * Insert WS-RM headers into a SOAP message. This adds the appropriate WS-RM namespace declaration to the + * SOAP:Header element (which must be present), and then adds any WS-RM headers set in the supplied properties as + * child elements. * * @param rmps - * @param qname constructed element name - * @return element + * @param doc + * @return true if headers added, false if not + */ + public boolean insertHeaders(RMProperties rmps, Document doc) throws JAXBException { + + // check if there's anything to insert + SequenceType seq = rmps.getSequence(); + Collection acks = rmps.getAcks(); + Collection reqs = rmps.getAcksRequested(); + if (seq == null && acks == null && reqs == null) { + return false; + } + + // get the SOAP:Header element + NodeList nodes = doc.getDocumentElement().getChildNodes(); + Element header = null; + for (int i = 0; i < nodes.getLength(); i++) { + Node node = nodes.item(i); + if (node.getNodeType() == Node.ELEMENT_NODE && "Header".equals(node.getLocalName())) { + header = (Element)node; + break; + } + } + if (header == null) { + throw new JAXBException("No SOAP:Header element in message"); + } + + // add WSRM namespace declaration to header, instead of repeating in each individual child node + addNamespaceDecl(header); + + // build individual headers + Marshaller marshaller = getContext().createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); + buildHeaders(seq, acks, reqs, rmps.isLastMessage(), header, marshaller); + return true; + } + + /** + * Build all required headers, using the correct protocol variation. + * + * @param seq + * @param acks + * @param reqs + * @param last + * @param header + * @param marshaller + * @throws JAXBException */ - Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException; + protected abstract void buildHeaders(SequenceType seq, Collection acks, + Collection reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException; /** * Builds an element containing a WS-RM Fault. This adds the appropriate WS-RM namespace declaration to @@ -107,7 +175,7 @@ public interface EncoderDecoder { * @param qname constructed element name * @return element */ - Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException; + public abstract Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException; /** * Marshals a SequenceAcknowledgement to the appropriate external form. @@ -116,7 +184,7 @@ public interface EncoderDecoder { * @return element * @throws JAXBException */ - Element encodeSequenceAcknowledgement(SequenceAcknowledgement ack) throws JAXBException; + public abstract Element encodeSequenceAcknowledgement(SequenceAcknowledgement ack) throws JAXBException; /** * Marshals an Identifier to the appropriate external form. @@ -125,7 +193,7 @@ public interface EncoderDecoder { * @return element * @throws JAXBException */ - Element encodeIdentifier(Identifier id) throws JAXBException; + public abstract Element encodeIdentifier(Identifier id) throws JAXBException; /** * Unmarshals a SequenceType, converting it if necessary to the internal form. @@ -134,7 +202,7 @@ public interface EncoderDecoder { * @return * @throws JAXBException */ - SequenceType decodeSequenceType(Element elem) throws JAXBException; + public abstract SequenceType decodeSequenceType(Element elem) throws JAXBException; /** * Generates a CloseSequenceType if a SequenceType represents a last message state. @@ -143,7 +211,7 @@ public interface EncoderDecoder { * @return CloseSequenceType if last message state, else null * @throws JAXBException */ - CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException; + public abstract CloseSequenceType decodeSequenceTypeCloseSequence(Element elem) throws JAXBException; /** * Unmarshals a SequenceAcknowledgement, converting it if necessary to the internal form. @@ -152,7 +220,7 @@ public interface EncoderDecoder { * @return * @throws JAXBException */ - SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException; + public abstract SequenceAcknowledgement decodeSequenceAcknowledgement(Element elem) throws JAXBException; /** * Unmarshals a AckRequestedType, converting it if necessary to the internal form. @@ -161,7 +229,7 @@ public interface EncoderDecoder { * @return * @throws JAXBException */ - AckRequestedType decodeAckRequestedType(Element elem) throws JAXBException; + public abstract AckRequestedType decodeAckRequestedType(Element elem) throws JAXBException; /** * Convert a CreateSequence message to the correct format for transmission. @@ -169,7 +237,7 @@ public interface EncoderDecoder { * @param create * @return converted */ - Object convertToSend(CreateSequenceType create); + public abstract Object convertToSend(CreateSequenceType create); /** * Convert a CreateSequenceResponse message to the correct format for transmission. @@ -177,7 +245,7 @@ public interface EncoderDecoder { * @param create * @return converted */ - Object convertToSend(CreateSequenceResponseType create); + public abstract Object convertToSend(CreateSequenceResponseType create); /** * Convert a TerminateSequence message to the correct format for transmission. @@ -185,7 +253,7 @@ public interface EncoderDecoder { * @param term * @return converted */ - Object convertToSend(TerminateSequenceType term); + public abstract Object convertToSend(TerminateSequenceType term); /** * Convert a received TerminateSequence message to internal form. @@ -193,7 +261,7 @@ public interface EncoderDecoder { * @param term * @return converted */ - TerminateSequenceType convertReceivedTerminateSequence(Object term); + public abstract TerminateSequenceType convertReceivedTerminateSequence(Object term); /** * Convert a received CreateSequence message to internal form. @@ -201,7 +269,7 @@ public interface EncoderDecoder { * @param create * @return converted */ - CreateSequenceType convertReceivedCreateSequence(Object create); + public abstract CreateSequenceType convertReceivedCreateSequence(Object create); /** * Convert a received CreateSequenceResponse message to internal form. @@ -209,5 +277,5 @@ public interface EncoderDecoder { * @param create * @return converted */ - CreateSequenceResponseType convertReceivedCreateSequenceResponse(Object create); + public abstract CreateSequenceResponseType convertReceivedCreateSequenceResponse(Object create); } \ No newline at end of file Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10AImpl.java Mon Feb 10 10:08:34 2014 @@ -20,6 +20,7 @@ package org.apache.cxf.ws.rm; import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,11 +52,11 @@ import org.apache.cxf.ws.rm.v200702.Term * WS-ReliableMessaging 1.0 encoding and decoding. This converts between the standard WS-RM objects and the * 1.0 representation using the WS-Addressing recommendation 200508 namespace. */ -public final class EncoderDecoder10AImpl implements EncoderDecoder { +public final class EncoderDecoder10AImpl extends EncoderDecoder { public static final EncoderDecoder10AImpl INSTANCE = new EncoderDecoder10AImpl(); - private static JAXBContext jaxbContext; + private static AtomicReference jaxbContextReference = new AtomicReference(); private static final Logger LOG = LogUtils.getL7dLogger(EncoderDecoder10AImpl.class); @@ -90,50 +91,47 @@ public final class EncoderDecoder10AImpl return null; } - private static JAXBContext getContext() throws JAXBException { - synchronized (EncoderDecoder10AImpl.class) { - if (jaxbContext == null) { - Class clas = RMUtils.getWSRM200502WSA200508Factory().getClass(); - jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas), - clas.getClassLoader()); + protected JAXBContext getContext() throws JAXBException { + JAXBContext jaxbContext = jaxbContextReference.get(); + if (jaxbContext == null) { + synchronized (EncoderDecoder10AImpl.class) { + jaxbContext = jaxbContextReference.get(); + if (jaxbContext == null) { + Class clas = RMUtils.getWSRM200502WSA200508Factory().getClass(); + jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas), + clas.getClassLoader()); + jaxbContextReference.set(jaxbContext); + } } } return jaxbContext; } - public Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException { - - Document doc = DOMUtils.createDocument(); - Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart()); - // add WSRM namespace declaration to header, instead of - // repeating in each individual child node - Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", + protected void addNamespaceDecl(Element element) { + Attr attr = element.getOwnerDocument().createAttributeNS("http://www.w3.org/2000/xmlns/", "xmlns:" + RMConstants.NAMESPACE_PREFIX); attr.setValue(RM10Constants.NAMESPACE_URI); - header.setAttributeNodeNS(attr); + element.setAttributeNodeNS(attr); + } - Marshaller marshaller = getContext().createMarshaller(); - marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); - - SequenceType seq = rmps.getSequence(); + protected void buildHeaders(SequenceType seq, Collection acks, + Collection reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException { if (null != seq) { LOG.log(Level.FINE, "encoding sequence into RM header"); org.apache.cxf.ws.rm.v200502wsa15.SequenceType toseq = VersionTransformer.convert200502wsa15(seq); - if (rmps.isLastMessage()) { + if (last) { toseq.setLastMessage(new org.apache.cxf.ws.rm.v200502wsa15.SequenceType.LastMessage()); } JAXBElement element = RMUtils.getWSRM200502WSA200508Factory().createSequence(toseq); marshaller.marshal(element, header); } - Collection acks = rmps.getAcks(); if (null != acks) { LOG.log(Level.FINE, "encoding sequence acknowledgement(s) into RM header"); for (SequenceAcknowledgement ack : acks) { marshaller.marshal(VersionTransformer.convert200502wsa15(ack), header); } } - Collection reqs = rmps.getAcksRequested(); if (null != reqs) { LOG.log(Level.FINE, "encoding acknowledgement request(s) into RM header"); for (AckRequestedType req : reqs) { @@ -141,7 +139,6 @@ public final class EncoderDecoder10AImpl .createAckRequested(VersionTransformer.convert200502wsa15(req)), header); } } - return header; } public Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException { Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder10Impl.java Mon Feb 10 10:08:34 2014 @@ -20,6 +20,7 @@ package org.apache.cxf.ws.rm; import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,11 +52,11 @@ import org.apache.cxf.ws.rm.v200702.Term * WS-ReliableMessaging 1.0 encoding and decoding. This converts between the standard WS-RM objects and the * 1.0 representation using the WS-Addressing 200408 namespace specified in the WS-RM 1.0 recommendation. */ -public final class EncoderDecoder10Impl implements EncoderDecoder { +public final class EncoderDecoder10Impl extends EncoderDecoder { public static final EncoderDecoder10Impl INSTANCE = new EncoderDecoder10Impl(); - private static JAXBContext jaxbContext; + private static AtomicReference jaxbContextReference = new AtomicReference(); private static final Logger LOG = LogUtils.getL7dLogger(EncoderDecoder10Impl.class); @@ -90,50 +91,46 @@ public final class EncoderDecoder10Impl return null; } - private static JAXBContext getContext() throws JAXBException { - synchronized (EncoderDecoder10Impl.class) { - if (jaxbContext == null) { - Class clas = RMUtils.getWSRM200502Factory().getClass(); - jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas), - clas.getClassLoader()); + protected JAXBContext getContext() throws JAXBException { + JAXBContext jaxbContext = jaxbContextReference.get(); + if (jaxbContext == null) { + synchronized (EncoderDecoder10Impl.class) { + jaxbContext = jaxbContextReference.get(); + if (jaxbContext == null) { + Class clas = RMUtils.getWSRM200502Factory().getClass(); + jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas), + clas.getClassLoader()); + jaxbContextReference.set(jaxbContext); + } } } return jaxbContext; } - public Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException { - - Document doc = DOMUtils.createDocument(); - Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart()); - // add WSRM namespace declaration to header, instead of - // repeating in each individual child node - Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", + protected void addNamespaceDecl(Element element) { + Attr attr = element.getOwnerDocument().createAttributeNS("http://www.w3.org/2000/xmlns/", "xmlns:" + RMConstants.NAMESPACE_PREFIX); attr.setValue(RM10Constants.NAMESPACE_URI); - header.setAttributeNodeNS(attr); + element.setAttributeNodeNS(attr); + } - Marshaller marshaller = getContext().createMarshaller(); - marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); - - SequenceType seq = rmps.getSequence(); + protected void buildHeaders(SequenceType seq, Collection acks, + Collection reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException { if (null != seq) { LOG.log(Level.FINE, "encoding sequence into RM header"); org.apache.cxf.ws.rm.v200502.SequenceType toseq = VersionTransformer.convert200502(seq); - if (rmps.isLastMessage()) { + if (last) { toseq.setLastMessage(new org.apache.cxf.ws.rm.v200502.SequenceType.LastMessage()); } - JAXBElement element - = RMUtils.getWSRM200502Factory().createSequence(toseq); + JAXBElement element = RMUtils.getWSRM200502Factory().createSequence(toseq); marshaller.marshal(element, header); } - Collection acks = rmps.getAcks(); if (null != acks) { LOG.log(Level.FINE, "encoding sequence acknowledgement(s) into RM header"); for (SequenceAcknowledgement ack : acks) { marshaller.marshal(VersionTransformer.convert200502(ack), header); } } - Collection reqs = rmps.getAcksRequested(); if (null != reqs) { LOG.log(Level.FINE, "encoding acknowledgement request(s) into RM header"); for (AckRequestedType req : reqs) { @@ -141,7 +138,6 @@ public final class EncoderDecoder10Impl .createAckRequested(VersionTransformer.convert200502(req)), header); } } - return header; } public Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException { Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/EncoderDecoder11Impl.java Mon Feb 10 10:08:34 2014 @@ -20,6 +20,7 @@ package org.apache.cxf.ws.rm; import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -49,14 +50,14 @@ import org.apache.cxf.ws.rm.v200702.Sequ import org.apache.cxf.ws.rm.v200702.TerminateSequenceType; /** - * WS-ReliableMessaging 1.1 encoding and decoding. This just works with the standard internal form of the + * WS-ReliableMessaging 1.1/1.2 encoding and decoding. This just works with the standard internal form of the * WS-RM data structures. */ -public final class EncoderDecoder11Impl implements EncoderDecoder { +public final class EncoderDecoder11Impl extends EncoderDecoder { public static final EncoderDecoder11Impl INSTANCE = new EncoderDecoder11Impl(); - private static JAXBContext jaxbContext; + private static AtomicReference jaxbContextReference = new AtomicReference(); private static final Logger LOG = LogUtils.getL7dLogger(EncoderDecoder11Impl.class); @@ -91,65 +92,58 @@ public final class EncoderDecoder11Impl return org.apache.cxf.ws.rm.v200702.TerminateSequenceResponseType.class; } - private static JAXBContext getContext() throws JAXBException { - synchronized (EncoderDecoder11Impl.class) { - if (jaxbContext == null) { - Class clas = RMUtils.getWSRMFactory().getClass(); - jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas), - clas.getClassLoader()); + protected JAXBContext getContext() throws JAXBException { + JAXBContext jaxbContext = jaxbContextReference.get(); + if (jaxbContext == null) { + synchronized (EncoderDecoder11Impl.class) { + jaxbContext = jaxbContextReference.get(); + if (jaxbContext == null) { + Class clas = RMUtils.getWSRMFactory().getClass(); + jaxbContext = JAXBContext.newInstance(PackageUtils.getPackageName(clas), + clas.getClassLoader()); + jaxbContextReference.set(jaxbContext); + } } } return jaxbContext; } - - public Element buildHeaders(RMProperties rmps, QName qname) throws JAXBException { - - Document doc = DOMUtils.createDocument(); - Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart()); - // add WSRM namespace declaration to header, instead of - // repeating in each individual child node - Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", - "xmlns:" + RMConstants.NAMESPACE_PREFIX); - attr.setValue(RM10Constants.NAMESPACE_URI); - header.setAttributeNodeNS(attr); - Marshaller marshaller = getContext().createMarshaller(); - marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); + protected void buildHeaders(SequenceType seq, Collection acks, + Collection reqs, boolean last, Element header, Marshaller marshaller) throws JAXBException { - SequenceType seq = rmps.getSequence(); if (null != seq) { LOG.log(Level.FINE, "encoding sequence into RM header"); JAXBElement element = RMUtils.getWSRMFactory().createSequence(seq); marshaller.marshal(element, header); } - Collection acks = rmps.getAcks(); if (null != acks) { LOG.log(Level.FINE, "encoding sequence acknowledgement(s) into RM header"); for (SequenceAcknowledgement ack : acks) { marshaller.marshal(ack, header); } } - Collection reqs = rmps.getAcksRequested(); if (null != reqs) { LOG.log(Level.FINE, "encoding acknowledgement request(s) into RM header"); for (AckRequestedType req : reqs) { marshaller.marshal(RMUtils.getWSRMFactory().createAckRequested(req), header); } } - return header; + } + + protected void addNamespaceDecl(Element element) { + Attr attr = element.getOwnerDocument().createAttributeNS("http://www.w3.org/2000/xmlns/", + "xmlns:" + RMConstants.NAMESPACE_PREFIX); + attr.setValue(RM10Constants.NAMESPACE_URI); + element.setAttributeNodeNS(attr); } public Element buildHeaderFault(SequenceFault sf, QName qname) throws JAXBException { Document doc = DOMUtils.createDocument(); Element header = doc.createElementNS(qname.getNamespaceURI(), qname.getLocalPart()); - // add WSRM namespace declaration to header, instead of - // repeating in each individual child node - Attr attr = doc.createAttributeNS("http://www.w3.org/2000/xmlns/", - "xmlns:" + RMConstants.NAMESPACE_PREFIX); - attr.setValue(RM11Constants.NAMESPACE_URI); - header.setAttributeNodeNS(attr); + // add WSRM namespace declaration to header, instead of repeating in each individual child node + addNamespaceDecl(header); Marshaller marshaller = getContext().createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java Mon Feb 10 10:08:34 2014 @@ -58,7 +58,7 @@ public class RMCaptureInInterceptor exte message.setContent(InputStream.class, saved.getInputStream()); LOG.fine("Capturing the original RM message"); - message.put(RMMessageConstants.SAVED_CONTENT, saved); + message.put(RMMessageConstants.SAVED_CONTENT, saved.getInputStream()); } catch (Exception e) { throw new Fault(e); } Copied: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java (from r1566482, cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java) URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java?p2=cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java&p1=cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java&r1=1566482&r2=1566555&rev=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java Mon Feb 10 10:08:34 2014 @@ -19,22 +19,29 @@ package org.apache.cxf.ws.rm; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.xml.soap.SOAPException; +import javax.xml.soap.SOAPMessage; + import org.apache.cxf.Bus; import org.apache.cxf.binding.Binding; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.helpers.LoadingByteArrayOutputStream; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.FaultMode; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageContentsList; import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.phase.Phase; import org.apache.cxf.service.Service; import org.apache.cxf.service.model.BindingInfo; import org.apache.cxf.service.model.BindingOperationInfo; @@ -42,7 +49,8 @@ import org.apache.cxf.service.model.Oper import org.apache.cxf.ws.addressing.AddressingProperties; import org.apache.cxf.ws.addressing.AttributedURIType; import org.apache.cxf.ws.addressing.ContextUtils; -import org.apache.cxf.ws.addressing.MAPAggregator; +import org.apache.cxf.ws.rm.persistence.RMMessage; +import org.apache.cxf.ws.rm.persistence.RMStore; import org.apache.cxf.ws.rm.v200702.Identifier; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; import org.apache.cxf.ws.rm.v200702.TerminateSequenceType; @@ -50,12 +58,13 @@ import org.apache.cxf.ws.rm.v200702.Term /** * */ -public class RMOutInterceptor extends AbstractRMInterceptor { +public class RMCaptureOutInterceptor extends AbstractRMInterceptor { - private static final Logger LOG = LogUtils.getL7dLogger(RMOutInterceptor.class); + private static final Logger LOG = LogUtils.getL7dLogger(RMCaptureOutInterceptor.class); - public RMOutInterceptor() { - addAfter(MAPAggregator.class.getName()); + public RMCaptureOutInterceptor() { + super(Phase.POST_PROTOCOL); + addBefore(RMOutInterceptor.class.getName()); } protected void handle(Message msg) throws SequenceFault, RMException { @@ -84,7 +93,6 @@ public class RMOutInterceptor extends Ab ProtocolVariation protocol = ProtocolVariation.findVariant(rmNamespace, wsaNamespace); RMContextUtils.setProtocolVariation(msg, protocol); maps.exposeAs(wsaNamespace); - Destination destination = getManager().getDestination(msg); String action = null; if (null != maps.getAction()) { @@ -100,10 +108,6 @@ public class RMOutInterceptor extends Ab RMConstants constants = protocol.getConstants(); boolean isLastMessage = constants.getCloseSequenceAction().equals(action); - if (isApplicationMessage && !isPartialResponse) { - addRetransmissionInterceptor(msg); - } - RMProperties rmpsOut = RMContextUtils.retrieveRMProperties(msg, true); if (null == rmpsOut) { rmpsOut = new RMProperties(); @@ -182,37 +186,57 @@ public class RMOutInterceptor extends Ab } } - // add Acknowledgements (to application messages or explicitly - // created Acknowledgement messages only) - if (isApplicationMessage || constants.getSequenceAckAction().equals(action)) { - AttributedURIType to = maps.getTo(); - assert null != to; - addAcknowledgements(destination, rmpsOut, inSeqId, to); - if (isPartialResponse && rmpsOut.getAcks() != null && rmpsOut.getAcks().size() > 0) { - setAction(maps, constants.getSequenceAckAction()); - msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE); - } - } - if (constants.getSequenceAckAction().equals(action) - || constants.getTerminateSequenceAction().equals(action)) { - maps.setReplyTo(RMUtils.createNoneReference()); + // capture message if retranmission possible + if (isApplicationMessage && !isPartialResponse) { + captureMessage(msg); + getManager().initializeInterceptorChain(msg); } - - assertReliability(msg); } - private void addRetransmissionInterceptor(Message msg) { - RetransmissionInterceptor ri = new RetransmissionInterceptor(); - ri.setManager(getManager()); - // TODO: - // On the server side: If a fault occurs after this interceptor we will switch - // interceptor chains (if this is not already a fault message) and therefore need to - // make sure the retransmission interceptor is added to the fault chain - // - msg.getInterceptorChain().add(ri); - LOG.fine("Added RetransmissionInterceptor to chain."); - - getManager().getRetransmissionQueue().start(); + private void captureMessage(Message message) { + SOAPMessage content = message.getContent(SOAPMessage.class); + try { + LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream(); + content.writeTo(bos); + bos.close(); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Captured message: " + bos.toString("UTF-8")); + } + ByteArrayInputStream bis = bos.createInputStream(); + message.put(RMMessageConstants.SAVED_CONTENT, bis); + RMManager manager = getManager(); + manager.getRetransmissionQueue().start(); + manager.getRetransmissionQueue().addUnacknowledged(message); + + RMStore store = manager.getStore(); + if (null != store) { + try { + Source s = manager.getSource(message); + RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true); + Identifier sid = rmps.getSequence().getIdentifier(); + SourceSequence ss = s.getSequence(sid); + RMMessage msg = new RMMessage(); + msg.setMessageNumber(rmps.getSequence().getMessageNumber()); + if (!MessageUtils.isRequestor(message)) { + AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true); + if (null != maps && null != maps.getTo()) { + msg.setTo(maps.getTo().getValue()); + } + } + msg.setContent(bis); + store.persistOutgoing(ss, msg); + } catch (RMException e) { + // ignore + } + } + + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (SOAPException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } private String getAddressingNamespace(AddressingProperties maps) { @@ -223,47 +247,6 @@ public class RMOutInterceptor extends Ab return wsaNamespace; } - void addAcknowledgements(Destination destination, - RMProperties rmpsOut, - Identifier inSeqId, - AttributedURIType to) { - for (DestinationSequence seq : destination.getAllSequences()) { - if (!seq.sendAcknowledgement()) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("no need to add acknowledgements for sequence " - + seq.getIdentifier().getValue()); - } - continue; - } - String address = seq.getAcksTo().getAddress().getValue(); - if (!to.getValue().equals(address)) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("sequences acksTo address (" + address - + ") does not match to address (" + to.getValue() + ")"); - } - continue; - } - // there may be multiple sources with anonymous acksTo - if (RMUtils.getAddressingConstants().getAnonymousURI().equals(address) - && !AbstractSequence.identifierEquals(seq.getIdentifier(), inSeqId)) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("sequence identifier does not match inbound sequence identifier"); - } - continue; - } - rmpsOut.addAck(seq); - } - - if (LOG.isLoggable(Level.FINE)) { - Collection acks = rmpsOut.getAcks(); - if (null == acks) { - LOG.fine("No acknowledgements added."); - } else { - LOG.fine("Added " + acks.size() + " acknowledgements."); - } - } - } - boolean isRuntimeFault(Message message) { FaultMode mode = MessageUtils.getFaultMode(message); if (null == mode) { Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Mon Feb 10 10:08:34 2014 @@ -21,6 +21,7 @@ package org.apache.cxf.ws.rm; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -48,6 +49,7 @@ import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; +import org.apache.cxf.phase.PhaseInterceptorChain; import org.apache.cxf.service.Service; import org.apache.cxf.service.model.BindingInfo; import org.apache.cxf.service.model.InterfaceInfo; @@ -116,6 +118,7 @@ public class RMManager { private DestinationPolicyType destinationPolicy; private InstrumentationManager instrumentationManager; private ManagedRMManager managedManager; + private PhaseInterceptorChain retransmitChain; // ServerLifeCycleListener @@ -605,7 +608,7 @@ public class RMManager { RMContextUtils.storeMAPs(maps, message, true, false); } - message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream()); +// message.put(RMMessageConstants.SAVED_CONTENT, m.getCachedOutputStream()); RMContextUtils.setProtocolVariation(message, ss.getProtocol()); retransmissionQueue.addUnacknowledged(message); @@ -710,4 +713,35 @@ public class RMManager { return sid; } } + + /** + * Clones and saves the interceptor chain the first time this is called, so that it can be used for retransmission. + * Calls after the first are ignored. + * + * @param msg + */ + public void initializeInterceptorChain(Message msg) { + if (retransmitChain == null) { + LOG.info("Setting retransmit chain from message"); + Set> formats = msg.getContentFormats(); + int i = 0; + for (Class clas: formats) { + Object content = msg.getContent(clas); + LOG.info("Found content " + content + " of type " + clas.getName()); + i++; + } + LOG.info("Total " + i); + PhaseInterceptorChain chain = (PhaseInterceptorChain)msg.getInterceptorChain(); + retransmitChain = chain.cloneChain(); + } + } + + /** + * Get interceptor chain for retransmitting a message. + * + * @return chain + */ + public PhaseInterceptorChain getRetransmitChain() { + return retransmitChain.cloneChain(); + } } Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Mon Feb 10 10:08:34 2014 @@ -20,32 +20,20 @@ package org.apache.cxf.ws.rm; import java.util.Collection; -import java.util.List; -import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.cxf.Bus; -import org.apache.cxf.binding.Binding; import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.endpoint.Endpoint; -import org.apache.cxf.message.Exchange; -import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.FaultMode; import org.apache.cxf.message.Message; -import org.apache.cxf.message.MessageContentsList; import org.apache.cxf.message.MessageUtils; -import org.apache.cxf.service.Service; -import org.apache.cxf.service.model.BindingInfo; -import org.apache.cxf.service.model.BindingOperationInfo; -import org.apache.cxf.service.model.OperationInfo; +import org.apache.cxf.phase.Phase; import org.apache.cxf.ws.addressing.AddressingProperties; import org.apache.cxf.ws.addressing.AttributedURIType; import org.apache.cxf.ws.addressing.ContextUtils; -import org.apache.cxf.ws.addressing.MAPAggregator; import org.apache.cxf.ws.rm.v200702.Identifier; import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement; -import org.apache.cxf.ws.rm.v200702.TerminateSequenceType; +import org.apache.cxf.ws.security.wss4j.PolicyBasedWSS4JOutInterceptor; /** * @@ -55,7 +43,8 @@ public class RMOutInterceptor extends Ab private static final Logger LOG = LogUtils.getL7dLogger(RMOutInterceptor.class); public RMOutInterceptor() { - addAfter(MAPAggregator.class.getName()); + super(Phase.POST_PROTOCOL); + addBefore(PolicyBasedWSS4JOutInterceptor.PolicyBasedWSS4JOutInterceptorInternal.class.getName()); } protected void handle(Message msg) throws SequenceFault, RMException { @@ -66,24 +55,14 @@ public class RMOutInterceptor extends Ab } if (isRuntimeFault(msg)) { - LogUtils.log(LOG, Level.WARNING, "RUNTIME_FAULT_MSG"); - // in case of a SequenceFault or other WS-RM related fault, set action appropriately. - // the received inbound maps is available to extract some values in case if needed. - Throwable cause = msg.getContent(Exception.class).getCause(); - if (cause instanceof SequenceFault || cause instanceof RMException) { - maps.getAction().setValue(getAddressingNamespace(maps) + "/fault"); - } return; } - - Source source = getManager().getSource(msg); RMConfiguration config = getManager().getEffectiveConfiguration(msg); String wsaNamespace = config.getAddressingNamespace(); String rmNamespace = config.getRMNamespace(); ProtocolVariation protocol = ProtocolVariation.findVariant(rmNamespace, wsaNamespace); RMContextUtils.setProtocolVariation(msg, protocol); - maps.exposeAs(wsaNamespace); Destination destination = getManager().getDestination(msg); String action = null; @@ -98,87 +77,14 @@ public class RMOutInterceptor extends Ab boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action); boolean isPartialResponse = MessageUtils.isPartialResponse(msg); RMConstants constants = protocol.getConstants(); - boolean isLastMessage = constants.getCloseSequenceAction().equals(action); - - if (isApplicationMessage && !isPartialResponse) { - addRetransmissionInterceptor(msg); - } - RMProperties rmpsOut = RMContextUtils.retrieveRMProperties(msg, true); - if (null == rmpsOut) { - rmpsOut = new RMProperties(); - rmpsOut.exposeAs(protocol.getWSRMNamespace()); - RMContextUtils.storeRMProperties(msg, rmpsOut, true); - } - - // Activate process response for oneWay - if (msg.getExchange().isOneWay()) { - msg.getExchange().put(Message.PROCESS_ONEWAY_RESPONSE, true); - } - RMProperties rmpsIn = null; Identifier inSeqId = null; - long inMessageNumber = 0; - + if (isApplicationMessage) { - rmpsIn = RMContextUtils.retrieveRMProperties(msg, false); + RMProperties rmpsIn = RMContextUtils.retrieveRMProperties(msg, false); if (null != rmpsIn && null != rmpsIn.getSequence()) { inSeqId = rmpsIn.getSequence().getIdentifier(); - inMessageNumber = rmpsIn.getSequence().getMessageNumber(); - } - ContextUtils.storeDeferUncorrelatedMessageAbort(msg); - } - - Map invocationContext = (Map)msg.get(Message.INVOCATION_CONTEXT); - if ((isApplicationMessage || (isLastMessage && invocationContext != null)) && !isPartialResponse) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue())); - } - - // get the current sequence, requesting the creation of a new one if necessary - - synchronized (source) { - SourceSequence seq = null; - if (isLastMessage) { - seq = (SourceSequence)invocationContext.get(SourceSequence.class.getName()); - } else { - seq = getManager().getSequence(inSeqId, msg, maps); - } - assert null != seq; - - // increase message number and store a sequence type object in - // context - seq.nextMessageNumber(inSeqId, inMessageNumber, isLastMessage); - - if (Boolean.TRUE.equals(msg.getContextualProperty(RMManager.WSRM_LAST_MESSAGE_PROPERTY))) { - // mark the message as the last one - seq.setLastMessage(true); - } - - rmpsOut.setSequence(seq); - - // if this was the last message in the sequence, reset the - // current sequence so that a new one will be created next - // time the handler is invoked - - if (seq.isLastMessage()) { - source.setCurrent(null); - } - } - } else if (!MessageUtils.isRequestor(msg) && constants.getCreateSequenceAction().equals(action)) { - maps.getAction().setValue(constants.getCreateSequenceResponseAction()); - } else if (isPartialResponse && action == null - && isResponseToAction(msg, constants.getSequenceAckAction())) { - Collection acks = rmpsIn.getAcks(); - if (acks.size() == 1) { - SourceSequence ss = source.getSequence(acks.iterator().next().getIdentifier()); - if (ss != null && ss.allAcknowledged()) { - setAction(maps, constants.getTerminateSequenceAction()); - setTerminateSequence(msg, ss.getIdentifier(), protocol); - msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE); - // removing this sequence now. See the comment in SourceSequence.setAcknowledged() - source.removeSequence(ss); - } } } @@ -200,28 +106,6 @@ public class RMOutInterceptor extends Ab assertReliability(msg); } - - private void addRetransmissionInterceptor(Message msg) { - RetransmissionInterceptor ri = new RetransmissionInterceptor(); - ri.setManager(getManager()); - // TODO: - // On the server side: If a fault occurs after this interceptor we will switch - // interceptor chains (if this is not already a fault message) and therefore need to - // make sure the retransmission interceptor is added to the fault chain - // - msg.getInterceptorChain().add(ri); - LOG.fine("Added RetransmissionInterceptor to chain."); - - getManager().getRetransmissionQueue().start(); - } - - private String getAddressingNamespace(AddressingProperties maps) { - String wsaNamespace = maps.getNamespaceURI(); - if (wsaNamespace == null) { - getManager().getConfiguration().getAddressingNamespace(); - } - return wsaNamespace; - } void addAcknowledgements(Destination destination, RMProperties rmpsOut, @@ -272,49 +156,6 @@ public class RMOutInterceptor extends Ab return FaultMode.CHECKED_APPLICATION_FAULT != mode; } - private boolean isResponseToAction(Message msg, String action) { - AddressingProperties inMaps = RMContextUtils.retrieveMAPs(msg, false, false); - String inAction = null; - if (null != inMaps.getAction()) { - inAction = inMaps.getAction().getValue(); - } - return action.equals(inAction); - } - - private void setTerminateSequence(Message msg, Identifier identifier, ProtocolVariation protocol) - throws RMException { - TerminateSequenceType ts = new TerminateSequenceType(); - ts.setIdentifier(identifier); - MessageContentsList contents = - new MessageContentsList(new Object[]{protocol.getCodec().convertToSend(ts)}); - msg.setContent(List.class, contents); - - // create a new exchange for this output-only exchange - Exchange newex = new ExchangeImpl(); - Exchange oldex = msg.getExchange(); - - newex.put(Bus.class, oldex.getBus()); - newex.put(Endpoint.class, oldex.getEndpoint()); - newex.put(Service.class, oldex.getEndpoint().getService()); - newex.put(Binding.class, oldex.getEndpoint().getBinding()); - newex.setConduit(oldex.getConduit(msg)); - newex.setDestination(oldex.getDestination()); - - //Setup the BindingOperationInfo - RMEndpoint rmep = getManager().getReliableEndpoint(msg); - OperationInfo oi = rmep.getEndpoint(protocol).getEndpointInfo().getService().getInterface() - .getOperation(protocol.getConstants().getTerminateSequenceAnonymousOperationName()); - BindingInfo bi = rmep.getBindingInfo(protocol); - BindingOperationInfo boi = bi.getOperation(oi); - - newex.put(BindingInfo.class, bi); - newex.put(BindingOperationInfo.class, boi); - newex.put(OperationInfo.class, boi.getOperationInfo()); - - msg.setExchange(newex); - newex.setOutMessage(msg); - } - private static void setAction(AddressingProperties maps, String action) { AttributedURIType actionURI = new AttributedURIType(); actionURI.setValue(action); Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java Mon Feb 10 10:08:34 2014 @@ -19,7 +19,6 @@ package org.apache.cxf.ws.rm; -import java.io.OutputStream; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -28,9 +27,7 @@ import javax.management.ObjectName; import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Endpoint; -import org.apache.cxf.io.WriteOnCloseOutputStream; import org.apache.cxf.management.ManagementConstants; -import org.apache.cxf.message.Message; import org.apache.cxf.ws.addressing.AddressingConstants; public final class RMUtils { @@ -136,17 +133,6 @@ public final class RMUtils { + endpoint.getEndpointInfo().getName() + "@" + busId; } - public static WriteOnCloseOutputStream createCachedStream(Message message, OutputStream os) { - // We need to ensure that we have an output stream which won't start writing the - // message until we have a chance to send a createsequence - if (!(os instanceof WriteOnCloseOutputStream)) { - WriteOnCloseOutputStream cached = new WriteOnCloseOutputStream(os); - message.setContent(OutputStream.class, cached); - os = cached; - } - return (WriteOnCloseOutputStream) os; - } - public static ObjectName getManagedObjectName(RMManager manager) throws JMException { StringBuilder buffer = new StringBuilder(); writeTypeProperty(buffer, manager.getBus(), "WSRM.Manager"); Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Mon Feb 10 10:08:34 2014 @@ -24,6 +24,7 @@ import org.apache.cxf.common.injection.N import org.apache.cxf.feature.AbstractFeature; import org.apache.cxf.interceptor.InterceptorProvider; import org.apache.cxf.ws.rm.RMCaptureInInterceptor; +import org.apache.cxf.ws.rm.RMCaptureOutInterceptor; import org.apache.cxf.ws.rm.RMDeliveryInterceptor; import org.apache.cxf.ws.rm.RMInInterceptor; import org.apache.cxf.ws.rm.RMManager; @@ -33,7 +34,8 @@ import org.apache.cxf.ws.rm.manager.Dest import org.apache.cxf.ws.rm.manager.RM10AddressingNamespaceType; import org.apache.cxf.ws.rm.manager.SourcePolicyType; import org.apache.cxf.ws.rm.persistence.RMStore; -import org.apache.cxf.ws.rm.soap.RMSoapInterceptor; +import org.apache.cxf.ws.rm.soap.RMSoapInInterceptor; +import org.apache.cxf.ws.rm.soap.RMSoapOutInterceptor; import org.apache.cxf.ws.rmp.v200502.RMAssertion; /** @@ -53,8 +55,10 @@ public class RMFeature extends AbstractF private RMInInterceptor rmLogicalIn = new RMInInterceptor(); private RMOutInterceptor rmLogicalOut = new RMOutInterceptor(); private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor(); - private RMSoapInterceptor rmCodec = new RMSoapInterceptor(); + private RMSoapOutInterceptor rmOutCodec = new RMSoapOutInterceptor(); + private RMSoapInInterceptor rmInCodec = new RMSoapInInterceptor(); private RMCaptureInInterceptor rmCaptureIn = new RMCaptureInInterceptor(); + private RMCaptureOutInterceptor rmCaptureOut = new RMCaptureOutInterceptor(); public void setDeliveryAssurance(DeliveryAssuranceType da) { deliveryAssurance = da; @@ -114,23 +118,26 @@ public class RMFeature extends AbstractF rmLogicalOut.setBus(bus); rmDelivery.setBus(bus); rmCaptureIn.setBus(bus); + rmCaptureOut.setBus(bus); provider.getInInterceptors().add(rmLogicalIn); - provider.getInInterceptors().add(rmCodec); + provider.getInInterceptors().add(rmInCodec); provider.getInInterceptors().add(rmDelivery); if (null != store) { provider.getInInterceptors().add(rmCaptureIn); } provider.getOutInterceptors().add(rmLogicalOut); - provider.getOutInterceptors().add(rmCodec); + provider.getOutInterceptors().add(rmOutCodec); + provider.getOutInterceptors().add(rmCaptureOut); provider.getInFaultInterceptors().add(rmLogicalIn); - provider.getInFaultInterceptors().add(rmCodec); + provider.getInFaultInterceptors().add(rmInCodec); provider.getInFaultInterceptors().add(rmDelivery); provider.getOutFaultInterceptors().add(rmLogicalOut); - provider.getOutFaultInterceptors().add(rmCodec); + provider.getOutFaultInterceptors().add(rmOutCodec); + provider.getOutFaultInterceptors().add(rmCaptureOut); } } Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java Mon Feb 10 10:08:34 2014 @@ -18,15 +18,14 @@ */ package org.apache.cxf.ws.rm.persistence; -import java.io.IOException; import java.io.InputStream; - -import org.apache.cxf.helpers.IOUtils; -import org.apache.cxf.io.CachedOutputStream; +import java.util.Collections; +import java.util.List; public class RMMessage { - private CachedOutputStream content; + private InputStream content; + private List attachments = Collections.emptyList(); private long messageNumber; private String to; @@ -46,55 +45,12 @@ public class RMMessage { messageNumber = mn; } - - /** - * Returns the content of the message as an input stream. - * @return the content - * @deprecated - */ - public byte[] getContent() { - byte[] bytes = null; - try { - bytes = content != null ? content.getBytes() : null; - } catch (IOException e) { - // ignore and treat it as null - } - return bytes; - } - - - /** - * Sets the message content as an input stream. - * @param content the message content - * @deprecated - */ - public void setContent(byte[] c) { - content = new CachedOutputStream(); - content.holdTempFile(); - try { - content.write(c); - } catch (IOException e) { - // ignore - } - } - /** * Sets the message content using the input stream. * @param in - * @throws IOException - */ - public void setContent(InputStream in) throws IOException { - content = new CachedOutputStream(); - content.holdTempFile(); - IOUtils.copy(in, content); - } - - /** - * Sets the message content using the cached output stream. - * @param c */ - public void setContent(CachedOutputStream c) { - content = c; + public void setContent(InputStream in) { + content = in; } /** @@ -119,24 +75,24 @@ public class RMMessage { * @return * @throws IOException */ - public InputStream getInputStream() throws IOException { - return content != null ? content.getInputStream() : null; + public InputStream getContent() { + return content; } - + /** - * Returns the associated cached output stream. - * @return + * Returns the list of attachments. + * @return list (non-null) */ - public CachedOutputStream getCachedOutputStream() { - return content; + public List getAttachments() { + return attachments; } - + /** - * Returns the length of the message content in bytes. - * - * @return + * Set the list of attachments. + * @param attaches (non-null) */ - public long getSize() { - return content != null ? content.size() : -1L; + public void setAttachments(List attaches) { + assert attaches != null; + attachments = attaches; } } Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Mon Feb 10 10:08:34 2014 @@ -90,12 +90,19 @@ public class RMTxStore implements RMStor {"SEND_TO", "VARCHAR(256)"}, {"CONTENT", "BLOB"}}; private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"}; - + private static final String[][] ATTACHMENTS_TABLE_COLS + = {{"SEQ_ID", "VARCHAR(256) NOT NULL"}, + {"MSG_NO", "DECIMAL(19, 0) NOT NULL"}, + {"ATTACHMENT_NO", "DECIMAL(19, 0) NOT NULL"}, + {"DATA", "BLOB"}}; + private static final String[] ATTACHMENTS_TABLE_KEYS = {"SEQ_ID", "MSG_NO", "ATTACHMENT_NO"}; private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES"; private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES"; private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES"; private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES"; + private static final String INBOUND_ATTS_TABLE_NAME = "CXF_RM_INBOUND_ATTACHMENTS"; + private static final String OUTBOUND_ATTS_TABLE_NAME = "CXF_RM_OUTBOUND_ATTACHMENTS"; private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME, @@ -105,8 +112,9 @@ public class RMTxStore implements RMStor buildCreateTableStatement(SRC_SEQUENCES_TABLE_NAME, SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS); private static final String CREATE_MESSAGES_TABLE_STMT = - buildCreateTableStatement("{0}", - MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS); + buildCreateTableStatement("{0}", MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS); + private static final String CREATE_ATTACHMENTS_TABLE_STMT = + buildCreateTableStatement("{0}", ATTACHMENTS_TABLE_COLS, ATTACHMENTS_TABLE_KEYS); private static final String CREATE_DEST_SEQUENCE_STMT_STR = "INSERT INTO CXF_RM_DEST_SEQUENCES " @@ -128,6 +136,10 @@ public class RMTxStore implements RMStor = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT) VALUES(?, ?, ?, ?)"; private static final String DELETE_MESSAGE_STMT_STR = "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?"; + private static final String CREATE_ATTACHMENT_STMT_STR + = "INSERT INTO {0} (SEQ_ID, MSG_NO, ATTACHMENT_NO, DATA) VALUES(?, ?, ?, ?)"; + private static final String DELETE_ATTACHMENTS_STMT_STR = + "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?"; private static final String SELECT_DEST_SEQUENCE_STMT_STR = "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES " + "WHERE SEQ_ID = ?"; @@ -142,6 +154,8 @@ public class RMTxStore implements RMStor + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?"; private static final String SELECT_MESSAGES_STMT_STR = "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?"; + private static final String SELECT_ATTACHMENTS_STMT_STR = + "SELECT ATTACHMENT_NO, DATA FROM {0} WHERE SEQ_ID = ?, MSG_ID = ?"; private static final String ALTER_TABLE_STMT_STR = "ALTER TABLE {0} ADD {1} {2}"; private static final String CREATE_INBOUND_MESSAGE_STMT_STR = @@ -156,6 +170,18 @@ public class RMTxStore implements RMStor MessageFormat.format(SELECT_MESSAGES_STMT_STR, INBOUND_MSGS_TABLE_NAME); private static final String SELECT_OUTBOUND_MESSAGES_STMT_STR = MessageFormat.format(SELECT_MESSAGES_STMT_STR, OUTBOUND_MSGS_TABLE_NAME); + private static final String CREATE_INBOUND_ATTACHMENT_STMT_STR = + MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, INBOUND_ATTS_TABLE_NAME); + private static final String CREATE_OUTBOUND_ATTACHMENT_STMT_STR = + MessageFormat.format(CREATE_ATTACHMENT_STMT_STR, OUTBOUND_ATTS_TABLE_NAME); + private static final String DELETE_INBOUND_ATTACHMENTS_STMT_STR = + MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME); + private static final String DELETE_OUTBOUND_ATTACHMENTS_STMT_STR = + MessageFormat.format(DELETE_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME); + private static final String SELECT_INBOUND_ATTACHMENTS_STMT_STR = + MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, INBOUND_ATTS_TABLE_NAME); + private static final String SELECT_OUTBOUND_ATTACHMENTS_STMT_STR = + MessageFormat.format(SELECT_ATTACHMENTS_STMT_STR, OUTBOUND_ATTS_TABLE_NAME); // create_schema may not work for several reasons, if so, create one manually private static final String CREATE_SCHEMA_STMT_STR = "CREATE SCHEMA {0}"; @@ -587,34 +613,44 @@ public class RMTxStore implements RMStor public Collection getMessages(Identifier sid, boolean outbound) { Connection con = verifyConnection(); - PreparedStatement stmt = null; + PreparedStatement stmt1 = null; + PreparedStatement stmt2 = null; SQLException conex = null; Collection msgs = new ArrayList(); - ResultSet res = null; + ResultSet res1 = null; + ResultSet res2 = null; try { - stmt = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR); + stmt1 = getStatement(con, outbound ? SELECT_OUTBOUND_MESSAGES_STMT_STR : SELECT_INBOUND_MESSAGES_STMT_STR); - stmt.setString(1, sid.getValue()); - res = stmt.executeQuery(); - while (res.next()) { - long mn = res.getLong(1); - String to = res.getString(2); - Blob blob = res.getBlob(3); + stmt1.setString(1, sid.getValue()); + res1 = stmt1.executeQuery(); + while (res1.next()) { + long mn = res1.getLong(1); + String to = res1.getString(2); + Blob blob = res1.getBlob(3); RMMessage msg = new RMMessage(); msg.setMessageNumber(mn); msg.setTo(to); msg.setContent(blob.getBinaryStream()); msgs.add(msg); + stmt2 = getStatement(con, outbound + ? SELECT_OUTBOUND_ATTACHMENTS_STMT_STR : SELECT_INBOUND_ATTACHMENTS_STMT_STR); + stmt2.setString(1, sid.getValue()); + stmt2.setLong(2, mn); + res2 = stmt2.executeQuery(); + List attaches = new ArrayList(); + while (res2.next()) { + attaches.add(res2.getBinaryStream(1)); + } + msg.setAttachments(attaches); } } catch (SQLException ex) { conex = ex; LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG" : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex); - } catch (IOException ex) { - LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG" - : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex); } finally { - releaseResources(stmt, res); + releaseResources(stmt2, res2); + releaseResources(stmt1, res1); updateConnectionState(con, conex); } return msgs; @@ -628,7 +664,7 @@ public class RMTxStore implements RMStor updateDestinationSequence(con, seq); - if (msg != null && msg.getCachedOutputStream() != null) { + if (msg != null && msg.getContent() != null) { storeMessage(con, seq.getIdentifier(), msg, false); } @@ -653,7 +689,7 @@ public class RMTxStore implements RMStor updateSourceSequence(con, seq); - if (msg != null && msg.getCachedOutputStream() != null) { + if (msg != null && msg.getContent() != null) { storeMessage(con, seq.getIdentifier(), msg, true); } @@ -673,18 +709,24 @@ public class RMTxStore implements RMStor public void removeMessages(Identifier sid, Collection messageNrs, boolean outbound) { Connection con = verifyConnection(); - PreparedStatement stmt = null; + PreparedStatement stmt1 = null; + PreparedStatement stmt2 = null; SQLException conex = null; try { - stmt = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR); + stmt1 = getStatement(con, outbound ? DELETE_OUTBOUND_MESSAGE_STMT_STR : DELETE_INBOUND_MESSAGE_STMT_STR); + stmt2 = getStatement(con, outbound + ? DELETE_OUTBOUND_ATTACHMENTS_STMT_STR : DELETE_INBOUND_ATTACHMENTS_STMT_STR); beginTransaction(); - stmt.setString(1, sid.getValue()); + stmt1.setString(1, sid.getValue()); + stmt2.setString(1, sid.getValue()); for (Long messageNr : messageNrs) { - stmt.setLong(2, messageNr); - stmt.execute(); + stmt2.setLong(2, messageNr); + stmt2.execute(); + stmt1.setLong(2, messageNr); + stmt1.execute(); } commit(con); @@ -694,7 +736,8 @@ public class RMTxStore implements RMStor abort(con); throw new RMStoreException(ex); } finally { - releaseResources(stmt, null); + releaseResources(stmt2, null); + releaseResources(stmt1, null); updateConnectionState(con, conex); } } @@ -740,31 +783,43 @@ public class RMTxStore implements RMStor LOG.log(Level.FINE, "Storing {0} message number {1} for sequence {2}, to = {3}", new Object[] {outbound ? "outbound" : "inbound", nr, id, to}); } - InputStream msgin = null; - PreparedStatement stmt = null; + PreparedStatement stmt1 = null; + PreparedStatement stmt2 = null; try { - msgin = msg.getInputStream(); - stmt = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR); + InputStream msgin = msg.getContent(); + stmt1 = getStatement(con, outbound ? CREATE_OUTBOUND_MESSAGE_STMT_STR : CREATE_INBOUND_MESSAGE_STMT_STR); - int i = 1; - stmt.setString(i++, id); - stmt.setLong(i++, nr); - stmt.setString(i++, to); - stmt.setBinaryStream(i++, msgin, (int)msg.getSize()); - stmt.execute(); + stmt1.setString(1, id); + stmt1.setLong(2, nr); + stmt1.setString(3, to); + stmt1.setBinaryStream(4, msgin); + stmt1.execute(); + + List attachments = msg.getAttachments(); + if (attachments.size() > 0) { + stmt2 = getStatement(con, outbound + ? CREATE_OUTBOUND_ATTACHMENT_STMT_STR : CREATE_INBOUND_ATTACHMENT_STMT_STR); + stmt2.setString(1, id); + stmt2.setLong(2, nr); + for (int i = 0; i < attachments.size(); i++) { + stmt2.setLong(3, i); + stmt2.setBinaryStream(4, attachments.get(i)); + stmt2.execute(); + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, + "Successfully stored {0} attachment {1} for message number {2} in sequence {3}", + new Object[] {outbound ? "outbound" : "inbound", i, nr, id}); + } + } + } + if (LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}", new Object[] {outbound ? "outbound" : "inbound", nr, id}); } } finally { - if (msgin != null) { - try { - msgin.close(); - } catch (IOException e) { - // ignore - } - } - releaseResources(stmt, null); + releaseResources(stmt1, null); + releaseResources(stmt2, null); } } @@ -806,10 +861,9 @@ public class RMTxStore implements RMStor long lastMessageNr = seq.getLastMessageNumber(); stmt.setLong(1, lastMessageNr); - InputStream is = PersistenceUtils.getInstance() - .serialiseAcknowledgment(seq.getAcknowledgment()); + InputStream is = PersistenceUtils.getInstance().serialiseAcknowledgment(seq.getAcknowledgment()); stmt.setBinaryStream(2, is, is.available()); - stmt.setString(3, seq.getIdentifier() .getValue()); + stmt.setString(3, seq.getIdentifier().getValue()); stmt.execute(); } finally { releaseResources(stmt, null); @@ -876,6 +930,24 @@ public class RMTxStore implements RMStor stmt.close(); } } + + for (String tableName : new String[] {OUTBOUND_ATTS_TABLE_NAME, INBOUND_ATTS_TABLE_NAME}) { + stmt = con.createStatement(); + try { + stmt.executeUpdate(MessageFormat.format(CREATE_ATTACHMENTS_TABLE_STMT, tableName)); + } catch (SQLException ex) { + if (!isTableExistsError(ex)) { + throw ex; + } else { + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Table " + tableName + " already exists."); + } + verifyTable(con, tableName, ATTACHMENTS_TABLE_COLS); + } + } finally { + stmt.close(); + } + } } finally { if (connection == null && con != null) { con.close(); Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java?rev=1566555&r1=1566554&r2=1566555&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Mon Feb 10 10:08:34 2014 @@ -25,10 +25,12 @@ import javax.xml.namespace.QName; import org.apache.cxf.Bus; import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider; +import org.apache.cxf.ws.rm.RMCaptureOutInterceptor; import org.apache.cxf.ws.rm.RMDeliveryInterceptor; import org.apache.cxf.ws.rm.RMInInterceptor; import org.apache.cxf.ws.rm.RMOutInterceptor; -import org.apache.cxf.ws.rm.soap.RMSoapInterceptor; +import org.apache.cxf.ws.rm.soap.RMSoapInInterceptor; +import org.apache.cxf.ws.rm.soap.RMSoapOutInterceptor; public class RMPolicyInterceptorProvider extends AbstractPolicyInterceptorProvider { @@ -36,7 +38,9 @@ public class RMPolicyInterceptorProvider private static final Collection ASSERTION_TYPES; private RMInInterceptor rmIn = new RMInInterceptor(); private RMOutInterceptor rmOut = new RMOutInterceptor(); - private RMSoapInterceptor rmSoap = new RMSoapInterceptor(); + private RMCaptureOutInterceptor rmCaptureOut = new RMCaptureOutInterceptor(); + private RMSoapOutInterceptor rmOutSoap = new RMSoapOutInterceptor(); + private RMSoapInInterceptor rmInSoap = new RMSoapInInterceptor(); private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor(); static { @@ -51,20 +55,23 @@ public class RMPolicyInterceptorProvider super(ASSERTION_TYPES); rmIn.setBus(bus); rmOut.setBus(bus); + rmCaptureOut.setBus(bus); rmDelivery.setBus(bus); getInInterceptors().add(rmIn); - getInInterceptors().add(rmSoap); + getInInterceptors().add(rmInSoap); getInInterceptors().add(rmDelivery); getOutInterceptors().add(rmOut); - getOutInterceptors().add(rmSoap); + getOutInterceptors().add(rmCaptureOut); + getOutInterceptors().add(rmOutSoap); getInFaultInterceptors().add(rmIn); - getInFaultInterceptors().add(rmSoap); + getInFaultInterceptors().add(rmInSoap); getInFaultInterceptors().add(rmDelivery); getOutFaultInterceptors().add(rmOut); - getOutFaultInterceptors().add(rmSoap); + getOutFaultInterceptors().add(rmCaptureOut); + getOutFaultInterceptors().add(rmOutSoap); } }