cxf-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dennis Sosnoski <...@sosnoski.com>
Subject Re: svn commit: r1297370 - in /cxf/branches/2.5.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/
Date Tue, 06 Mar 2012 08:16:03 GMT
Hi Aki,

On reviewing the code, I think you'll also need to make a change to
Destination.acknowledge() for this to work correctly. Right now
Destination.acknowledge() is what persists the received message to the
store, so if it's not called until processing is complete, messages will
never be persisted. The persistence step should be moved out to a separate
method which can be called by RMInInterceptor.

Separately, it looks like we need to change the code to handle passing
persisted messages on to the application when recovering from the store.
It looks to me like, at present, messages will be acknowledged by the RM
layer but never delivered to the application if there's a crash or
shutdown while they're waiting to be processed. What do you think?

  - Dennis


On 03/06/2012 08:51 PM, ay@apache.org wrote:
> Author: ay
> Date: Tue Mar  6 07:51:02 2012
> New Revision: 1297370
>
> URL: http://svn.apache.org/viewvc?rev=1297370&view=rev
> Log:
> Merged revisions 1297296 via svnmerge from 
> https://svn.apache.org/repos/asf/cxf/trunk
>
> ........
>   r1297296 | ay | 2012-03-06 00:57:14 +0100 (Tue, 06 Mar 2012) | 1 line
>   
>   [CXF-4164] Robust-InOnly processing with WS-RM must delay updating the sequence until message delivery
> ........
>
> Added:
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckBase.java
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckPersistenceTest.java
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
>       - copied unchanged from r1297296, cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/sync-ack-persistent-server.xml
> Modified:
>     cxf/branches/2.5.x-fixes/   (props changed)
>     cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
>     cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
>     cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
>
> Propchange: cxf/branches/2.5.x-fixes/
> ------------------------------------------------------------------------------
>     svn:mergeinfo = /cxf/trunk:1297296
>
> Propchange: cxf/branches/2.5.x-fixes/
> ------------------------------------------------------------------------------
> Binary property 'svnmerge-integrated' - no diff available.
>
> Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> --- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
> +++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Tue Mar  6 07:51:02 2012
> @@ -23,6 +23,7 @@ import java.util.logging.Logger;
>  
>  import org.apache.cxf.common.logging.LogUtils;
>  import org.apache.cxf.message.Message;
> +import org.apache.cxf.message.MessageUtils;
>  import org.apache.cxf.phase.Phase;
>  
>  /**
> @@ -42,6 +43,12 @@ public class RMDeliveryInterceptor exten
>      
>      public void handle(Message message) throws SequenceFault, RMException {
>          LOG.entering(getClass().getName(), "handleMessage");
> -        getManager().getDestination(message).processingComplete(message);
> +        Destination dest = getManager().getDestination(message);
> +        final boolean robust =
> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
> +        if (robust) {
> +            dest.acknowledge(message);
> +        }
> +        dest.processingComplete(message);
>      }
>  }
>
> Modified: cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> --- cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
> +++ cxf/branches/2.5.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Tue Mar  6 07:51:02 2012
> @@ -25,6 +25,7 @@ import java.util.logging.Logger;
>  
>  import org.apache.cxf.common.logging.LogUtils;
>  import org.apache.cxf.message.Message;
> +import org.apache.cxf.message.MessageUtils;
>  import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
>  import org.apache.cxf.ws.addressing.ContextUtils;
>  import org.apache.cxf.ws.addressing.MAPAggregator;
> @@ -150,7 +151,11 @@ public class RMInInterceptor extends Abs
>      
>      void processSequence(Destination destination, Message message) 
>          throws SequenceFault, RMException {
> -        destination.acknowledge(message);
> +        final boolean robust =
> +            MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
> +        if (!robust) {
> +            destination.acknowledge(message);
> +        }
>      }
>      
>      void processDeliveryAssurance(RMProperties rmps) {
>
> Modified: cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java
> URL: http://svn.apache.org/viewvc/cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java?rev=1297370&r1=1297369&r2=1297370&view=diff
> ==============================================================================
> --- cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java (original)
> +++ cxf/branches/2.5.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServiceInvocationAckTest.java Tue Mar  6 07:51:02 2012
> @@ -18,186 +18,12 @@
>   */
>  package org.apache.cxf.systest.ws.rm;
>  
> -import java.net.MalformedURLException;
> -import java.util.logging.Logger;
> -
> -import javax.xml.ws.Endpoint;
> -
> -import org.apache.cxf.Bus;
> -import org.apache.cxf.BusFactory;
> -import org.apache.cxf.bus.spring.SpringBusFactory;
> -import org.apache.cxf.common.logging.LogUtils;
> -import org.apache.cxf.greeter_control.Control;
> -import org.apache.cxf.greeter_control.ControlService;
> -import org.apache.cxf.greeter_control.Greeter;
> -import org.apache.cxf.greeter_control.GreeterService;
> -import org.apache.cxf.greeter_control.types.FaultLocation;
> -import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
> -import org.apache.cxf.phase.Phase;
> -import org.apache.cxf.test.TestUtilities;
> -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
> -import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
> -import org.apache.cxf.ws.rm.RMManager;
> -
> -import org.junit.After;
> -import org.junit.AfterClass;
> -import org.junit.BeforeClass;
> -import org.junit.Test;
> -
>  /**
>   * Tests the acknowledgement delivery back to the non-decoupled port when there is some
>   * error at the provider side and how its behavior is affected by the robust in-only mode setting.
>   */
> -public class ServiceInvocationAckTest extends AbstractBusClientServerTestBase {
> -    public static final String PORT = allocatePort(Server.class);
> -    
> -    private static final Logger LOG = LogUtils.getLogger(ServiceInvocationAckTest.class);
> -
> -    private static final String CONTROL_PORT_ADDRESS = 
> -        "http://localhost:" + PORT + "/SoapContext/ControlPort";
> -
> -    public static class Server extends AbstractBusTestServerBase {
> -
> -        protected void run() {
> -            SpringBusFactory factory = new SpringBusFactory();
> -            Bus bus = factory.createBus();
> -            BusFactory.setDefaultBus(bus);
> -            setBus(bus);
> -
> -            ControlImpl implementor = new ControlImpl();
> -            implementor.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort");
> -            GreeterImpl greeterImplementor = new GreeterImpl();
> -            implementor.setImplementor(greeterImplementor);
> -            Endpoint.publish(CONTROL_PORT_ADDRESS, implementor);
> -            LOG.fine("Published control endpoint.");
> -        }
> -
> -        public static void main(String[] args) {
> -            try {
> -                Server s = new Server();
> -                s.start();
> -            } catch (Exception ex) {
> -                ex.printStackTrace();
> -                System.exit(-1);
> -            } finally {
> -                System.out.println("done!");
> -            }
> -        }
> -    }
> -    
> -    private Bus controlBus;
> -    private Control control;
> -    private Bus greeterBus;
> -    private Greeter greeter;
> -    
> -
> -    @BeforeClass
> -    public static void startServers() throws Exception {
> -        TestUtilities.setKeepAliveSystemProperty(false);
> -        assertTrue("server did not launch correctly", launchServer(Server.class, true));
> -    }
> -    
> -    @AfterClass
> -    public static void cleanup() {
> -        TestUtilities.recoverKeepAliveSystemProperty();
> -    }
> -    
> -    @After
> -    public void tearDown() {
> -        if (null != greeter) {
> -            assertTrue("Failed to stop greeter.", control.stopGreeter(null));
> -            greeterBus.shutdown(true);
> -            greeterBus = null;
> -        }
> -        if (null != control) {  
> -            assertTrue("Failed to stop greeter", control.stopGreeter(null));
> -            controlBus.shutdown(true);
> -        }
> -    }
> -
> -    @Test
> -    public void testDefaultInvocationHandling() throws Exception {
> +public class ServiceInvocationAckTest extends ServiceInvocationAckBase {
> +    protected void setupGreeter() throws Exception {
>          setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
> -
> -        control.setRobustInOnlyMode(false);
> -        
> -        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
> -            .createFaultLocation();
> -        location.setPhase(Phase.INVOKE);
> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
> -        
> -        RMManager manager = greeterBus.getExtension(RMManager.class);
> -
> -        // the message is acked and the invocation takes place
> -        greeter.greetMeOneWay("one");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -    
> -        control.setFaultLocation(location);
> -
> -        // the invocation fails but the message is acked because the delivery succeeds
> -        greeter.greetMeOneWay("two");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -    }
> -
> -    @Test
> -    public void testRobustInvocationHandling() throws Exception {
> -        setupGreeter("org/apache/cxf/systest/ws/rm/sync-ack-server.xml");
> -
> -        control.setRobustInOnlyMode(true);
> -        
> -        FaultLocation location = new org.apache.cxf.greeter_control.types.ObjectFactory()
> -            .createFaultLocation();
> -        location.setPhase(Phase.INVOKE);
> -        location.setBefore(ServiceInvokerInterceptor.class.getName());
> -        
> -        RMManager manager = greeterBus.getExtension(RMManager.class);
> -
> -        
> -        // the message is acked and the invocation takes place
> -        greeter.greetMeOneWay("one");
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -
> -        control.setFaultLocation(location);
> -
> -        // the invocation fails but the message is acked because the delivery succeeds
> -        greeter.greetMeOneWay("two");
> -        Thread.sleep(6000L);
> -        assertFalse("RetransmissionQueue must not be empty", manager.getRetransmissionQueue().isEmpty());
> -        
> -        location.setPhase(null);
> -        control.setFaultLocation(location);
> -
> -        // the retransmission succeeds and the invocation succeeds, the message is acked
> -        Thread.sleep(6000L);
> -        assertTrue("RetransmissionQueue must be empty", manager.getRetransmissionQueue().isEmpty());
> -        
> -    }
> -
> -    private void setupGreeter(String cfgResource) throws NumberFormatException, MalformedURLException {
> -        
> -        SpringBusFactory bf = new SpringBusFactory();
> -        
> -        controlBus = bf.createBus();
> -        BusFactory.setDefaultBus(controlBus);
> -
> -        ControlService cs = new ControlService();
> -        control = cs.getControlPort();
> -        updateAddressPort(control, PORT);
> -        
> -        assertTrue("Failed to start greeter", control.startGreeter(cfgResource));
> -        
> -        greeterBus = bf.createBus(cfgResource);
> -        BusFactory.setDefaultBus(greeterBus);
> -        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
> -        
> -        GreeterService gs = new GreeterService();
> -
> -        greeter = gs.getGreeterPort();
> -        updateAddressPort(greeter, PORT);
> -        LOG.fine("Created greeter client.");
> -
>      }
>  }
>
>

Mime
View raw message