cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1066985 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/ws-specs/s...
Date Thu, 03 Feb 2011 21:30:32 GMT
Author: dkulp
Date: Thu Feb  3 21:30:31 2011
New Revision: 1066985

URL: http://svn.apache.org/viewvc?rev=1066985&view=rev
Log:
[CXF-3271] WS-RM code does not support InOrder assurances
Patch from Dennis Sosnoski applied
Modified the patch to use continuations, when available, so that
messages waiting for their turn do not consume threads.

Added:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml   (with props)
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml   (with props)
Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractRMInterceptor.java Thu Feb  3 21:30:31 2011
@@ -47,8 +47,12 @@ public abstract class AbstractRMIntercep
     private RMManager manager;
     private Bus bus;
     
+    protected AbstractRMInterceptor(String phase) {
+        super(phase);
+    }
+    
     protected AbstractRMInterceptor() {
-        super(Phase.PRE_LOGICAL);
+        this(Phase.PRE_LOGICAL);
     }
      
     public RMManager getManager() {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Thu Feb  3 21:30:31 2011
@@ -19,16 +19,26 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 
 public class Destination extends AbstractEndpoint {
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(Destination.class);
 
     private Map<String, DestinationSequence> map;
 
@@ -91,12 +101,31 @@ public class Destination extends Abstrac
         DestinationSequence seq = getSequence(sequenceType.getIdentifier());
 
         if (null != seq) {
-            seq.applyDeliveryAssurance(sequenceType.getMessageNumber());
-            seq.acknowledge(message);
-
-            if (null != sequenceType.getLastMessage()) {
-                seq.setLastMessageNumber(sequenceType.getMessageNumber());
-                ackImmediately(seq, message);
+            if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+                seq.acknowledge(message);
+    
+                if (null != sequenceType.getLastMessage()) {
+                    seq.setLastMessageNumber(sequenceType.getMessageNumber());
+                    ackImmediately(seq, message);
+                }
+            } else {
+                try {
+                    message.getInterceptorChain().abort();
+                    Conduit conduit = message.getExchange().getDestination()
+                        .getBackChannel(message, null, null);
+                    if (conduit != null) {
+                        //for a one-way, the back channel could be
+                        //null if it knows it cannot send anything.
+                        Message partial = createMessage(message.getExchange());
+                        partial.remove(Message.CONTENT_TYPE);
+                        partial.setExchange(message.getExchange());
+                        conduit.prepare(partial);
+                        conduit.close(partial);
+                    }
+                } catch (IOException e) {
+                    LOG.log(Level.SEVERE, e.getMessage());
+                    throw new RMException(e);
+                }
             }
         } else {
             SequenceFaultFactory sff = new SequenceFaultFactory();
@@ -140,5 +169,28 @@ public class Destination extends Abstrac
             getReliableEndpoint().getProxy().acknowledge(seq);                    
         }
     }
+    
+    void processingComplete(Message message) {
+        SequenceType sequenceType = RMContextUtils.retrieveRMProperties(message, false).getSequence();
+        if (null == sequenceType) {
+            return;
+        }
+        
+        DestinationSequence seq = getSequence(sequenceType.getIdentifier());
 
+        if (null != seq) {
+            seq.processingComplete(sequenceType.getMessageNumber());
+        }
+    }
+    
+    private static Message createMessage(Exchange exchange) {
+        Endpoint ep = exchange.get(Endpoint.class);
+        Message msg = null;
+        if (ep != null) {
+            msg = new MessageImpl();
+            msg.setExchange(exchange);
+            msg = ep.getBinding().createMessage(msg);
+        }
+        return msg;
+    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Feb  3 21:30:31 2011
@@ -22,12 +22,16 @@ package org.apache.cxf.ws.rm;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.TimerTask;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
@@ -51,6 +55,9 @@ public class DestinationSequence extends
     private List<DeferredAcknowledgment> deferredAcknowledgments;
     private SequenceTermination scheduledTermination;
     private String correlationID;
+    private BigInteger inProcessNumber;
+    private BigInteger highNumberCompleted = BigInteger.ZERO;
+    private List<Continuation> continuations = new LinkedList<Continuation>();
     
     public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d) {
         this(i, a, null, null);
@@ -141,7 +148,7 @@ public class DestinationSequence extends
                 acknowledgement.getAcknowledgementRange().add(i, range);
             }
             mergeRanges();
-            notifyAll();
+            wakeupAll();
         }
         
         purgeAcknowledged(messageNumber);
@@ -208,45 +215,102 @@ public class DestinationSequence extends
         // can be included in a HTTP response
         return getAcksTo().getAddress().getValue().equals(RMConstants.getAnonymousAddress());
     }
-       
+    
     /**
      * Ensures that the delivery assurance is honored, e.g. by throwing an 
      * exception if the message had already been delivered and the delivery
      * assurance is AtMostOnce.
-     * This method blocks in case the delivery assurance is 
-     * InOrder and and not all messages with lower message numbers have been 
-     * delivered.
+     * If the delivery assurance includes either AtLeastOnce or ExactlyOnce, combined with InOrder, this
+     * queues out-of-order messages for processing after the missing messages have been received.
      * 
-     * @param s the SequenceType object including identifier and message number
+     * @param mn message number
+     * @return <code>true</code> to continue processing the message, <code>false</code> to drop it
      * @throws Fault if message had already been acknowledged
      */
-    void applyDeliveryAssurance(BigInteger mn) throws RMException {
+    boolean applyDeliveryAssurance(BigInteger mn, Message message) throws RMException {
+        Continuation cont = getContinuation(message);
         DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
-        if (da.isSetAtMostOnce() && isAcknowledged(mn)) {            
+        if (cont != null && da.isSetInOrder() && !cont.isNew()) {
+            return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+                               message, cont);
+        }
+        if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {            
             org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
                 "MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
-            LOG.log(Level.SEVERE, msg.toString());
+            LOG.log(Level.INFO, msg.toString());
             throw new RMException(msg);
         } 
-        if (da.isSetInOrder() && da.isSetAtLeastOnce()) {
-            synchronized (this) {
-                boolean ok = allPredecessorsAcknowledged(mn);
-                while (!ok) {
-                    try {
-                        wait();                        
-                        ok = allPredecessorsAcknowledged(mn);
-                    } catch (InterruptedException ie) {
-                        // ignore
-                    }
+        if (da.isSetInOrder()) {
+            return waitInQueue(mn, !(da.isSetAtLeastOnce() || da.isSetExactlyOnce()),
+                               message, cont);
+        }
+        return true;
+    }
+    
+    private Continuation getContinuation(Message message) {
+        if (message == null) {
+            return null;
+        }
+        return message.get(Continuation.class);
+    }
+    
+    synchronized boolean waitInQueue(BigInteger mn, boolean canSkip,
+                                     Message message, Continuation continuation) {
+        while (true) {
+            
+            // can process now if no other in process and this one is next
+            if (inProcessNumber == null) {
+                BigInteger diff = mn.subtract(highNumberCompleted);
+                if (BigInteger.ONE.equals(diff) || (canSkip && diff.signum() > 0)) {
+                    inProcessNumber = mn;
+                    return true;
+                }
+            }
+            
+            // can abort now if same message in process or already processed
+            BigInteger compare = inProcessNumber == null ? highNumberCompleted : inProcessNumber;
+            if (compare.compareTo(mn) >= 0) {
+                return false;
+            }
+            if (continuation == null) {
+                ContinuationProvider p = message.get(ContinuationProvider.class);
+                if (p != null) {
+                    boolean isOneWay = message.getExchange().isOneWay();
+                    message.getExchange().setOneWay(false);
+                    continuation = p.getContinuation();
+                    message.getExchange().setOneWay(isOneWay);
+                    message.put(Continuation.class, continuation);
+                }
+            }
+
+            if (continuation != null) {
+                continuation.setObject(message);
+                if (continuation.suspend(-1)) {
+                    continuations.add(continuation);
+                    throw new SuspendedInvocationException();
                 }
             }
+            try {
+                //if we get here, there isn't a continuation available
+                //so we need to block/wait
+                wait();                        
+            } catch (InterruptedException ie) {
+                // ignore
+            }
         }
     }
+    synchronized void wakeupAll() {
+        while (!continuations.isEmpty()) {
+            Continuation c = continuations.remove(0);
+            c.resume();
+        }
+        notifyAll();
+    }
     
-    synchronized boolean allPredecessorsAcknowledged(BigInteger mn) {
-        return acknowledgement.getAcknowledgementRange().size() == 1
-            && acknowledgement.getAcknowledgementRange().get(0).getLower().equals(BigInteger.ONE)
-            && acknowledgement.getAcknowledgementRange().get(0).getUpper().subtract(mn).signum() >= 0;
+    synchronized void processingComplete(BigInteger mn) {
+        inProcessNumber = null;
+        highNumberCompleted = mn;
+        wakeupAll();
     }
     
     void purgeAcknowledged(BigInteger messageNr) {

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Thu Feb  3 21:30:31 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.Phase;
+
+/**
+ * Interceptor used for InOrder delivery of messages to the destination. This works with
+ * {@link DestinationSequence} to allow only one message at a time from a particular sequence through to the
+ * destination (since otherwise there is no way to enforce in-order delivery).
+ */
+public class RMDeliveryInterceptor extends AbstractRMInterceptor<Message> {
+    
+    private static final Logger LOG = LogUtils.getL7dLogger(RMDeliveryInterceptor.class);
+  
+    public RMDeliveryInterceptor() {
+        super(Phase.POST_INVOKE);
+    }
+    
+    // Interceptor interface 
+    
+    public void handle(Message message) throws SequenceFault, RMException {
+        LOG.entering(getClass().getName(), "handleMessage");
+        getManager().getDestination(message).processingComplete(message);
+    }
+}

Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java Thu Feb  3 21:30:31 2011
@@ -23,6 +23,7 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.common.injection.NoJSR250Annotations;
 import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMOutInterceptor;
@@ -47,6 +48,7 @@ public class RMFeature extends AbstractF
 
     private RMInInterceptor rmLogicalIn = new RMInInterceptor();
     private RMOutInterceptor rmLogicalOut = new RMOutInterceptor();
+    private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
     private RMSoapInterceptor rmCodec = new RMSoapInterceptor();
 
     public void setDeliveryAssurance(DeliveryAssuranceType da) {
@@ -91,15 +93,18 @@ public class RMFeature extends AbstractF
 
         rmLogicalIn.setBus(bus);
         rmLogicalOut.setBus(bus);
+        rmDelivery.setBus(bus);
 
         provider.getInInterceptors().add(rmLogicalIn);
         provider.getInInterceptors().add(rmCodec);
+        provider.getInInterceptors().add(rmDelivery);
 
         provider.getOutInterceptors().add(rmLogicalOut);
         provider.getOutInterceptors().add(rmCodec);
 
         provider.getInFaultInterceptors().add(rmLogicalIn);
         provider.getInFaultInterceptors().add(rmCodec);
+        provider.getInInterceptors().add(rmDelivery);
 
         provider.getOutFaultInterceptors().add(rmLogicalOut);
         provider.getOutFaultInterceptors().add(rmCodec);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/RMPolicyInterceptorProvider.java Thu Feb  3 21:30:31 2011
@@ -25,6 +25,7 @@ import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider;
+import org.apache.cxf.ws.rm.RMDeliveryInterceptor;
 import org.apache.cxf.ws.rm.RMInInterceptor;
 import org.apache.cxf.ws.rm.RMOutInterceptor;
 import org.apache.cxf.ws.rm.soap.RMSoapInterceptor;
@@ -35,6 +36,7 @@ public class RMPolicyInterceptorProvider
     private RMInInterceptor rmIn = new RMInInterceptor();
     private RMOutInterceptor rmOut = new RMOutInterceptor();
     private RMSoapInterceptor rmSoap = new RMSoapInterceptor();
+    private RMDeliveryInterceptor rmDelivery = new RMDeliveryInterceptor();
 
     static {
         Collection<QName> types = new ArrayList<QName>();
@@ -46,15 +48,18 @@ public class RMPolicyInterceptorProvider
         super(ASSERTION_TYPES);
         rmIn.setBus(bus);
         rmOut.setBus(bus);
+        rmDelivery.setBus(bus);
         
         getInInterceptors().add(rmIn);
         getInInterceptors().add(rmSoap);
+        getInInterceptors().add(rmDelivery);
 
         getOutInterceptors().add(rmOut);
         getOutInterceptors().add(rmSoap);
 
         getInFaultInterceptors().add(rmIn);
         getInFaultInterceptors().add(rmSoap);
+        getInInterceptors().add(rmDelivery);
 
         getOutFaultInterceptors().add(rmOut);
         getOutFaultInterceptors().add(rmSoap);

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Thu Feb  3 21:30:31 2011
@@ -462,7 +462,7 @@ public class DestinationSequenceTest ext
         control.replay();        
         DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
         ds.setDestination(destination);
-        ds.applyDeliveryAssurance(mn);
+        ds.applyDeliveryAssurance(mn, null);
         control.verify();
         
         control.reset();
@@ -475,7 +475,7 @@ public class DestinationSequenceTest ext
         EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
         control.replay();     
         try {
-            ds.applyDeliveryAssurance(mn);
+            ds.applyDeliveryAssurance(mn, null);
             fail("Expected Fault not thrown.");
         } catch (RMException ex) {
             assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
@@ -486,34 +486,6 @@ public class DestinationSequenceTest ext
     }
     
     @Test
-    public void testInOrderNoWait() throws RMException {
-        setUpDestination();
-
-        BigInteger mn = BigInteger.TEN;
-        
-        DeliveryAssuranceType da = control.createMock(DeliveryAssuranceType.class);
-        EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da);
-        EasyMock.expect(da.isSetAtMostOnce()).andReturn(false);
-        EasyMock.expect(da.isSetAtLeastOnce()).andReturn(true);
-        EasyMock.expect(da.isSetInOrder()).andReturn(true); 
-        
-        SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
-        List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
-        AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
-        ranges.add(r);
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
-        EasyMock.expect(r.getUpper()).andReturn(new BigInteger("15"));
-        
-        control.replay(); 
-        
-        DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
-        ds.setDestination(destination);
-        ds.applyDeliveryAssurance(mn);
-        control.verify();
-    }
-    
-    @Test
     public void testInOrderWait() {
         setUpDestination();
         Message[] messages = new Message[5];
@@ -549,7 +521,7 @@ public class DestinationSequenceTest ext
             public void run() {
                 try {
                     ds.acknowledge(message);
-                    ds.applyDeliveryAssurance(messageNr);
+                    ds.applyDeliveryAssurance(messageNr, message);
                 } catch (Exception ex) {
                     // ignore
                 }
@@ -583,51 +555,6 @@ public class DestinationSequenceTest ext
     }
     
     @Test
-    public void testAllPredecessorsAcknowledged() {
-
-        SequenceAcknowledgement ack = control.createMock(SequenceAcknowledgement.class);
-        List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
-        AcknowledgementRange r = control.createMock(AcknowledgementRange.class);
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
-        control.replay();
-        DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
-        ds.setDestination(destination);
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        control.reset();
-        ranges.add(r);
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(2);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.TEN);
-        control.replay();
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        control.reset();
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
-        EasyMock.expect(r.getUpper()).andReturn(new BigInteger("5"));
-        control.replay();
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        control.reset();
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges).times(3);
-        EasyMock.expect(r.getLower()).andReturn(BigInteger.ONE);
-        EasyMock.expect(r.getUpper()).andReturn(BigInteger.TEN);
-        control.replay();
-        assertTrue("not all predecessors acknowledged", ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-        
-        ranges.add(r);
-        control.reset();
-        EasyMock.expect(ack.getAcknowledgementRange()).andReturn(ranges);
-        control.replay();
-        assertTrue("all predecessors acknowledged", !ds.allPredecessorsAcknowledged(BigInteger.TEN));
-        control.verify();
-    }
-    
-    @Test
     public void testScheduleSequenceTermination() throws SequenceFault {
         Timer timer = new Timer();
         setUpDestination(timer);

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java Thu Feb  3 21:30:31 2011
@@ -146,7 +146,7 @@ public class DestinationTest extends Ass
         BigInteger nr = BigInteger.TEN;
         EasyMock.expect(st.getMessageNumber()).andReturn(nr);  
         RMException ex = new RMException(new RuntimeException("already acknowledged"));
-        ds.applyDeliveryAssurance(nr);
+        ds.applyDeliveryAssurance(nr, message);
         EasyMock.expectLastCall().andThrow(ex);
         control.replay();
         try {
@@ -177,8 +177,8 @@ public class DestinationTest extends Ass
         DestinationSequence ds = control.createMock(DestinationSequence.class);
         EasyMock.expect(destination.getSequence(id)).andReturn(ds);
         
-        ds.applyDeliveryAssurance(nr);
-        EasyMock.expectLastCall();
+        ds.applyDeliveryAssurance(nr, message);
+        EasyMock.expectLastCall().andReturn(Boolean.TRUE);
         ds.acknowledge(message);
         EasyMock.expectLastCall();
         SequenceType.LastMessage lm = control.createMock(SequenceType.LastMessage.class);

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java Thu Feb  3 21:30:31 2011
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.io.StringReader;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+import javax.jws.WebService;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Provider;
+import javax.xml.ws.Service.Mode;
+import javax.xml.ws.ServiceMode;
+import javax.xml.xpath.XPathConstants;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.helpers.XMLUtils;
+import org.apache.cxf.helpers.XPathUtils;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.test.TestUtilities;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests the operation of InOrder delivery assurance for one-way messages to the server.
+ */
+public class DeliveryAssuranceOnewayTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = allocatePort(DeliveryAssuranceOnewayTest.class);
+    private static final String GREETER_ADDRESS 
+        = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+
+    private static final Logger LOG = LogUtils.getLogger(DeliveryAssuranceOnewayTest.class);
+
+    private Bus serverBus;
+    private Endpoint endpoint;
+    private Bus greeterBus;
+    private Greeter greeter;
+
+
+    /**
+     * Runs once before the tests of this class and passes {@code false} to
+     * {@code TestUtilities.setKeepAliveSystemProperty}.
+     * NOTE(review): presumably this switches HTTP keep-alive off for the test run - confirm
+     * against TestUtilities.
+     */
+    @BeforeClass
+    public static void setProps() throws Exception {
+        TestUtilities.setKeepAliveSystemProperty(false);
+    }
+    
+    /**
+     * Runs once after all tests of this class and calls
+     * {@code TestUtilities.recoverKeepAliveSystemProperty}.
+     * NOTE(review): presumably this restores the keep-alive setting changed in
+     * {@code setProps()} - confirm against TestUtilities.
+     */
+    @AfterClass
+    public static void cleanup() {
+        TestUtilities.recoverKeepAliveSystemProperty();
+    }
+            
+    
+    /**
+     * Stops the client and then the server after every test.
+     * <p>
+     * Both shutdowns are best effort: a failure in one must not prevent the other
+     * from running, so any {@link Throwable} is caught. The failure is logged rather
+     * than dropped so that a leaked bus or port can be diagnosed from the test output.
+     *
+     * @throws Exception if the final pause is interrupted
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            stopClient();
+        } catch (Throwable t) {
+            // keep going so the server still gets stopped, but leave a trace
+            LOG.warning("Failed to stop client cleanly: " + t);
+        }
+        try {
+            stopServer();
+        } catch (Throwable t) {
+            LOG.warning("Failed to stop server cleanly: " + t);
+        }
+        // short pause before the next test starts
+        // NOTE(review): presumably gives the transport time to release the port - confirm
+        Thread.sleep(100);
+    }
+
+/*    @Test    
+    public void testAtLeastOnce() throws Exception {
+        testOnewayAtLeastOnce(null);
+    }
+    
+    @Test    
+    public void testAtLeastOnceAsyncExecutor() throws Exception {
+        testOnewayAtLeastOnce(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayAtLeastOnce(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/atleastonce.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        
+        awaitMessages(callArgs.length + 2, 3000, 60000);
+        List<String> actualArgs = GreeterProvider.CALL_ARGS;
+        assertTrue("Too few messages", callArgs.length <= actualArgs.size());
+        for (int i = 0; i < callArgs.length; i++) {
+            boolean match = false;
+            for (int j = 0; j < actualArgs.size(); j++) {
+                if (actualArgs.get(j).equals(callArgs[i])) {
+                    match = true;
+                    break;
+                }
+            }
+            if (!match) {
+                fail("No match for request " + callArgs[i]);
+            }
+        }
+        
+    }
+
+    @Test    
+    public void testAtMostOnce() throws Exception {
+        testOnewayAtMostOnce(null);
+    }
+    
+    @Test    
+    public void testAtMostOnceAsyncExecutor() throws Exception {
+        testOnewayAtMostOnce(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayAtMostOnce(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/atmostonce.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        
+        awaitMessages(callArgs.length, 3000, 60000);
+        List<String> actualArgs = GreeterProvider.CALL_ARGS;
+        assertTrue("Too many messages", callArgs.length >= actualArgs.size());
+        for (int i = 0; i < actualArgs.size() - 1; i++) {
+            for (int j = i + 1; j < actualArgs.size(); j++) {
+                if (actualArgs.get(j).equals(actualArgs.get(i))) {
+                    fail("Message received more than once " + callArgs[i]);
+                }
+            }
+        }
+        
+    }
+
+    @Test    
+    public void testExactlyOnce() throws Exception {
+        testOnewayExactlyOnce(null);
+    }
+    
+    @Test    
+    public void testExactlyOnceAsyncExecutor() throws Exception {
+        testOnewayExactlyOnce(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayExactlyOnce(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/exactlyonce.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        
+        awaitMessages(callArgs.length, 3000, 60000);
+        List<String> actualArgs = GreeterProvider.CALL_ARGS;
+        assertEquals("Wrong message count", callArgs.length, actualArgs.size());
+        for (int i = 0; i < callArgs.length; i++) {
+            boolean match = false;
+            for (int j = 0; j < actualArgs.size(); j++) {
+                if (actualArgs.get(j).equals(callArgs[i])) {
+                    match = true;
+                    break;
+                }
+            }
+            if (!match) {
+                fail("No match for request " + callArgs[i]);
+            }
+        }
+        
+    }*/
+
+    /** InOrder delivery with the default (synchronous) client executor. */
+    @Test
+    public void testInOrder() throws Exception {
+        testOnewayInOrder(null);
+    }
+
+    /** InOrder delivery with a single-threaded client executor. */
+    @Test
+    public void testInOrderAsyncExecutor() throws Exception {
+        testOnewayInOrder(Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Sends four one-way requests with a {@code MessageLossSimulator} installed on the
+     * client bus and checks that the requests seen by the server appear in sending order.
+     * The check tolerates missing and repeated messages; it only rejects a message that
+     * arrives after a later one.
+     *
+     * @param executor executor to set on the client service, or {@code null} for the default
+     */
+    private void testOnewayInOrder(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/inorder.xml", executor);
+
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (String arg : callArgs) {
+            greeter.greetMeOneWay(arg);
+        }
+
+        awaitMessages(callArgs.length - 2, 3000, 60000);
+
+        // Server threads append to CALL_ARGS under its monitor and retransmissions may
+        // still be arriving, so take a snapshot under the same lock instead of iterating
+        // the live list (which could throw ConcurrentModificationException).
+        List<String> actualArgs;
+        synchronized (GreeterProvider.CALL_ARGS) {
+            actualArgs = new ArrayList<String>(GreeterProvider.CALL_ARGS);
+        }
+
+        int argNum = 0;
+        for (String actual : actualArgs) {
+            // advance to the position of this message; never move backwards
+            while (argNum < callArgs.length && !actual.equals(callArgs[argNum])) {
+                argNum++;
+            }
+            assertTrue("Message out of order", argNum < callArgs.length);
+        }
+    }
+
+    /** AtMostOnce + InOrder delivery with the default (synchronous) client executor. */
+    @Test
+    public void testAtMostOnceInOrder() throws Exception {
+        testOnewayAtMostOnceInOrder(null);
+    }
+
+    /** AtMostOnce + InOrder delivery with a single-threaded client executor. */
+    @Test
+    public void testAtMostOnceInOrderAsyncExecutor() throws Exception {
+        testOnewayAtMostOnceInOrder(Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Sends four one-way requests with a {@code MessageLossSimulator} installed on the
+     * client bus and checks that the server saw no more requests than were sent and that
+     * the requests it saw appear in sending order.
+     *
+     * @param executor executor to set on the client service, or {@code null} for the default
+     */
+    private void testOnewayAtMostOnceInOrder(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml", executor);
+
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (String arg : callArgs) {
+            greeter.greetMeOneWay(arg);
+        }
+
+        awaitMessages(callArgs.length - 2, 3000, 60000);
+
+        // Server threads append to CALL_ARGS under its monitor and retransmissions may
+        // still be arriving, so take a snapshot under the same lock instead of iterating
+        // the live list (which could throw ConcurrentModificationException).
+        List<String> actualArgs;
+        synchronized (GreeterProvider.CALL_ARGS) {
+            actualArgs = new ArrayList<String>(GreeterProvider.CALL_ARGS);
+        }
+
+        assertTrue("Too many messages", callArgs.length >= actualArgs.size());
+        int argNum = 0;
+        for (String actual : actualArgs) {
+            // advance to the position of this message; never move backwards
+            while (argNum < callArgs.length && !actual.equals(callArgs[argNum])) {
+                argNum++;
+            }
+            assertTrue("Message out of order", argNum < callArgs.length);
+        }
+    }
+
+    /** ExactlyOnce + InOrder delivery with the default (synchronous) client executor. */
+    @Test
+    public void testExactlyOnceInOrder() throws Exception {
+        testOnewayExactlyOnceInOrder(null);
+    }
+
+    /** ExactlyOnce + InOrder delivery with a single-threaded client executor. */
+    @Test
+    public void testExactlyOnceInOrderAsyncExecutor() throws Exception {
+        testOnewayExactlyOnceInOrder(Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Sends four one-way requests with a {@code MessageLossSimulator} installed on the
+     * client bus and checks that the server saw exactly as many requests as were sent
+     * and that they appear in sending order.
+     *
+     * @param executor executor to set on the client service, or {@code null} for the default
+     */
+    private void testOnewayExactlyOnceInOrder(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml", executor);
+
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getRMAssertion().getBaseRetransmissionInterval().setMilliseconds(new BigInteger("2000"));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (String arg : callArgs) {
+            greeter.greetMeOneWay(arg);
+        }
+
+        awaitMessages(callArgs.length, 3000, 60000);
+
+        // Server threads append to CALL_ARGS under its monitor and retransmissions may
+        // still be arriving, so take a snapshot under the same lock instead of iterating
+        // the live list (which could throw ConcurrentModificationException).
+        List<String> actualArgs;
+        synchronized (GreeterProvider.CALL_ARGS) {
+            actualArgs = new ArrayList<String>(GreeterProvider.CALL_ARGS);
+        }
+
+        assertEquals("Wrong number of messages", callArgs.length, actualArgs.size());
+        int argNum = 0;
+        for (String actual : actualArgs) {
+            // advance to the position of this message; never move backwards
+            while (argNum < callArgs.length && !actual.equals(callArgs[argNum])) {
+                argNum++;
+            }
+            assertTrue("Message out of order", argNum < callArgs.length);
+        }
+    }
+
+    // --- test utilities ---
+
+    /**
+     * Builds the whole fixture from one Spring configuration resource: the server bus and
+     * endpoint, then the client ("greeter") bus, then the client proxy.
+     * The order matters: each bus is installed as the default bus when it is created, so
+     * the greeter bus is the default one by the time the proxy is created.
+     *
+     * @param cfgResource classpath resource of the Spring configuration used for both buses
+     * @param executor executor to set on the client service, or {@code null} to leave it unset
+     */
+    private void init(String cfgResource, Executor executor) {
+        
+        SpringBusFactory bf = new SpringBusFactory();
+        initServer(bf, cfgResource);
+        initGreeterBus(bf, cfgResource);
+        initProxy(executor);
+    }
+    
+    private void initServer(SpringBusFactory bf, String cfgResource) {
+        String derbyHome = System.getProperty("derby.system.home"); 
+        try {
+            synchronized (GreeterProvider.CALL_ARGS) {
+                GreeterProvider.CALL_ARGS.clear();
+            }
+            System.setProperty("derby.system.home", derbyHome + "-server");   
+            serverBus = bf.createBus(cfgResource);
+            BusFactory.setDefaultBus(serverBus);
+            LOG.info("Initialised bus " + serverBus + " with cfg file resource: " + cfgResource);
+            LOG.info("serverBus inInterceptors: " + serverBus.getInInterceptors());
+            endpoint = Endpoint.publish(GREETER_ADDRESS, new GreeterProvider());
+        } finally {
+            if (derbyHome != null) {
+                System.setProperty("derby.system.home", derbyHome);
+            } else {
+                System.clearProperty("derby.system.home");
+            }
+        }
+    }
+    
+    /**
+     * Creates the client-side bus from the given configuration, stores it in
+     * {@code greeterBus} and installs it as the default bus.
+     *
+     * @param bf factory used to create the bus
+     * @param cfgResource classpath resource of the Spring configuration
+     */
+    private void initGreeterBus(SpringBusFactory bf,
+                                String cfgResource) {
+        greeterBus = bf.createBus(cfgResource);
+        BusFactory.setDefaultBus(greeterBus);
+        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
+    }
+
+    /**
+     * Creates the greeter client proxy, optionally running it on the given executor,
+     * and redirects it to the port allocated for this test.
+     *
+     * @param executor executor to set on the client service, or {@code null} to leave it unset
+     */
+    private void initProxy(Executor executor) {
+        GreeterService gs = new GreeterService();
+
+        if (null != executor) {
+            gs.setExecutor(executor);
+        }
+
+        greeter = gs.getGreeterPort();
+        try {
+            updateAddressPort(greeter, PORT);
+        } catch (Exception e) {
+            // Still best effort, but no longer silent: if the address was not updated the
+            // client talks to whatever port the WSDL names, which is very hard to diagnose
+            // from the later test failure alone.
+            LOG.warning("Could not update greeter address to port " + PORT + ": " + e);
+        }
+        LOG.fine("Created greeter client.");
+
+        ConnectionHelper.setKeepAliveConnection(greeter, true);
+    }
+    
+    /**
+     * Closes the greeter client's conduit, shuts down the client bus and clears both
+     * fields. Does nothing when no greeter bus exists.
+     * <p>
+     * The bus is shut down and the fields are cleared even if closing the conduit fails,
+     * so that a failing close does not leak the bus into the next test.
+     */
+    private void stopClient() {
+        if (null == greeterBus) {
+            return;
+        }
+        try {
+            // Close the conduit first so that its decoupled destination is closed too;
+            // the port is released once the destination's reference count hits zero.
+            if (greeter != null) {
+                ClientProxy.getClient(greeter).getConduit().close();
+            }
+        } finally {
+            try {
+                greeterBus.shutdown(true);
+            } finally {
+                greeter = null;
+                greeterBus = null;
+            }
+        }
+    }
+    
+    /**
+     * Stops the published greeter endpoint (if any), shuts down the server bus (if any)
+     * and clears both fields.
+     * <p>
+     * The bus is shut down even if stopping the endpoint fails, so that a failing stop
+     * does not leak the server bus into the next test.
+     */
+    private void stopServer() {
+        try {
+            if (null != endpoint) {
+                LOG.info("Stopping Greeter endpoint");
+                endpoint.stop();
+            } else {
+                LOG.info("No endpoint active.");
+            }
+        } finally {
+            endpoint = null;
+            if (null != serverBus) {
+                try {
+                    serverBus.shutdown(true);
+                } finally {
+                    serverBus = null;
+                }
+            }
+        }
+    }
+    
+    /**
+     * Polls the number of requests recorded by {@link GreeterProvider} every 100 ms until
+     * at least {@code nExpectedIn} have arrived or {@code timeout} has elapsed, then
+     * sleeps for a further {@code delay}. All times are in milliseconds.
+     * <p>
+     * The method does not fail when the timeout expires; callers are expected to assert
+     * on the recorded requests afterwards. If the calling thread is interrupted, the
+     * interrupt status is restored and the method returns without waiting any further.
+     *
+     * @param nExpectedIn number of messages to wait for
+     * @param delay added delay before return (in case more are coming)
+     * @param timeout maximum time to wait for expected messages
+     */
+    private void awaitMessages(int nExpectedIn, int delay, int timeout) {
+        int waited = 0;
+        while (waited <= timeout) {
+            int nIn;
+            synchronized (GreeterProvider.CALL_ARGS) {
+                nIn = GreeterProvider.CALL_ARGS.size();
+            }
+            if (nIn >= nExpectedIn) {
+                break;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ex) {
+                // do not lose the interrupt; further sleeping would fail immediately anyway
+                Thread.currentThread().interrupt();
+                return;
+            }
+            waited += 100;
+        }
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Payload-mode provider standing in for the greeter service. It records the
+     * {@code requestType} value of every greetMe / greetMeOneWay request it receives in
+     * {@link #CALL_ARGS}, in arrival order, and answers greetMe requests with the value
+     * in upper case.
+     */
+    @WebService(serviceName = "GreeterService",
+                portName = "GreeterPort",
+                targetNamespace = "http://cxf.apache.org/greeter_control",
+                wsdlLocation = "/wsdl/greeter_control.wsdl")
+    @ServiceMode(Mode.PAYLOAD)
+    public static class GreeterProvider implements Provider<Source> {
+
+        /** Request arguments in arrival order; access only while synchronized on this list. */
+        public static final List<String> CALL_ARGS = new ArrayList<String>();
+
+        /**
+         * Records the request argument and builds the reply.
+         *
+         * @param obj request payload
+         * @return a greetMeResponse for a greetMe request, {@code null} for a one-way request
+         */
+        public Source invoke(Source obj) {
+            Node payload;
+            try {
+                payload = XMLUtils.fromSource(obj);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            if (payload instanceof Document) {
+                payload = ((Document)payload).getDocumentElement();
+            }
+
+            Map<String, String> namespaces = new HashMap<String, String>();
+            namespaces.put("ns", "http://cxf.apache.org/greeter_control/types");
+            XPathUtils xpath = new XPathUtils(namespaces);
+
+            String requestType = (String)xpath.getValue("/ns:greetMe/ns:requestType",
+                                                        payload,
+                                                        XPathConstants.STRING);
+
+            // no greetMe argument found: treat the payload as a greetMeOneWay request
+            final boolean oneWay = requestType == null || requestType.length() == 0;
+            if (oneWay) {
+                requestType = (String)xpath.getValue("/ns:greetMeOneWay/ns:requestType",
+                                                     payload,
+                                                     XPathConstants.STRING);
+            }
+
+            synchronized (CALL_ARGS) {
+                CALL_ARGS.add(requestType);
+            }
+            if (oneWay) {
+                return null;
+            }
+
+            StringBuilder resp = new StringBuilder();
+            resp.append("<greetMeResponse ");
+            resp.append("xmlns=\"http://cxf.apache.org/greeter_control/types\">");
+            resp.append("<responseType>").append(requestType.toUpperCase()).append("</responseType>");
+            resp.append("</greetMeResponse>");
+            return new StreamSource(new StringReader(resp.toString()));
+        }
+    }    
+}

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/DeliveryAssuranceOnewayTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Feb  3 21:30:31 2011
@@ -1104,7 +1104,8 @@ public class SequenceTest extends Abstra
             OutMessageRecorder outRecorder;  
             String id;
             
-            ClientThread(SpringBusFactory bf, String cfgResource, int n) { 
+            ClientThread(SpringBusFactory bf, String cfgResource, int n) {
+                super("client " + n);
                 SequenceTest.this.initGreeter(bf, cfgResource, true, null);
                 greeter = SequenceTest.this.greeter;
                 greeterBus = SequenceTest.this.greeterBus;
@@ -1114,9 +1115,18 @@ public class SequenceTest extends Abstra
             }
             
             public void run() {
-                greeter.greetMe(id + ": a");
-                greeter.greetMe(id + ": b");
-                greeter.greetMe(id + ": c");
+                String s = greeter.greetMe(id + ": a").toLowerCase();
+                if (!s.contains(id)) {
+                    System.out.println("Correlation problem <" + s + ">  <" + id + ">");
+                }
+                s = greeter.greetMe(id + ": b").toLowerCase();
+                if (!s.contains(id)) {
+                    System.out.println("Correlation problem <" + s + ">  <" + id + ">");
+                }
+                s = greeter.greetMe(id + ": c").toLowerCase();
+                if (!s.contains(id)) {
+                    System.out.println("Correlation problem <" + s + ">  <" + id + ">");
+                }
 
                 // three application messages plus createSequence
 
@@ -1134,9 +1144,10 @@ public class SequenceTest extends Abstra
             for (int i = 0; i < clients.length; i++) {
                 clients[i].start();
             }
-
             for (int i = 0; i < clients.length; i++) {
                 clients[i].join();
+            }
+            for (int i = 0; i < clients.length; i++) {
                 MessageFlow mf = new MessageFlow(clients[i].outRecorder.getOutboundMessages(), 
                                                  clients[i].inRecorder.getInboundMessages());
                                 

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml Thu Feb  3 21:30:31 2011
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one
+	or more contributor license agreements. See the NOTICE file
+	distributed with this work for additional information
+	regarding copyright ownership. The ASF licenses this file
+	to you under the Apache License, Version 2.0 (the
+	"License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at
+	
+	http://www.apache.org/licenses/LICENSE-2.0
+	
+	Unless required by applicable law or agreed to in writing,
+	software distributed under the License is distributed on an
+	"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	KIND, either express or implied. See the License for the
+	specific language governing permissions and limitations
+	under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+	xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+	xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+	<import resource="rminterceptors.xml" />
+
+	<!--
+		RM manager for the AtLeastOnce delivery assurance: 60000 ms base
+		retransmission interval, 10000 ms acknowledgement interval and an
+		acks policy with intraMessageThreshold 0.
+	-->
+	<wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+        <wsrm-policy:RMAssertion>
+			<wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+			<wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+		</wsrm-policy:RMAssertion>
+		<wsrm-mgr:deliveryAssurance>
+			<wsrm-mgr:AtLeastOnce />
+		</wsrm-mgr:deliveryAssurance>
+		<wsrm-mgr:destinationPolicy>
+			<wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+		</wsrm-mgr:destinationPolicy>
+	</wsrm-mgr:rmManager>
+
+
+</beans>
\ No newline at end of file

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atleastonce.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml Thu Feb  3 21:30:31 2011
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one
+	or more contributor license agreements. See the NOTICE file
+	distributed with this work for additional information
+	regarding copyright ownership. The ASF licenses this file
+	to you under the Apache License, Version 2.0 (the
+	"License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at
+	
+	http://www.apache.org/licenses/LICENSE-2.0
+	
+	Unless required by applicable law or agreed to in writing,
+	software distributed under the License is distributed on an
+	"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	KIND, either express or implied. See the License for the
+	specific language governing permissions and limitations
+	under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+	xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+	xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+	<import resource="rminterceptors.xml" />
+
+	<!--
+		RM manager combining the AtMostOnce and InOrder delivery assurances:
+		60000 ms base retransmission interval, 10000 ms acknowledgement
+		interval and an acks policy with intraMessageThreshold 0.
+	-->
+	<wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+        <wsrm-policy:RMAssertion>
+			<wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+			<wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+		</wsrm-policy:RMAssertion>
+		<wsrm-mgr:deliveryAssurance>
+			<wsrm-mgr:AtMostOnce />
+			<wsrm-mgr:InOrder />
+		</wsrm-mgr:deliveryAssurance>
+		<wsrm-mgr:destinationPolicy>
+			<wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+		</wsrm-mgr:destinationPolicy>
+	</wsrm-mgr:rmManager>
+
+
+</beans>

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/atmostonce-inorder.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml Thu Feb  3 21:30:31 2011
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one
+	or more contributor license agreements. See the NOTICE file
+	distributed with this work for additional information
+	regarding copyright ownership. The ASF licenses this file
+	to you under the Apache License, Version 2.0 (the
+	"License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at
+	
+	http://www.apache.org/licenses/LICENSE-2.0
+	
+	Unless required by applicable law or agreed to in writing,
+	software distributed under the License is distributed on an
+	"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	KIND, either express or implied. See the License for the
+	specific language governing permissions and limitations
+	under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+	xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+	xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+	<import resource="rminterceptors.xml" />
+
+	<!--
+		RM manager combining the ExactlyOnce and InOrder delivery assurances:
+		60000 ms base retransmission interval, 10000 ms acknowledgement
+		interval and an acks policy with intraMessageThreshold 0.
+	-->
+	<wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+        <wsrm-policy:RMAssertion>
+			<wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+			<wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+		</wsrm-policy:RMAssertion>
+		<wsrm-mgr:deliveryAssurance>
+			<wsrm-mgr:ExactlyOnce />
+			<wsrm-mgr:InOrder />
+		</wsrm-mgr:deliveryAssurance>
+		<wsrm-mgr:destinationPolicy>
+			<wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+		</wsrm-mgr:destinationPolicy>
+	</wsrm-mgr:rmManager>
+
+
+</beans>

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml?rev=1066985&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml (added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml Thu Feb  3 21:30:31 2011
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one
+	or more contributor license agreements. See the NOTICE file
+	distributed with this work for additional information
+	regarding copyright ownership. The ASF licenses this file
+	to you under the Apache License, Version 2.0 (the
+	"License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at
+	
+	http://www.apache.org/licenses/LICENSE-2.0
+	
+	Unless required by applicable law or agreed to in writing,
+	software distributed under the License is distributed on an
+	"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	KIND, either express or implied. See the License for the
+	specific language governing permissions and limitations
+	under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:wsrm-mgr="http://cxf.apache.org/ws/rm/manager"
+	xmlns:wsrm-policy="http://schemas.xmlsoap.org/ws/2005/02/rm/policy"
+	xsi:schemaLocation="
+http://schemas.xmlsoap.org/ws/2005/02/rm/policy http://schemas.xmlsoap.org/ws/2005/02/rm/wsrm-policy.xsd
+http://cxf.apache.org/ws/rm/manager http://cxf.apache.org/schemas/configuration/wsrm-manager.xsd
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+	<import resource="rminterceptors.xml" />
+
+	<wsrm-mgr:rmManager id="org.apache.cxf.ws.rm.RMManager">
+        <wsrm-policy:RMAssertion>
+			<wsrm-policy:BaseRetransmissionInterval Milliseconds="60000" />
+			<wsrm-policy:AcknowledgementInterval Milliseconds="10000" />
+		</wsrm-policy:RMAssertion>
+		<wsrm-mgr:deliveryAssurance>
+			<wsrm-mgr:InOrder />
+		</wsrm-mgr:deliveryAssurance>
+		<wsrm-mgr:destinationPolicy>
+			<wsrm-mgr:acksPolicy intraMessageThreshold="0" />
+		</wsrm-mgr:destinationPolicy>
+	</wsrm-mgr:rmManager>
+
+
+</beans>

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/inorder.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml Thu Feb  3 21:30:31 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
         <property name="bus" ref="cxf"/>
     </bean>
     <bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+    <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+        <property name="bus" ref="cxf"/>
+    </bean>
 
     <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
 
@@ -41,7 +44,8 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
-                <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+                <ref bean="rmDelivery"/>
+                <!-- bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
             </list>
         </property>
         <property name="inFaultInterceptors">
@@ -50,7 +54,8 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
-                <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
+                <ref bean="rmDelivery"/>
+                <!--bean class="org.apache.cxf.interceptor.LoggingInInterceptor" /-->
             </list>
         </property>
         <property name="outInterceptors">
@@ -68,7 +73,7 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalOut"/>
                 <ref bean="rmCodec"/>
-                <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" />
+                <!-- bean class="org.apache.cxf.interceptor.LoggingOutInterceptor" /-->
             </list>
         </property>
     </bean>

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors_provider.xml Thu Feb  3 21:30:31 2011
@@ -31,6 +31,9 @@ http://www.springframework.org/schema/be
         <property name="bus" ref="cxf"/>
     </bean>
     <bean id="rmCodec" class="org.apache.cxf.ws.rm.soap.RMSoapInterceptor"/>
+    <bean id="rmDelivery" class="org.apache.cxf.ws.rm.RMDeliveryInterceptor">
+        <property name="bus" ref="cxf"/>
+    </bean>
 
     <!-- We are adding the interceptors to the bus as we will have only one endpoint/service/bus. -->
 
@@ -41,6 +44,7 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
+                <ref bean="rmDelivery"/>
                 <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
             </list>
         </property>
@@ -50,6 +54,7 @@ http://www.springframework.org/schema/be
                 <ref bean="mapCodec"/>
                 <ref bean="rmLogicalIn"/>
                 <ref bean="rmCodec"/>
+                <ref bean="rmDelivery"/>
                 <bean class="org.apache.cxf.interceptor.LoggingInInterceptor" />
             </list>
         </property>

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/InMessageRecorder.java Thu Feb  3 21:30:31 2011
@@ -55,7 +55,8 @@ public class InMessageRecorder extends A
             IOUtils.copy(is, bos);
             is.close();
             bos.close();
-            inbound.add(bos.toByteArray());
+            byte bytes[] = bos.toByteArray();
+            inbound.add(bytes);
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound: " + bos.toString());
             }

Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java?rev=1066985&r1=1066984&r2=1066985&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/util/MessageFlow.java Thu Feb  3 21:30:31 2011
@@ -24,8 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.xml.namespace.QName;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -33,6 +31,7 @@ import org.w3c.dom.Node;
 
 import junit.framework.Assert;
 
+import org.apache.cxf.staxutils.StaxUtils;
 import org.apache.cxf.ws.rm.RMConstants;
 
 
@@ -64,21 +63,18 @@ public class MessageFlow extends Assert 
             out.remove(0);
         }
         outStreams = out;
-        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-        factory.setNamespaceAware(true);
-        DocumentBuilder parser = factory.newDocumentBuilder();
         inboundMessages.clear();
         for (int i = 0; i < inStreams.size(); i++) {
             byte[] bytes = inStreams.get(i);
             ByteArrayInputStream is = new ByteArrayInputStream(bytes);
-            Document document = parser.parse(is);
+            Document document = StaxUtils.read(is);
             inboundMessages.add(document);
         }
         outboundMessages.clear();
         for (int i = 0; i < outStreams.size(); i++) {
             byte[] bytes = outStreams.get(i);
             ByteArrayInputStream is = new ByteArrayInputStream(bytes);
-            Document document = parser.parse(is);
+            Document document = StaxUtils.read(is);
             outboundMessages.add(document);
         }
     }



Mime
View raw message