cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject svn commit: r1303303 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/
Date Wed, 21 Mar 2012 08:09:29 GMT
Author: ay
Date: Wed Mar 21 08:09:28 2012
New Revision: 1303303

URL: http://svn.apache.org/viewvc?rev=1303303&view=rev
Log:
[CXF-4188] Robust-InOnly processing with WS-RM to perform AtMostOnce check

Added:
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
    cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
  (with props)
Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Wed Mar 21 08:09:28
2012
@@ -113,6 +113,9 @@ public class Destination extends Abstrac
 
         if (null != seq) {
             if (seq.applyDeliveryAssurance(sequenceType.getMessageNumber(), message)) {
+                if (MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
+                    return;
+                }
                 seq.acknowledge(message);
     
                 if (null != rmps.getCloseSequence()) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Wed Mar
21 08:09:28 2012
@@ -21,8 +21,10 @@ package org.apache.cxf.ws.rm;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.TimerTask;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -32,6 +34,7 @@ import org.apache.cxf.continuations.Cont
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.rm.manager.AcksPolicyType;
 import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
@@ -60,6 +63,7 @@ public class DestinationSequence extends
     private long inProcessNumber;
     private long highNumberCompleted;
     private List<Continuation> continuations = new LinkedList<Continuation>();
+    private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
     
     public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d, ProtocolVariation pv) {
         this(i, a, 0, null, pv);
@@ -238,10 +242,26 @@ public class DestinationSequence extends
         Continuation cont = getContinuation(message);
         DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
         boolean canSkip = !da.isSetAtLeastOnce() && !da.isSetExactlyOnce();
+        boolean robust = false;
+        boolean robustDelivering = false;
+        if (message != null) {
+            robust = MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
+            if (robust) {
+                robustDelivering = 
+                    MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY));
+            }
+        }
+        if (robust && !robustDelivering) {
+            // robust but not delivering: drop mn from deliveringMessageNumbers and skip all checks
+            deliveringMessageNumbers.remove(mn);
+            return true;
+        }
         if (cont != null && da.isSetInOrder() && !cont.isNew()) {
             return waitInQueue(mn, canSkip, message, cont);
         }
-        if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) && isAcknowledged(mn)) {
+        if ((da.isSetExactlyOnce() || da.isSetAtMostOnce()) 
+            && (isAcknowledged(mn) 
+                || (robustDelivering && deliveringMessageNumbers.contains(mn)))) {
             
             // acknowledge at first opportunity following duplicate message
             scheduleImmediateAcknowledgement();
@@ -251,6 +271,9 @@ public class DestinationSequence extends
             throw new RMException(msg);
             
         } 
+        if (robustDelivering) {
+            deliveringMessageNumbers.add(mn);
+        }
         if (da.isSetInOrder()) {
             return waitInQueue(mn, canSkip, message, cont);
         }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java Wed Mar
21 08:09:28 2012
@@ -47,6 +47,7 @@ public class RMDeliveryInterceptor exten
         final boolean robust =
             MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
         if (robust) {
+            message.remove(RMMessageConstants.DELIVERING_ROBUST_ONEWAY);
             dest.acknowledge(message);
         }
         dest.processingComplete(message);

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Wed Mar 21
08:09:28 2012
@@ -153,9 +153,11 @@ public class RMInInterceptor extends Abs
         throws SequenceFault, RMException {
         final boolean robust =
             MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
-        if (!robust) {
-            destination.acknowledge(message);
-        }
+        if (robust) {
+            // set this property to change the acknowledging behavior
+            message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY, Boolean.TRUE);
+        } 
+        destination.acknowledge(message);
     }
     
     void processDeliveryAssurance(RMProperties rmps) {

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java?rev=1303303&r1=1303302&r2=1303303&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java Wed Mar
21 08:09:28 2012
@@ -42,6 +42,10 @@ public final class RMMessageConstants {
     public static final String SAVED_CONTENT =
         "org.apache.cxf.ws.rm.content";
     
+    // keep this constant in the ws-rm package until it finds a general use outside of ws-rm
+    static final String DELIVERING_ROBUST_ONEWAY = 
+        "org.apache.cxf.oneway.robust.delivering";
+    
     /**
      * Prevents instantiation. 
      */

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java?rev=1303303&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
(added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/RobustServiceAtMostOnceTest.java
Wed Mar 21 08:09:28 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Logger;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.ws.rm.RMManager;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that a robust one-way operation under WS-RM AtMostOnce delivery is invoked only
+ * once, even when slow server-side processing outlasts the client's retransmission interval.
+ */
+public class RobustServiceAtMostOnceTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = allocatePort(Server.class); 
+    public static final String GREETMEONEWAY_ACTION 
+        = "http://cxf.apache.org/greeter_control/Greeter/greetMeOneWayRequest";
+    private static final Logger LOG = LogUtils.getLogger(RobustServiceAtMostOnceTest.class);
+    
+    private static GreeterCounterImpl serverGreeter;
+    private Greeter greeter;
+
+    
+    public static class Server extends AbstractBusTestServerBase {
+
+        protected void run() {
+            SpringBusFactory bf = new SpringBusFactory();
+            // use an at-most-once server with sync ack processing
+            Bus bus = bf.createBus("/org/apache/cxf/systest/ws/rm/atmostonce.xml");
+            BusFactory.setDefaultBus(bus);
+            bus.getExtension(RMManager.class).getRMAssertion().getAcknowledgementInterval()
+                .setMilliseconds(0L);
+
+            // add some intentional processing delay at inbound
+            SlowProcessingSimulator sps = new SlowProcessingSimulator();
+            sps.setAction("http://cxf.apache.org/greeter_control/Greeter/greetMeOneWayRequest");
+            sps.setDelay(10000L);
+            bus.getInInterceptors().add(sps);
+            serverGreeter = new GreeterCounterImpl();
+            String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+            
+            // publish this robust oneway endpoint
+            Endpoint ep = Endpoint.create(serverGreeter);
+            ep.getProperties().put(Message.ROBUST_ONEWAY, Boolean.TRUE);
+            ep.publish(address);
+            LOG.info("Published greeter endpoint.");
+        }
+
+        public static void main(String[] args) {
+            try {
+                Server s = new Server();
+                s.start();
+            } catch (Exception ex) {
+                ex.printStackTrace();
+                System.exit(-1);
+            } finally {
+                System.out.println("done!");
+            }
+        }
+    }
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        assertTrue("server did not launch correctly", launchServer(Server.class, true));
+    }
+    
+    @Test 
+    public void testRobustAtMostOnceWithSlowProcessing() throws Exception {
+        LOG.fine("Creating greeter client");
+        SpringBusFactory bf = new SpringBusFactory();
+        bus = bf.createBus("/org/apache/cxf/systest/ws/rm/seqlength1.xml");
+        // set the client retry interval much shorter than the slow processing delay
+        RMManager manager = bus.getExtension(RMManager.class); 
+        manager.getRMAssertion().getBaseRetransmissionInterval()
+            .setMilliseconds(3000L);
+
+        BusFactory.setDefaultBus(bus);
+        GreeterService gs = new GreeterService();
+        greeter = gs.getGreeterPort();
+        updateAddressPort(greeter, PORT);
+        
+        LOG.fine("Invoking greeter");
+        greeter.greetMeOneWay("one");
+        Thread.sleep(10000);
+        
+        assertEquals("invoked too many times", 1, serverGreeter.getCount());
+        assertTrue("still in retransmission", manager.getRetransmissionQueue().isEmpty());
+    }
+
+    private static class GreeterCounterImpl extends GreeterImpl {
+        private int count;
+
+        public void greetMeOneWay(String arg0) {
+            super.greetMeOneWay(arg0);
+            count++;
+        }
+        
+        public int getCount() {
+            return count;
+        }
+    }
+}

Added: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java?rev=1303303&view=auto
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
(added)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
Wed Mar 21 08:09:28 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.binding.soap.SoapBindingConstants;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+import org.apache.cxf.ws.addressing.AddressingProperties;
+import org.apache.cxf.ws.addressing.ContextUtils;
+
+public class SlowProcessingSimulator extends AbstractPhaseInterceptor<Message> {
+    private static final Logger LOG = LogUtils.getLogger(SlowProcessingSimulator.class);
+    
+    private long delay = 10000L;
+    private String action;
+    
+    public SlowProcessingSimulator() {
+        this(Phase.USER_PROTOCOL);
+    }
+    
+    public SlowProcessingSimulator(String p) {
+        super(p);
+    }
+
+    
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String action) {
+        this.action = action;
+    }
+
+    public void handleMessage(Message message) throws Fault {
+        try {
+            // sleep delay msec for the specified action or any action if unspecified.
+            String a = getAction(message);
+            LOG.log(Level.INFO, "action=" + a);
+            if (null == action || action.equals(a)) {
+                LOG.log(Level.INFO, "sleeping " + delay + " msec ...");
+                Thread.sleep(delay);    
+            }
+        } catch (InterruptedException e) {
+            LOG.log(Level.INFO, "interrupted");
+        }
+        LOG.log(Level.INFO, "continuing");
+    }
+
+    private String getAction(Message message) {
+        final AddressingProperties ap = ContextUtils.retrieveMAPs(message, false, false);
+        if (ap != null && ap.getAction() != null) {
+            return ap.getAction().getValue();
+        } 
+        return (String)message.get(SoapBindingConstants.SOAP_ACTION);
+    }
+
+}

Propchange: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/SlowProcessingSimulator.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message