cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject [4/5] git commit: [CXF-3272] Return acknowledgement rather than Fault when duplicate message received.
Date Mon, 23 Jun 2014 01:33:11 GMT
[CXF-3272] Return acknowledgement rather than Fault when duplicate
message received.

Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c613aa49
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c613aa49
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c613aa49

Branch: refs/heads/master
Commit: c613aa4919e81a3027d0dfbbf3c4ee0a1c6ce348
Parents: 0494f76
Author: dsosnoski <dsosnoski@apache.org>
Authored: Mon Jun 23 11:52:03 2014 +1200
Committer: dsosnoski <dsosnoski@apache.org>
Committed: Mon Jun 23 11:52:03 2014 +1200

----------------------------------------------------------------------
 .../java/org/apache/cxf/ws/rm/Destination.java  |  22 +-
 .../apache/cxf/ws/rm/DestinationSequence.java   |  31 ++-
 .../apache/cxf/ws/rm/InternalContextUtils.java  | 275 +++++++++++++++++++
 .../org/apache/cxf/ws/rm/RMOutInterceptor.java  |   4 +-
 .../cxf/ws/rm/DestinationSequenceTest.java      |   8 +-
 .../org/apache/cxf/ws/rm/DestinationTest.java   |  32 +--
 .../apache/cxf/systest/ws/rm/SequenceTest.java  |  42 +--
 .../apache/cxf/systest/ws/rm/rminterceptors.xml |   8 +-
 .../systest/ws/policy/RM12PolicyWsdlTest.java   |   2 +-
 9 files changed, 357 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
index 378c9b2..77c9c49 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
@@ -126,16 +126,24 @@ public class Destination extends AbstractEndpoint {
             } else {
                 try {
                     message.getInterceptorChain().abort();
-                    Conduit conduit = message.getExchange().getDestination()
-                        .getBackChannel(message);
+                    if (seq.sendAcknowledgement()) {
+                        ackImmediately(seq, message);
+                    }
+                    Exchange exchange = message.getExchange();
+                    Conduit conduit = exchange.getDestination().getBackChannel(message);
                     if (conduit != null) {
                         //for a one-way, the back channel could be
                         //null if it knows it cannot send anything.
-                        Message partial = createMessage(message.getExchange());
-                        partial.remove(Message.CONTENT_TYPE);
-                        partial.setExchange(message.getExchange());
-                        conduit.prepare(partial);
-                        conduit.close(partial);
+                        if (seq.sendAcknowledgement()) {
+                            AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+                            InternalContextUtils.rebaseResponse(null, maps, message);
+                        } else {
+                            Message response = createMessage(exchange);
+                            response.setExchange(exchange);
+                            response.remove(Message.CONTENT_TYPE);
+                            conduit.prepare(response);
+                            conduit.close(response);
+                        }
                     }
                 } catch (IOException e) {
                     LOG.log(Level.SEVERE, e.getMessage());

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index d0aef1d..36f44d6 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -59,6 +59,7 @@ public class DestinationSequence extends AbstractSequence {
     private String correlationID;
     private volatile long inProcessNumber;
     private volatile long highNumberCompleted;
+    private long nextInOrder;
     private List<Continuation> continuations = new LinkedList<Continuation>();
     private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
     
@@ -223,23 +224,21 @@ public class DestinationSequence extends AbstractSequence {
     }
     
     /**
-     * Ensures that the delivery assurance is honored, e.g. by throwing an 
-     * exception if the message had already been delivered and the delivery
-     * assurance is AtMostOnce.
+     * Ensures that the delivery assurance is honored.
     * If the delivery assurance includes either AtLeastOnce or ExactlyOnce, combined with InOrder, this
      * queues out-of-order messages for processing after the missing messages have been received.
      * 
      * @param mn message number
     * @return <code>true</code> if message processing to continue, <code>false</code> if to be dropped
-     * @throws RMException if message had already been acknowledged
      */
-    boolean applyDeliveryAssurance(long mn, Message message) throws RMException {
+    boolean applyDeliveryAssurance(long mn, Message message) {
         Continuation cont = getContinuation(message);
         RMConfiguration config = destination.getReliableEndpoint().getConfiguration();
         DeliveryAssurance da = config.getDeliveryAssurance();
         boolean canSkip = da != DeliveryAssurance.AT_LEAST_ONCE && da != DeliveryAssurance.EXACTLY_ONCE;
         boolean robust = false;
         boolean robustDelivering = false;
+        boolean inOrder = mn - nextInOrder == 1;
         if (message != null) {
             robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
             if (robust) {
@@ -250,22 +249,30 @@ public class DestinationSequence extends AbstractSequence {
         if (robust && !robustDelivering) {
             // no check performed if in robust and not in delivering
             removeDeliveringMessageNumber(mn);
+            if (inOrder) {
+                nextInOrder++;
+            }
             return true;
         }
+        if (inOrder) {
+            nextInOrder++;
+        } else {
+            
+            // message out of order, schedule acknowledgement to update sender
+            scheduleImmediateAcknowledgement();
+            if (nextInOrder < mn) {
+                nextInOrder = mn + 1;
+            }
+        }
         if (cont != null && config.isInOrder() && !cont.isNew()) {
             return waitInQueue(mn, canSkip, message, cont);
         }
         if ((da == DeliveryAssurance.EXACTLY_ONCE || da == DeliveryAssurance.AT_MOST_ONCE)
-            && (isAcknowledged(mn) 
-                || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {
-            
-            // acknowledge at first opportunity following duplicate message
-            scheduleImmediateAcknowledgement();
+            && (isAcknowledged(mn) || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {
             org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
                 "MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue());
             LOG.log(Level.INFO, msg.toString());
-            throw new RMException(msg);
-            
+            return false;
         } 
         if (robustDelivering) {
             deliveringMessageNumbers.add(mn);

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java
new file mode 100644
index 0000000..eac5662
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.ConduitSelector;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.NullConduitSelector;
+import org.apache.cxf.endpoint.PreexistingConduitSelector;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.interceptor.OutgoingChainInterceptor;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.ConduitInitiatorManager;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.workqueue.OneShotAsyncExecutor;
+import org.apache.cxf.workqueue.SynchronousExecutor;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.ContextUtils;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
+import org.apache.cxf.ws.addressing.Names;
+
+
+
+/**
+ * Holder for utility methods relating to contexts. Somewhat stripped-down version of class of same name in
+ * org.apache.cxf.ws.addressing.impl.
+ */
+final class InternalContextUtils {
+    private static final class DecoupledDestination implements Destination {
+        private final EndpointInfo ei;
+        private final EndpointReferenceType reference;
+
+        private DecoupledDestination(EndpointInfo ei, EndpointReferenceType reference) {
+            this.ei = ei;
+            this.reference = reference;
+        }
+
+        public EndpointReferenceType getAddress() {
+            return reference;
+        }
+
+        public Conduit getBackChannel(Message inMessage) throws IOException {
+            if (ContextUtils.isNoneAddress(reference)) {
+                return null;
+            }
+            Bus bus = inMessage.getExchange().get(Bus.class);
+            //this is a response targeting a decoupled endpoint.   Treat it as a oneway so
+            //we don't wait for a response.
+            inMessage.getExchange().setOneWay(true);
+            ConduitInitiator conduitInitiator 
+                = bus.getExtension(ConduitInitiatorManager.class)
+                    .getConduitInitiatorForUri(reference.getAddress().getValue());
+            if (conduitInitiator != null) {
+                Conduit c = conduitInitiator.getConduit(ei, reference, bus);
+                // ensure decoupled back channel input stream is closed
+                c.setMessageObserver(new MessageObserver() {
+                    public void onMessage(Message m) {
+                        InputStream is = m.getContent(InputStream.class);
+                        if (is != null) {
+                            try {
+                                is.close();
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        }
+                    }
+                });
+                return c;
+            }
+            return null;
+        }
+
+        public MessageObserver getMessageObserver() {
+            return null;
+        }
+
+        public void shutdown() {
+        }
+
+        public void setMessageObserver(MessageObserver observer) {
+        }
+    }
+
+    private static final Logger LOG = LogUtils.getL7dLogger(InternalContextUtils.class);
+
+   /**
+    * Prevents instantiation.
+    */
+    private InternalContextUtils() {
+    }
+
+    
+    /**
+     * Rebase response on replyTo
+     * 
+     * @param reference the replyTo reference
+     * @param inMAPs the inbound MAPs
+     * @param inMessage the current message
+     */
+    //CHECKSTYLE:OFF  Max executable statement count limitation
+    public static void rebaseResponse(EndpointReferenceType reference,
+                                      AddressingProperties inMAPs,
+                                      final Message inMessage) {
+        
+        String namespaceURI = inMAPs.getNamespaceURI();
+        if (!ContextUtils.retrievePartialResponseSent(inMessage)) {
+            ContextUtils.storePartialResponseSent(inMessage);
+            Exchange exchange = inMessage.getExchange();
+            Message fullResponse = exchange.getOutMessage();
+            Message partialResponse = ContextUtils.createMessage(exchange);
+            ensurePartialResponseMAPs(partialResponse, namespaceURI);
+            
+            // ensure the inbound MAPs are available in the partial response
+            // message (used to determine relatesTo etc.)
+            ContextUtils.propogateReceivedMAPs(inMAPs, partialResponse);
+            Destination target = inMessage.getDestination();
+            if (target == null) {
+                return;
+            }
+            
+            try {
+                if (reference == null) {
+                    reference = ContextUtils.getNoneEndpointReference();
+                }
+                Conduit backChannel = target.getBackChannel(inMessage);
+                if (backChannel != null) {
+                    partialResponse.put(Message.PARTIAL_RESPONSE_MESSAGE, Boolean.TRUE);
+                    partialResponse.put(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE, Boolean.TRUE);
+                    boolean robust =
+                        MessageUtils.isTrue(inMessage.getContextualProperty(Message.ROBUST_ONEWAY));
+                    
+                    if (robust) {
+                        BindingOperationInfo boi = exchange.get(BindingOperationInfo.class);
+                        // insert the executor in the exchange to fool the OneWayProcessorInterceptor
+                        exchange.put(Executor.class, getExecutor(inMessage));
+                        // pause dispatch on current thread and resume...
+                        inMessage.getInterceptorChain().pause();
+                        inMessage.getInterceptorChain().resume();
+                        // restore the BOI for the partial response handling
+                        exchange.put(BindingOperationInfo.class, boi);
+                    }
+                    
+                    
+                    // set up interceptor chains and send message
+                    InterceptorChain chain =
+                        fullResponse != null
+                        ? fullResponse.getInterceptorChain()
+                        : OutgoingChainInterceptor.getOutInterceptorChain(exchange);
+                    exchange.setOutMessage(partialResponse);
+                    partialResponse.setInterceptorChain(chain);
+                    exchange.put(ConduitSelector.class,
+                                 new PreexistingConduitSelector(backChannel,
+                                                                exchange.get(Endpoint.class)));
+
+                    if (chain != null && !chain.doIntercept(partialResponse) 
+                        && partialResponse.getContent(Exception.class) != null) {
+                        if (partialResponse.getContent(Exception.class) instanceof Fault) {
+                            throw (Fault)partialResponse.getContent(Exception.class);
+                        } else {
+                            throw new Fault(partialResponse.getContent(Exception.class));
+                        }
+                    }
+                    if (chain != null) {
+                        chain.reset();                        
+                    }
+                    exchange.put(ConduitSelector.class, new NullConduitSelector());
+                    
+                    if (fullResponse == null) {
+                        fullResponse = ContextUtils.createMessage(exchange);
+                    }
+                    exchange.setOutMessage(fullResponse);
+                    
+                    Destination destination = createDecoupledDestination(
+                        exchange, 
+                        reference);
+                    exchange.setDestination(destination);
+
+                } 
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "SERVER_TRANSPORT_REBASE_FAILURE_MSG", e);
+            }
+        }
+    }
+    //CHECKSTYLE:ON
+
+    private static Destination createDecoupledDestination(
+        Exchange exchange, final EndpointReferenceType reference) {
+        final EndpointInfo ei = exchange.get(Endpoint.class).getEndpointInfo();
+        return new DecoupledDestination(ei, reference);
+    }
+    
+    /**
+     * Construct and store MAPs for partial response.
+     * 
+     * @param partialResponse the partial response message
+     * @param namespaceURI the current namespace URI
+     */
+    private static void ensurePartialResponseMAPs(Message partialResponse,
+                                                 String namespaceURI) {
+        // ensure there is a MAPs instance available for the outbound
+        // partial response that contains appropriate To and ReplyTo
+        // properties (i.e. anonymous & none respectively)
+        AddressingProperties maps = new AddressingProperties();
+        maps.setTo(EndpointReferenceUtils.getAnonymousEndpointReference());
+        maps.setReplyTo(ContextUtils.WSA_OBJECT_FACTORY.createEndpointReferenceType());
+        maps.getReplyTo().setAddress(ContextUtils.getAttributedURI(Names.WSA_NONE_ADDRESS));
+        maps.setAction(ContextUtils.getAttributedURI(""));
+        maps.exposeAs(namespaceURI);
+        ContextUtils.storeMAPs(maps, partialResponse, true, true, false);
+    }
+
+    /**
+     * Get the Executor for this invocation.
+     * @param endpoint
+     * @return
+     */
+    private static Executor getExecutor(final Message message) {
+        Endpoint endpoint = message.getExchange().get(Endpoint.class);
+        Executor executor = endpoint.getService().getExecutor();
+        
+        if (executor == null || SynchronousExecutor.isA(executor)) {
+            // need true asynchrony
+            Bus bus = message.getExchange().get(Bus.class);
+            if (bus != null) {
+                WorkQueueManager workQueueManager =
+                    bus.getExtension(WorkQueueManager.class);
+                Executor autoWorkQueue =
+                    workQueueManager.getNamedWorkQueue("ws-addressing");
+                executor = autoWorkQueue != null
+                           ? autoWorkQueue
+                           :  workQueueManager.getAutomaticWorkQueue();
+            } else {
+                executor = OneShotAsyncExecutor.getInstance();
+            }
+        }
+        message.getExchange().put(Executor.class, executor);
+        return executor;
+    }
+ 
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
index 95eb200..8d3aa17 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
@@ -115,10 +115,10 @@ public class RMOutInterceptor extends AbstractRMInterceptor<Message> {
             if (isPartialResponse && rmpsOut.getAcks() != null && rmpsOut.getAcks().size() > 0) {
                 setAction(maps, constants.getSequenceAckAction());
                 msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE);
+                isAck = true;
             }
         } 
-        if (isAck || constants.getSequenceAckAction().equals(action)
-            || (constants.getTerminateSequenceAction().equals(action)
+        if (isAck || (constants.getTerminateSequenceAction().equals(action)
                 && RM10Constants.NAMESPACE_URI.equals(rmNamespace))) {
             maps.setReplyTo(RMUtils.createNoneReference());
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
index e46bf87..a189932 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
@@ -484,13 +484,7 @@ public class DestinationSequenceTest extends Assert {
         EasyMock.expect(r.getLower()).andReturn(new Long(5));
         EasyMock.expect(r.getUpper()).andReturn(new Long(15));
         control.replay();     
-        try {
-            ds.applyDeliveryAssurance(mn, null);
-            fail("Expected Fault not thrown.");
-        } catch (RMException ex) {
-            assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode());
-        }
-          
+        ds.applyDeliveryAssurance(mn, null);
         control.verify();
 
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
index 2405542..2288247 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java
@@ -19,8 +19,10 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 
+import org.apache.cxf.interceptor.InterceptorChain;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -96,7 +98,7 @@ public class DestinationTest extends Assert {
     }
     
     @Test
-    public void testAcknowledgeNoSequence() throws SequenceFault, RMException {
+    public void testAcknowledgeNoSequence() throws SequenceFault, RMException, IOException {
         Message message = setupMessage();
         RMProperties rmps = control.createMock(RMProperties.class);
         EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
@@ -106,7 +108,7 @@ public class DestinationTest extends Assert {
     }
     
     @Test
-    public void testAcknowledgeUnknownSequence() throws RMException {
+    public void testAcknowledgeUnknownSequence() throws RMException, IOException {
         Message message = setupMessage();
         RMProperties rmps = control.createMock(RMProperties.class);
         EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
@@ -129,7 +131,7 @@ public class DestinationTest extends Assert {
     
     @Test
     public void testAcknowledgeAlreadyAcknowledgedMessage() throws SequenceFault, RMException,
-    NoSuchMethodException {
+    NoSuchMethodException, IOException {
         
         Method m1 = Destination.class.getDeclaredMethod("getSequence", new Class[] {Identifier.class});
         destination = EasyMock.createMockBuilder(Destination.class)
@@ -145,17 +147,12 @@ public class DestinationTest extends Assert {
         EasyMock.expect(destination.getSequence(id)).andReturn(ds);
         long nr = 10;
         EasyMock.expect(st.getMessageNumber()).andReturn(nr);  
-        RMException ex = new RMException(new RuntimeException("already acknowledged"));
         ds.applyDeliveryAssurance(nr, message);
-        EasyMock.expectLastCall().andThrow(ex);
+        EasyMock.expectLastCall().andReturn(false);
+        InterceptorChain ic = control.createMock(InterceptorChain.class);
+        EasyMock.expect(message.getInterceptorChain()).andReturn(ic);
         control.replay();
-        try {
-            destination.acknowledge(message); 
-            fail("Expected RMEcception not thrown.");
-        } catch (RMException e) {
-            assertSame(ex, e);
-        }
-        
+        destination.acknowledge(message); 
     }
     
 /*    @Test
@@ -218,12 +215,15 @@ public class DestinationTest extends Assert {
         destination.acknowledge(message);   
     }   */
     
-    private Message setupMessage() {
+    private Message setupMessage() throws IOException {
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
-        EasyMock.expect(message.getExchange()).andReturn(exchange);
-        EasyMock.expect(exchange.getOutMessage()).andReturn(null);
-        EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null);
+        org.apache.cxf.transport.Destination tdest = control.createMock(org.apache.cxf.transport.Destination.class);
+        EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
+        EasyMock.expect(exchange.getOutMessage()).andReturn(null).anyTimes();
+        EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null).anyTimes();
+        EasyMock.expect(exchange.getDestination()).andReturn(tdest).anyTimes();
+        EasyMock.expect(tdest.getBackChannel(message)).andReturn(null).anyTimes();
         return message;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
index 8642542..4946657 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
@@ -439,10 +439,13 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         assertEquals(0, inRecorder.getInboundMessages().size());
 
         // allow resends to kick in
-        // await multiple of 3 resends to avoid shutting down server
-        // in the course of retransmission - this is harmless but pollutes test output
+        // first duplicate received will trigger acknowledgement
+        awaitMessages(1, 1, 3000);
         
-        awaitMessages(3, 0, 7500);
+        mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
+        mf.verifyMessages(1, true);
+        mf.verifyMessages(1, false);
+        mf.verifyAcknowledgements(new boolean[] {true}, false);
         
     }
     
@@ -764,17 +767,16 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         greeter.greetMe("one");
         try {
             greeter.greetMe("two");
-            fail("Expected fault.");
+            fail("Expected timeout.");
         } catch (WebServiceException ex) {
-            SoapFault sf = (SoapFault)ex.getCause();
-            assertEquals("Unexpected fault code.", Soap11.getInstance().getReceiver(), sf.getFaultCode());
-            assertNull("Unexpected sub code.", sf.getSubCode());
-            assertTrue("Unexpected reason.", sf.getReason().endsWith("has already been delivered."));
+            assertTrue("Unexpected exception cause", ex.getCause() instanceof IOException);
+            IOException ie = (IOException)ex.getCause();
+            assertTrue("Unexpected IOException message", ie.getMessage().startsWith("Timed out"));
         }
         
         // wait for resend to occur 
         
-        awaitMessages(3, 3, 5000);
+        awaitMessages(4, 3, 5000);
          
         MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(),
             inRecorder.getInboundMessages(), Names200408.WSA_NAMESPACE_NAME, RM10Constants.NAMESPACE_URI);
@@ -782,30 +784,30 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         // Expected outbound:
         // CreateSequence 
         // + two requests
+        // + acknowledgement
        
-        String[] expectedActions = new String[3];
+        String[] expectedActions = new String[4];
         expectedActions[0] = RM10Constants.CREATE_SEQUENCE_ACTION;        
-        for (int i = 1; i < expectedActions.length; i++) {
-            expectedActions[i] = GREETME_ACTION;
-        }
+        expectedActions[1] = GREETME_ACTION;
+        expectedActions[2] = GREETME_ACTION;
+        expectedActions[3] = RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION;
         mf.verifyActions(expectedActions, true);
-        mf.verifyMessageNumbers(new String[] {null, "1", "1"}, true);
-        mf.verifyLastMessage(new boolean[3], true);
-        mf.verifyAcknowledgements(new boolean[3], true);
+        mf.verifyMessageNumbers(new String[] {null, "1", "1", null}, true);
+        mf.verifyLastMessage(new boolean[expectedActions.length], true);
+        mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, true);
  
         // Expected inbound:
         // createSequenceResponse
         // + 1 response without acknowledgement
-        // + 1 fault
+        // + 1 acknowledgement/last message
         
         mf.verifyMessages(3, false);
         expectedActions = new String[] {RM10Constants.CREATE_SEQUENCE_RESPONSE_ACTION,
                                         GREETME_RESPONSE_ACTION, 
-                                        RM10_GENERIC_FAULT_ACTION};
+                                        RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION};
         mf.verifyActions(expectedActions, false);
         mf.verifyMessageNumbers(new String[] {null, "1", null}, false);
-        mf.verifyAcknowledgements(new boolean[3] , false);
-        
+        mf.verifyAcknowledgements(new boolean[] {false, false, true}, false);
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
index f78e2e0..342c0b9 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml
@@ -42,7 +42,9 @@
             <ref bean="rmLogicalIn"/>
             <ref bean="rmSoapIn"/>
             <ref bean="rmDelivery"/>
-            <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
+            <bean class="org.apache.cxf.interceptor.LoggingInInterceptor">
+                <property name="prettyLogging" value="true"/>
+            </bean>
         </cxf:inInterceptors>
         <cxf:inFaultInterceptors>
             <ref bean="mapAggregator"/>
@@ -58,7 +60,9 @@
             <ref bean="rmLogicalOut"/>
             <ref bean="rmSoapOut"/>
             <ref bean="rmCaptureOut"/>
-            <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
+            <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor">
+                <property name="prettyLogging" value="true"/>
+            </bean>
         </cxf:outInterceptors>
         <cxf:outFaultInterceptors>
             <ref bean="mapAggregator"/>

http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java b/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
index e1b9191..f86b2f9 100644
--- a/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
+++ b/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java
@@ -69,7 +69,7 @@ public class RM12PolicyWsdlTest extends RMPolicyWsdlTestBase {
     @BeforeClass
     public static void startServers() throws Exception {
         TestUtil.getNewPortNumber("decoupled");
-        assertTrue("server did not launch correctly", launchServer(Server.class, true));
+        assertTrue("server did not launch correctly", launchServer(Server.class, false));
     }
          
     @Test


Mime
View raw message