cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject [3/3] git commit: Revamp how WS-RM captures the message to capture while writing to normal output.
Date Thu, 13 Mar 2014 19:29:37 GMT
Revamp how WS-RM captures the message so that it is captured while being written to the normal output.


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/79bd4f37
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/79bd4f37
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/79bd4f37

Branch: refs/heads/master
Commit: 79bd4f37f7fea32346195180566d1c2562d79871
Parents: f8e3111
Author: Daniel Kulp <dkulp@apache.org>
Authored: Thu Mar 13 15:27:08 2014 -0400
Committer: Daniel Kulp <dkulp@apache.org>
Committed: Thu Mar 13 15:29:27 2014 -0400

----------------------------------------------------------------------
 .../apache/cxf/ws/rm/CapturingXMLWriter.java    | 413 +++++++++++++++++++
 .../cxf/ws/rm/RMCaptureOutInterceptor.java      | 210 ++++------
 .../apache/cxf/ws/rm/RMMessageConstants.java    |   4 +-
 .../cxf/systest/ws/rm/MessageLossSimulator.java |   5 +-
 .../apache/cxf/systest/ws/rm/SequenceTest.java  |   9 +-
 5 files changed, 503 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/79bd4f37/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/CapturingXMLWriter.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/CapturingXMLWriter.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/CapturingXMLWriter.java
new file mode 100644
index 0000000..6d49252
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/CapturingXMLWriter.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.ws.rm;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.NamespaceContext;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.staxutils.transform.OutTransformWriter;
+
+public class CapturingXMLWriter implements XMLStreamWriter {
+    
+    XMLStreamWriter delegate;
+    XMLStreamWriter capture;
+    LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream();
+    Throwable throwable;
+
+    public CapturingXMLWriter(XMLStreamWriter del) {
+        delegate = del;
+        capture = StaxUtils.createXMLStreamWriter(bos, "UTF-8");
+        
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}Sequence", "");
+        map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}SequenceAcknowledgement", "");
+        map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}Sequence", "");
+        map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}SequenceAcknowledgement", "");
+        
+        capture = new OutTransformWriter(capture,
+                                         map,
+                                         Collections.<String, String>emptyMap(),
+                                         Collections.<String>emptyList(),
+                                         false, 
+                                         null);
+    }
+    
+    public void setDefaultNamespace(String uri) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.setDefaultNamespace(uri);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.setDefaultNamespace(uri);
+    }
+    public void setNamespaceContext(NamespaceContext ctx) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.setNamespaceContext(ctx);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.setNamespaceContext(ctx);
+    }
+    public void setPrefix(String pfx, String uri) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.setPrefix(pfx, uri);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.setPrefix(pfx, uri);
+    }
+
+    public void writeAttribute(String prefix, String uri, 
+                               String local, String value) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeAttribute(prefix, uri, local, value);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeAttribute(prefix, uri, local, value);
+    }
+
+    public void writeAttribute(String uri, String local, String value) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeAttribute(uri, local, value);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeAttribute(uri, local, value);
+    }
+
+    public void writeAttribute(String local, String value) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeAttribute(local, value);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeAttribute(local, value);
+    }
+
+    public void writeCData(String cdata) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeCData(cdata);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeCData(cdata);
+    }
+
+    public void writeCharacters(char[] arg0, int arg1, int arg2) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeCharacters(arg0, arg1, arg2);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeCharacters(arg0, arg1, arg2);
+    }
+
+    public void writeCharacters(String text) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeCharacters(text);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeCharacters(text);
+    }
+
+    public void writeComment(String text) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeComment(text);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeComment(text);
+    }
+
+    public void writeDefaultNamespace(String uri) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeDefaultNamespace(uri);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeDefaultNamespace(uri);
+    }
+
+    public void writeDTD(String dtd) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeDTD(dtd);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeDTD(dtd);
+    }
+
+    public void writeEmptyElement(String prefix, String local, String uri) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeEmptyElement(prefix, local, uri);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeEmptyElement(prefix, local, uri);
+    }
+
+    public void writeEmptyElement(String uri, String local) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeEmptyElement(uri, local);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeEmptyElement(uri, local);
+    }
+
+    public void writeEmptyElement(String localName) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeEmptyElement(localName);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeEmptyElement(localName);
+    }
+
+    public void writeEndDocument() throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeEndDocument();
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeEndDocument();
+    }
+
+    public void writeEndElement() throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeEndElement();
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeEndElement();
+    }
+
+    public void writeEntityRef(String ent) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeEntityRef(ent);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeEntityRef(ent);
+    }
+
+    public void writeNamespace(String prefix, String uri) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeNamespace(prefix, uri);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeNamespace(prefix, uri);
+    }
+
+    public void writeProcessingInstruction(String target, String data) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeProcessingInstruction(target, data);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeProcessingInstruction(target, data);
+    }
+    public void writeProcessingInstruction(String target) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeProcessingInstruction(target);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeProcessingInstruction(target);
+    }
+
+    public void writeStartDocument() throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeStartDocument();
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeStartDocument();
+    }
+
+    public void writeStartDocument(String encoding, String ver) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeStartDocument(encoding, ver);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeStartDocument(encoding, ver);
+    }
+
+    public void writeStartDocument(String ver) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeStartDocument(ver);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeStartDocument(ver);
+    }
+
+    public void writeStartElement(String prefix, String local, String uri) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeStartElement(prefix, local, uri);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeStartElement(prefix, local, uri);
+    }
+
+    public void writeStartElement(String uri, String local) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeStartElement(uri, local);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeStartElement(uri, local);
+    }
+
+    public void writeStartElement(String local) throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.writeStartElement(local);
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+        capture.writeStartElement(local);
+    }
+
+
+    public void close() throws XMLStreamException {
+        if (delegate != null) {            
+            try {
+                delegate.close();
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+    }
+
+    public void flush() throws XMLStreamException {
+        if (delegate != null) {
+            try {
+                delegate.flush();
+            } catch (Throwable t) {
+                stopToDelegate(t);
+            }
+        }
+    }
+
+    public String getPrefix(String uri) throws XMLStreamException {
+        if (delegate != null) {
+            return delegate.getPrefix(uri);
+        }
+        return capture.getPrefix(uri);
+    }
+
+    public NamespaceContext getNamespaceContext() {
+        if (delegate != null) {
+            return delegate.getNamespaceContext();
+        }
+        return capture.getNamespaceContext();
+    }
+    public Object getProperty(String name) throws IllegalArgumentException {
+        if (delegate != null) {
+            return delegate.getProperty(name);
+        }
+        return capture.getProperty(name);
+    }
+    
+    public LoadingByteArrayOutputStream getOutputStream() throws XMLStreamException {
+        capture.flush();
+        capture.close();
+        return bos;
+    }
+    public Throwable getThrowable() {
+        return throwable;
+    }
+    
+    //if there is some problem writing to the original output, we need to stop writing 
+    // to the output, but keep capturing the message so we can try and resend later 
+    private void stopToDelegate(Throwable t) {
+        if (throwable == null) {
+            throwable = t;
+            delegate = null;
+        }
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/79bd4f37/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index 1de2687..9d8f85b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -21,51 +21,44 @@ package org.apache.cxf.ws.rm;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+
 import org.apache.cxf.Bus;
 import org.apache.cxf.binding.Binding;
-import org.apache.cxf.binding.soap.SoapMessage;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.interceptor.AbstractOutDatabindingInterceptor;
 import org.apache.cxf.interceptor.AttachmentOutInterceptor;
-import org.apache.cxf.interceptor.Interceptor;
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.LoggingOutInterceptor;
-import org.apache.cxf.interceptor.MessageSenderInterceptor.MessageSenderEndingInterceptor;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageContentsList;
-import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
 import org.apache.cxf.phase.Phase;
-import org.apache.cxf.phase.PhaseInterceptor;
-import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.BindingOperationInfo;
 import org.apache.cxf.service.model.OperationInfo;
-import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.ContextUtils;
-import org.apache.cxf.ws.policy.PolicyVerificationOutInterceptor;
 import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
 import org.apache.cxf.ws.rm.v200702.SequenceType;
 import org.apache.cxf.ws.rm.v200702.TerminateSequenceType;
-import org.apache.cxf.ws.security.SecurityConstants;
 
 /**
  * 
@@ -203,131 +196,96 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
         // capture message if retransmission possible
         if (isApplicationMessage && !isPartialResponse) {
             getManager().initializeInterceptorChain(msg);
+            //doneCaptureMessage(msg);
             captureMessage(msg);
         }
     }
-    
     private void captureMessage(Message message) {
-        Message capture = new MessageImpl();
-        capture.setId(message.getId());
-        capture.put(RMMessageConstants.MESSAGE_CAPTURE_CHAIN, Boolean.TRUE);
-        Iterator<Class<?>> citer = message.getContentFormats().iterator();
-        while (citer.hasNext()) {
-            Class<?> clas = citer.next();
-            if (OutputStream.class != clas) {
-                
-                // clone contents list so changes won't effect original message
-                Object content = message.getContent(clas);
-                if (content instanceof MessageContentsList) {
-                    content = new MessageContentsList((MessageContentsList)content);
-                }
-                capture.setContent(clas, content);
-            }
-        }
-        Iterator<String> kiter = message.keySet().iterator();
-        while (kiter.hasNext()) {
-            String key = kiter.next();
-            capture.put(key, message.get(key));
-        }
-        kiter = message.getContextualPropertyKeys().iterator();
-        while (kiter.hasNext()) {
-            String key = kiter.next();
-            capture.setContextualProperty(key, message.getContextualProperty(key));
-        }
-        if (message instanceof SoapMessage) {
-            capture = new SoapMessage(capture);
-            ((SoapMessage)capture).setVersion(((SoapMessage)message).getVersion());
+        message.put(RMMessageConstants.MESSAGE_CAPTURE, Boolean.TRUE);
+
+        message.getInterceptorChain().add(new CaptureStart());
+        message.getInterceptorChain().add(new CaptureEnd());
+    }    
+    
+    private class CaptureStart extends AbstractPhaseInterceptor<Message> {
+        public CaptureStart() {
+            super(Phase.PRE_PROTOCOL);
         }
-        
-        // eliminate all other RM interceptors, along with attachment and security and message loss interceptors, from
-        //  capture chain
-        PhaseInterceptorChain chain = (PhaseInterceptorChain)message.getInterceptorChain();
-        PhaseInterceptorChain cchain = chain.cloneChain();
-        ListIterator<Interceptor<? extends Message>> iterator = cchain.getIterator();
-        boolean past = false;
-        boolean ending = false;
-        while (iterator.hasNext()) {
-            PhaseInterceptor<? extends Message> intercept = (PhaseInterceptor<? extends Message>)iterator.next();
-            String id = intercept.getId();
-            if (RMCaptureOutInterceptor.class.getName().equals(id)) {
-                past = true;
-            } else if (past && id != null) {
-                if ((id.startsWith(RMCaptureOutInterceptor.class.getPackage().getName())
-                    && !(id.equals(RetransmissionInterceptor.class.getName())))
-                    || id.startsWith(SecurityConstants.class.getPackage().getName())
-                    || PolicyVerificationOutInterceptor.class.getName().equals(id)
-                    || AttachmentOutInterceptor.class.getName().equals(id)
-                    || LoggingOutInterceptor.class.getName().equals(id)
-                    || "org.apache.cxf.systest.ws.rm.MessageLossSimulator$MessageLossEndingInterceptor".equals(id)) {
-                    cchain.remove(intercept);
-                } else if (MessageSenderEndingInterceptor.class.getName().equals(id)) {
-                    ending = true;
-                }
-            }
+
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
+            message.put("RM_ORIGINAL_WRITER", writer);
+            writer = new CapturingXMLWriter(writer);
+            message.put("RM_CAPTURING_WRITER", writer);
+            message.setContent(XMLStreamWriter.class, writer);
+            message.put(AbstractOutDatabindingInterceptor.DISABLE_OUTPUTSTREAM_OPTIMIZATION, Boolean.TRUE);
         }
-        if (!ending) {
-            
-            // add normal ending interceptor back in, in case removed by MessageLossSimulator
-            cchain.add(new MessageSenderEndingInterceptor());
+    }
+    private class CaptureEnd extends AbstractPhaseInterceptor<Message> {
+        public CaptureEnd() {
+            super(Phase.POST_PROTOCOL);
         }
-        capture.setInterceptorChain(cchain);
-        LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream();
-        capture.setContent(OutputStream.class, bos);
-        ExchangeImpl captureExchange = new ExchangeImpl((ExchangeImpl)message.getExchange());
-        capture.setExchange(captureExchange);
-        captureExchange.setOutMessage(capture);
-        captureExchange.setConduit(new AbstractConduit(captureExchange.getConduit(capture).getTarget()) {
-            
-            @Override
-            public void prepare(Message message) throws IOException {
-            }
-            
-            @Override
-            protected Logger getLogger() {
-                return null;
-            }
-            
-        });
-        cchain.doInterceptStartingAfter(capture, RMCaptureOutInterceptor.class.getName());
-        try {
-            
-            RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
-            SequenceType sequence = rmps.getSequence();
-            Long number = sequence.getMessageNumber();
-            Identifier sid = sequence.getIdentifier();
-            if (LOG.isLoggable(Level.INFO)) {
-                LOG.log(Level.INFO, "Captured message " + number + " in sequence " + sid.getValue());
-            }
+
+        @Override
+        public void handleMessage(Message message) throws Fault {
+            XMLStreamWriter w = (XMLStreamWriter)message.get("RM_ORIGINAL_WRITER");
+            message.setContent(XMLStreamWriter.class, w); 
             
-            // save message for potential retransmission
-            ByteArrayInputStream bis = bos.createInputStream();
-            message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(bis));
-            RMManager manager = getManager();
-            manager.getRetransmissionQueue().start();
-            manager.getRetransmissionQueue().addUnacknowledged(message);
-            RMStore store = manager.getStore();
-            if (null != store) {
+            CapturingXMLWriter cw = (CapturingXMLWriter)message.get("RM_CAPTURING_WRITER");
+
+            try {
+                
+                RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
+                SequenceType sequence = rmps.getSequence();
+                Long number = sequence.getMessageNumber();
+                Identifier sid = sequence.getIdentifier();
+                if (LOG.isLoggable(Level.INFO)) {
+                    LOG.log(Level.INFO, "Captured message " + number + " in sequence " + sid.getValue());
+                }
                 
-                // persist message to store
-                Source s = manager.getSource(message);
-                SourceSequence ss = s.getSequence(sid);
-                RMMessage msg = new RMMessage();
-                msg.setMessageNumber(number);
-                if (!MessageUtils.isRequestor(message)) {
-                    AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
-                    if (null != maps && null != maps.getTo()) {
-                        msg.setTo(maps.getTo().getValue());
+                // save message for potential retransmission
+                ByteArrayInputStream bis = cw.getOutputStream().createInputStream();
+                message.put(RMMessageConstants.SAVED_CONTENT, RewindableInputStream.makeRewindable(bis));
+                RMManager manager = getManager();
+                manager.getRetransmissionQueue().start();
+                manager.getRetransmissionQueue().addUnacknowledged(message);
+                RMStore store = manager.getStore();
+                if (null != store) {
+                    
+                    // persist message to store
+                    Source s = manager.getSource(message);
+                    SourceSequence ss = s.getSequence(sid);
+                    RMMessage msg = new RMMessage();
+                    msg.setMessageNumber(number);
+                    if (!MessageUtils.isRequestor(message)) {
+                        AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, true);
+                        if (null != maps && null != maps.getTo()) {
+                            msg.setTo(maps.getTo().getValue());
+                        }
                     }
+                    msg.setContent(bis);
+                    store.persistOutgoing(ss, msg);
                 }
-                msg.setContent(bis);
-                store.persistOutgoing(ss, msg);
+                    
+            } catch (RMException e) {
+                // ignore
+            } catch (XMLStreamException e) {
+                LOG.log(Level.SEVERE, "Error persisting message", e);
+            } catch (IOException e) {
+                LOG.log(Level.SEVERE, "Error persisting message", e);
+            } 
+            if (cw.getThrowable() != null) {
+                Throwable t = cw.getThrowable();
+                RuntimeException exception = null;
+                if (t instanceof RuntimeException) {
+                    exception = (RuntimeException)t;
+                } else {
+                    exception = new Fault(t);
+                }
+                throw exception;
             }
-                
-        } catch (RMException e) {
-            // ignore
-        } catch (IOException e) {
-            LOG.log(Level.SEVERE, "Error persisting message", e);
-        } 
+        }
     }
 
     private String getAddressingNamespace(AddressingProperties maps) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/79bd4f37/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
index 8cf5af9..4012c5b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
@@ -42,8 +42,8 @@ public final class RMMessageConstants {
     /** Retransmission in progress flag (Boolean.TRUE if in progress). */
     public static final String RM_RETRANSMISSION = "org.apache.cxf.ws.rm.retransmitting";
     
-    /** Boolean property TRUE for a chain used only to capture (not send) a message. */
-    public static final String MESSAGE_CAPTURE_CHAIN = "org.apache.cxf.rm.captureOnly";
+    /** Boolean property TRUE for a chain used to capture a message. */
+    public static final String MESSAGE_CAPTURE = "org.apache.cxf.rm.capture";
     
     /** Client callback (must be instance of {@link MessageCallback}). */
     public static final String RM_CLIENT_CALLBACK = "org.apache.cxf.rm.clientCallback";

http://git-wip-us.apache.org/repos/asf/cxf/blob/79bd4f37/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
index 0beba26..1e6f40d 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageLossSimulator.java
@@ -82,9 +82,6 @@ public class MessageLossSimulator extends AbstractPhaseInterceptor<Message> {
         if (MessageUtils.isPartialResponse(message)) {
             return;
         }
-        if (Boolean.TRUE.equals(message.get(RMMessageConstants.MESSAGE_CAPTURE_CHAIN))) {
-            return;
-        }
         if (Boolean.TRUE.equals(message.get(RMMessageConstants.RM_RETRANSMISSION))) {
             return;
         }
@@ -103,7 +100,7 @@ public class MessageLossSimulator extends AbstractPhaseInterceptor<Message> {
                 }
             }
         }
-        
+
         InterceptorChain chain = message.getInterceptorChain();
         ListIterator<Interceptor<? extends Message>> it = chain.getIterator();
         while (it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/79bd4f37/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
index f529d9c..8642542 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
@@ -53,7 +53,6 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.soap.Soap11;
 import org.apache.cxf.binding.soap.SoapFault;
-import org.apache.cxf.binding.soap.interceptor.SoapOutInterceptor;
 import org.apache.cxf.bus.spring.SpringBusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Client;
@@ -467,7 +466,7 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         assertEquals("THREE", greeter.greetMe("three"));
 
         // TODO: temporarily commented out for first version of new RM code
-//        verifyTwowayNonAnonymous();
+        //verifyTwowayNonAnonymous();
     }
 
     @Test
@@ -748,8 +747,7 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         
         class MessageNumberInterceptor extends AbstractPhaseInterceptor<Message> {
             public MessageNumberInterceptor() {
-                super(Phase.WRITE);
-                addBefore(SoapOutInterceptor.class.getName());
+                super(Phase.PRE_STREAM);
             }
             
             public void handleMessage(Message m) {
@@ -816,8 +814,7 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         
         class SequenceIdInterceptor extends AbstractPhaseInterceptor<Message> {
             public SequenceIdInterceptor() {
-                super(Phase.WRITE);
-                addBefore(SoapOutInterceptor.class.getName());
+                super(Phase.PRE_STREAM);
             }
             
             public void handleMessage(Message m) {


Mime
View raw message