Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B0E96112A1 for ; Mon, 23 Jun 2014 01:33:09 +0000 (UTC) Received: (qmail 36030 invoked by uid 500); 23 Jun 2014 01:33:09 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 35792 invoked by uid 500); 23 Jun 2014 01:33:09 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 35541 invoked by uid 99); 23 Jun 2014 01:33:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jun 2014 01:33:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0411B886A0D; Mon, 23 Jun 2014 01:33:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dsosnoski@apache.org To: commits@cxf.apache.org Date: Mon, 23 Jun 2014 01:33:11 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/5] git commit: [CXF-3272] Return acknowledgement rather than Fault when duplicate message received. [CXF-3272] Return acknowledgement rather than Fault when duplicate message received. Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c613aa49 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c613aa49 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c613aa49 Branch: refs/heads/master Commit: c613aa4919e81a3027d0dfbbf3c4ee0a1c6ce348 Parents: 0494f76 Author: dsosnoski Authored: Mon Jun 23 11:52:03 2014 +1200 Committer: dsosnoski Committed: Mon Jun 23 11:52:03 2014 +1200 ---------------------------------------------------------------------- .../java/org/apache/cxf/ws/rm/Destination.java | 22 +- .../apache/cxf/ws/rm/DestinationSequence.java | 31 ++- .../apache/cxf/ws/rm/InternalContextUtils.java | 275 +++++++++++++++++++ .../org/apache/cxf/ws/rm/RMOutInterceptor.java | 4 +- .../cxf/ws/rm/DestinationSequenceTest.java | 8 +- .../org/apache/cxf/ws/rm/DestinationTest.java | 32 +-- .../apache/cxf/systest/ws/rm/SequenceTest.java | 42 +-- .../apache/cxf/systest/ws/rm/rminterceptors.xml | 8 +- .../systest/ws/policy/RM12PolicyWsdlTest.java | 2 +- 9 files changed, 357 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java index 378c9b2..77c9c49 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java @@ -126,16 +126,24 @@ public class Destination extends AbstractEndpoint { } else { try { message.getInterceptorChain().abort(); - Conduit conduit = message.getExchange().getDestination() - .getBackChannel(message); + if (seq.sendAcknowledgement()) { + ackImmediately(seq, message); + } + Exchange exchange = message.getExchange(); + Conduit conduit = exchange.getDestination().getBackChannel(message); if (conduit != null) { //for a one-way, the back channel could be //null if it knows it cannot send anything. - Message partial = createMessage(message.getExchange()); - partial.remove(Message.CONTENT_TYPE); - partial.setExchange(message.getExchange()); - conduit.prepare(partial); - conduit.close(partial); + if (seq.sendAcknowledgement()) { + AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false); + InternalContextUtils.rebaseResponse(null, maps, message); + } else { + Message response = createMessage(exchange); + response.setExchange(exchange); + response.remove(Message.CONTENT_TYPE); + conduit.prepare(response); + conduit.close(response); + } } } catch (IOException e) { LOG.log(Level.SEVERE, e.getMessage()); http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java index d0aef1d..36f44d6 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java @@ -59,6 +59,7 @@ public class DestinationSequence extends AbstractSequence { private String correlationID; private volatile long inProcessNumber; private volatile long highNumberCompleted; + private long nextInOrder; private List continuations = new LinkedList(); private Set deliveringMessageNumbers = new HashSet(); @@ -223,23 +224,21 @@ public class DestinationSequence extends AbstractSequence { } /** - * Ensures that the delivery assurance is honored, e.g. by throwing an - * exception if the message had already been delivered and the delivery - * assurance is AtMostOnce. + * Ensures that the delivery assurance is honored. * If the delivery assurance includes either AtLeastOnce or ExactlyOnce, combined with InOrder, this * queues out-of-order messages for processing after the missing messages have been received. * * @param mn message number * @return true if message processing to continue, false if to be dropped - * @throws RMException if message had already been acknowledged */ - boolean applyDeliveryAssurance(long mn, Message message) throws RMException { + boolean applyDeliveryAssurance(long mn, Message message) { Continuation cont = getContinuation(message); RMConfiguration config = destination.getReliableEndpoint().getConfiguration(); DeliveryAssurance da = config.getDeliveryAssurance(); boolean canSkip = da != DeliveryAssurance.AT_LEAST_ONCE && da != DeliveryAssurance.EXACTLY_ONCE; boolean robust = false; boolean robustDelivering = false; + boolean inOrder = mn - nextInOrder == 1; if (message != null) { robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY)); if (robust) { @@ -250,22 +249,30 @@ public class DestinationSequence extends AbstractSequence { if (robust && !robustDelivering) { // no check performed if in robust and not in delivering removeDeliveringMessageNumber(mn); + if (inOrder) { + nextInOrder++; + } return true; } + if (inOrder) { + nextInOrder++; + } else { + + // message out of order, schedule acknowledgement to update sender + scheduleImmediateAcknowledgement(); + if (nextInOrder < mn) { + nextInOrder = mn + 1; + } + } if (cont != null && config.isInOrder() && !cont.isNew()) { return waitInQueue(mn, canSkip, message, cont); } if ((da == DeliveryAssurance.EXACTLY_ONCE || da == DeliveryAssurance.AT_MOST_ONCE) - && (isAcknowledged(mn) - || (robustDelivering && deliveringMessageNumbers.contains(mn)))) { - - // acknowledge at first opportunity following duplicate message - scheduleImmediateAcknowledgement(); + && (isAcknowledged(mn) || (robustDelivering && deliveringMessageNumbers.contains(mn)))) { org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message( "MESSAGE_ALREADY_DELIVERED_EXC", LOG, mn, getIdentifier().getValue()); LOG.log(Level.INFO, msg.toString()); - throw new RMException(msg); - + return false; } if (robustDelivering) { deliveringMessageNumbers.add(mn); http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java new file mode 100644 index 0000000..eac5662 --- /dev/null +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/InternalContextUtils.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.ws.rm; + + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.cxf.Bus; +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.endpoint.ConduitSelector; +import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.endpoint.NullConduitSelector; +import org.apache.cxf.endpoint.PreexistingConduitSelector; +import org.apache.cxf.interceptor.Fault; +import org.apache.cxf.interceptor.InterceptorChain; +import org.apache.cxf.interceptor.OutgoingChainInterceptor; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.service.model.BindingOperationInfo; +import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.transport.Conduit; +import org.apache.cxf.transport.ConduitInitiator; +import org.apache.cxf.transport.ConduitInitiatorManager; +import org.apache.cxf.transport.Destination; +import org.apache.cxf.transport.MessageObserver; +import org.apache.cxf.workqueue.OneShotAsyncExecutor; +import org.apache.cxf.workqueue.SynchronousExecutor; +import org.apache.cxf.workqueue.WorkQueueManager; +import org.apache.cxf.ws.addressing.AddressingProperties; +import org.apache.cxf.ws.addressing.ContextUtils; +import org.apache.cxf.ws.addressing.EndpointReferenceType; +import org.apache.cxf.ws.addressing.EndpointReferenceUtils; +import org.apache.cxf.ws.addressing.Names; + + + +/** + * Holder for utility methods relating to contexts. Somewhat stripped-down version of class of same name in + * org.apache.cxf.ws.addressing.impl. + */ +final class InternalContextUtils { + private static final class DecoupledDestination implements Destination { + private final EndpointInfo ei; + private final EndpointReferenceType reference; + + private DecoupledDestination(EndpointInfo ei, EndpointReferenceType reference) { + this.ei = ei; + this.reference = reference; + } + + public EndpointReferenceType getAddress() { + return reference; + } + + public Conduit getBackChannel(Message inMessage) throws IOException { + if (ContextUtils.isNoneAddress(reference)) { + return null; + } + Bus bus = inMessage.getExchange().get(Bus.class); + //this is a response targeting a decoupled endpoint. Treat it as a oneway so + //we don't wait for a response. + inMessage.getExchange().setOneWay(true); + ConduitInitiator conduitInitiator + = bus.getExtension(ConduitInitiatorManager.class) + .getConduitInitiatorForUri(reference.getAddress().getValue()); + if (conduitInitiator != null) { + Conduit c = conduitInitiator.getConduit(ei, reference, bus); + // ensure decoupled back channel input stream is closed + c.setMessageObserver(new MessageObserver() { + public void onMessage(Message m) { + InputStream is = m.getContent(InputStream.class); + if (is != null) { + try { + is.close(); + } catch (Exception e) { + // ignore + } + } + } + }); + return c; + } + return null; + } + + public MessageObserver getMessageObserver() { + return null; + } + + public void shutdown() { + } + + public void setMessageObserver(MessageObserver observer) { + } + } + + private static final Logger LOG = LogUtils.getL7dLogger(InternalContextUtils.class); + + /** + * Prevents instantiation. + */ + private InternalContextUtils() { + } + + + /** + * Rebase response on replyTo + * + * @param reference the replyTo reference + * @param inMAPs the inbound MAPs + * @param inMessage the current message + */ + //CHECKSTYLE:OFF Max executable statement count limitation + public static void rebaseResponse(EndpointReferenceType reference, + AddressingProperties inMAPs, + final Message inMessage) { + + String namespaceURI = inMAPs.getNamespaceURI(); + if (!ContextUtils.retrievePartialResponseSent(inMessage)) { + ContextUtils.storePartialResponseSent(inMessage); + Exchange exchange = inMessage.getExchange(); + Message fullResponse = exchange.getOutMessage(); + Message partialResponse = ContextUtils.createMessage(exchange); + ensurePartialResponseMAPs(partialResponse, namespaceURI); + + // ensure the inbound MAPs are available in the partial response + // message (used to determine relatesTo etc.) + ContextUtils.propogateReceivedMAPs(inMAPs, partialResponse); + Destination target = inMessage.getDestination(); + if (target == null) { + return; + } + + try { + if (reference == null) { + reference = ContextUtils.getNoneEndpointReference(); + } + Conduit backChannel = target.getBackChannel(inMessage); + if (backChannel != null) { + partialResponse.put(Message.PARTIAL_RESPONSE_MESSAGE, Boolean.TRUE); + partialResponse.put(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE, Boolean.TRUE); + boolean robust = + MessageUtils.isTrue(inMessage.getContextualProperty(Message.ROBUST_ONEWAY)); + + if (robust) { + BindingOperationInfo boi = exchange.get(BindingOperationInfo.class); + // insert the executor in the exchange to fool the OneWayProcessorInterceptor + exchange.put(Executor.class, getExecutor(inMessage)); + // pause dispatch on current thread and resume... + inMessage.getInterceptorChain().pause(); + inMessage.getInterceptorChain().resume(); + // restore the BOI for the partial response handling + exchange.put(BindingOperationInfo.class, boi); + } + + + // set up interceptor chains and send message + InterceptorChain chain = + fullResponse != null + ? fullResponse.getInterceptorChain() + : OutgoingChainInterceptor.getOutInterceptorChain(exchange); + exchange.setOutMessage(partialResponse); + partialResponse.setInterceptorChain(chain); + exchange.put(ConduitSelector.class, + new PreexistingConduitSelector(backChannel, + exchange.get(Endpoint.class))); + + if (chain != null && !chain.doIntercept(partialResponse) + && partialResponse.getContent(Exception.class) != null) { + if (partialResponse.getContent(Exception.class) instanceof Fault) { + throw (Fault)partialResponse.getContent(Exception.class); + } else { + throw new Fault(partialResponse.getContent(Exception.class)); + } + } + if (chain != null) { + chain.reset(); + } + exchange.put(ConduitSelector.class, new NullConduitSelector()); + + if (fullResponse == null) { + fullResponse = ContextUtils.createMessage(exchange); + } + exchange.setOutMessage(fullResponse); + + Destination destination = createDecoupledDestination( + exchange, + reference); + exchange.setDestination(destination); + + } + } catch (Exception e) { + LOG.log(Level.WARNING, "SERVER_TRANSPORT_REBASE_FAILURE_MSG", e); + } + } + } + //CHECKSTYLE:ON + + private static Destination createDecoupledDestination( + Exchange exchange, final EndpointReferenceType reference) { + final EndpointInfo ei = exchange.get(Endpoint.class).getEndpointInfo(); + return new DecoupledDestination(ei, reference); + } + + /** + * Construct and store MAPs for partial response. + * + * @param partialResponse the partial response message + * @param namespaceURI the current namespace URI + */ + private static void ensurePartialResponseMAPs(Message partialResponse, + String namespaceURI) { + // ensure there is a MAPs instance available for the outbound + // partial response that contains appropriate To and ReplyTo + // properties (i.e. anonymous & none respectively) + AddressingProperties maps = new AddressingProperties(); + maps.setTo(EndpointReferenceUtils.getAnonymousEndpointReference()); + maps.setReplyTo(ContextUtils.WSA_OBJECT_FACTORY.createEndpointReferenceType()); + maps.getReplyTo().setAddress(ContextUtils.getAttributedURI(Names.WSA_NONE_ADDRESS)); + maps.setAction(ContextUtils.getAttributedURI("")); + maps.exposeAs(namespaceURI); + ContextUtils.storeMAPs(maps, partialResponse, true, true, false); + } + + /** + * Get the Executor for this invocation. + * @param endpoint + * @return + */ + private static Executor getExecutor(final Message message) { + Endpoint endpoint = message.getExchange().get(Endpoint.class); + Executor executor = endpoint.getService().getExecutor(); + + if (executor == null || SynchronousExecutor.isA(executor)) { + // need true asynchrony + Bus bus = message.getExchange().get(Bus.class); + if (bus != null) { + WorkQueueManager workQueueManager = + bus.getExtension(WorkQueueManager.class); + Executor autoWorkQueue = + workQueueManager.getNamedWorkQueue("ws-addressing"); + executor = autoWorkQueue != null + ? autoWorkQueue + : workQueueManager.getAutomaticWorkQueue(); + } else { + executor = OneShotAsyncExecutor.getInstance(); + } + } + message.getExchange().put(Executor.class, executor); + return executor; + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java index 95eb200..8d3aa17 100644 --- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java +++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java @@ -115,10 +115,10 @@ public class RMOutInterceptor extends AbstractRMInterceptor { if (isPartialResponse && rmpsOut.getAcks() != null && rmpsOut.getAcks().size() > 0) { setAction(maps, constants.getSequenceAckAction()); msg.remove(Message.EMPTY_PARTIAL_RESPONSE_MESSAGE); + isAck = true; } } - if (isAck || constants.getSequenceAckAction().equals(action) - || (constants.getTerminateSequenceAction().equals(action) + if (isAck || (constants.getTerminateSequenceAction().equals(action) && RM10Constants.NAMESPACE_URI.equals(rmNamespace))) { maps.setReplyTo(RMUtils.createNoneReference()); } http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java index e46bf87..a189932 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java @@ -484,13 +484,7 @@ public class DestinationSequenceTest extends Assert { EasyMock.expect(r.getLower()).andReturn(new Long(5)); EasyMock.expect(r.getUpper()).andReturn(new Long(15)); control.replay(); - try { - ds.applyDeliveryAssurance(mn, null); - fail("Expected Fault not thrown."); - } catch (RMException ex) { - assertEquals("MESSAGE_ALREADY_DELIVERED_EXC", ex.getCode()); - } - + ds.applyDeliveryAssurance(mn, null); control.verify(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java ---------------------------------------------------------------------- diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java index 2405542..2288247 100644 --- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java +++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationTest.java @@ -19,8 +19,10 @@ package org.apache.cxf.ws.rm; +import java.io.IOException; import java.lang.reflect.Method; +import org.apache.cxf.interceptor.InterceptorChain; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.ws.rm.persistence.RMStore; @@ -96,7 +98,7 @@ public class DestinationTest extends Assert { } @Test - public void testAcknowledgeNoSequence() throws SequenceFault, RMException { + public void testAcknowledgeNoSequence() throws SequenceFault, RMException, IOException { Message message = setupMessage(); RMProperties rmps = control.createMock(RMProperties.class); EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps); @@ -106,7 +108,7 @@ public class DestinationTest extends Assert { } @Test - public void testAcknowledgeUnknownSequence() throws RMException { + public void testAcknowledgeUnknownSequence() throws RMException, IOException { Message message = setupMessage(); RMProperties rmps = control.createMock(RMProperties.class); EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps); @@ -129,7 +131,7 @@ public class DestinationTest extends Assert { @Test public void testAcknowledgeAlreadyAcknowledgedMessage() throws SequenceFault, RMException, - NoSuchMethodException { + NoSuchMethodException, IOException { Method m1 = Destination.class.getDeclaredMethod("getSequence", new Class[] {Identifier.class}); destination = EasyMock.createMockBuilder(Destination.class) @@ -145,17 +147,12 @@ public class DestinationTest extends Assert { EasyMock.expect(destination.getSequence(id)).andReturn(ds); long nr = 10; EasyMock.expect(st.getMessageNumber()).andReturn(nr); - RMException ex = new RMException(new RuntimeException("already acknowledged")); ds.applyDeliveryAssurance(nr, message); - EasyMock.expectLastCall().andThrow(ex); + EasyMock.expectLastCall().andReturn(false); + InterceptorChain ic = control.createMock(InterceptorChain.class); + EasyMock.expect(message.getInterceptorChain()).andReturn(ic); control.replay(); - try { - destination.acknowledge(message); - fail("Expected RMEcception not thrown."); - } catch (RMException e) { - assertSame(ex, e); - } - + destination.acknowledge(message); } /* @Test @@ -218,12 +215,15 @@ public class DestinationTest extends Assert { destination.acknowledge(message); } */ - private Message setupMessage() { + private Message setupMessage() throws IOException { Message message = control.createMock(Message.class); Exchange exchange = control.createMock(Exchange.class); - EasyMock.expect(message.getExchange()).andReturn(exchange); - EasyMock.expect(exchange.getOutMessage()).andReturn(null); - EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null); + org.apache.cxf.transport.Destination tdest = control.createMock(org.apache.cxf.transport.Destination.class); + EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes(); + EasyMock.expect(exchange.getOutMessage()).andReturn(null).anyTimes(); + EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null).anyTimes(); + EasyMock.expect(exchange.getDestination()).andReturn(tdest).anyTimes(); + EasyMock.expect(tdest.getBackChannel(message)).andReturn(null).anyTimes(); return message; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java ---------------------------------------------------------------------- diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java index 8642542..4946657 100644 --- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java +++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java @@ -439,10 +439,13 @@ public class SequenceTest extends AbstractBusClientServerTestBase { assertEquals(0, inRecorder.getInboundMessages().size()); // allow resends to kick in - // await multiple of 3 resends to avoid shutting down server - // in the course of retransmission - this is harmless but pollutes test output + // first duplicate received will trigger acknowledgement + awaitMessages(1, 1, 3000); - awaitMessages(3, 0, 7500); + mf.reset(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages()); + mf.verifyMessages(1, true); + mf.verifyMessages(1, false); + mf.verifyAcknowledgements(new boolean[] {true}, false); } @@ -764,17 +767,16 @@ public class SequenceTest extends AbstractBusClientServerTestBase { greeter.greetMe("one"); try { greeter.greetMe("two"); - fail("Expected fault."); + fail("Expected timeout."); } catch (WebServiceException ex) { - SoapFault sf = (SoapFault)ex.getCause(); - assertEquals("Unexpected fault code.", Soap11.getInstance().getReceiver(), sf.getFaultCode()); - assertNull("Unexpected sub code.", sf.getSubCode()); - assertTrue("Unexpected reason.", sf.getReason().endsWith("has already been delivered.")); + assertTrue("Unexpected exception cause", ex.getCause() instanceof IOException); + IOException ie = (IOException)ex.getCause(); + assertTrue("Unexpected IOException message", ie.getMessage().startsWith("Timed out")); } // wait for resend to occur - awaitMessages(3, 3, 5000); + awaitMessages(4, 3, 5000); MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages(), Names200408.WSA_NAMESPACE_NAME, RM10Constants.NAMESPACE_URI); @@ -782,30 +784,30 @@ public class SequenceTest extends AbstractBusClientServerTestBase { // Expected outbound: // CreateSequence // + two requests + // + acknowledgement - String[] expectedActions = new String[3]; + String[] expectedActions = new String[4]; expectedActions[0] = RM10Constants.CREATE_SEQUENCE_ACTION; - for (int i = 1; i < expectedActions.length; i++) { - expectedActions[i] = GREETME_ACTION; - } + expectedActions[1] = GREETME_ACTION; + expectedActions[2] = GREETME_ACTION; + expectedActions[3] = RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION; mf.verifyActions(expectedActions, true); - mf.verifyMessageNumbers(new String[] {null, "1", "1"}, true); - mf.verifyLastMessage(new boolean[3], true); - mf.verifyAcknowledgements(new boolean[3], true); + mf.verifyMessageNumbers(new String[] {null, "1", "1", null}, true); + mf.verifyLastMessage(new boolean[expectedActions.length], true); + mf.verifyAcknowledgements(new boolean[] {false, false, false, true}, true); // Expected inbound: // createSequenceResponse // + 1 response without acknowledgement - // + 1 fault + // + 1 acknowledgement/last message mf.verifyMessages(3, false); expectedActions = new String[] {RM10Constants.CREATE_SEQUENCE_RESPONSE_ACTION, GREETME_RESPONSE_ACTION, - RM10_GENERIC_FAULT_ACTION}; + RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION}; mf.verifyActions(expectedActions, false); mf.verifyMessageNumbers(new String[] {null, "1", null}, false); - mf.verifyAcknowledgements(new boolean[3] , false); - + mf.verifyAcknowledgements(new boolean[] {false, false, true}, false); } @Test http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml ---------------------------------------------------------------------- diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml index f78e2e0..342c0b9 100644 --- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml +++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/rminterceptors.xml @@ -42,7 +42,9 @@ - + + + @@ -58,7 +60,9 @@ - + + + http://git-wip-us.apache.org/repos/asf/cxf/blob/c613aa49/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java ---------------------------------------------------------------------- diff --git a/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java b/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java index e1b9191..f86b2f9 100644 --- a/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java +++ b/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/policy/RM12PolicyWsdlTest.java @@ -69,7 +69,7 @@ public class RM12PolicyWsdlTest extends RMPolicyWsdlTestBase { @BeforeClass public static void startServers() throws Exception { TestUtil.getNewPortNumber("decoupled"); - assertTrue("server did not launch correctly", launchServer(Server.class, true)); + assertTrue("server did not launch correctly", launchServer(Server.class, false)); } @Test