cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject [1/2] git commit: Add MessageCallback interface for WS-RM message accept and acknowledge notifications, MessageCountingCallback simple implementation, test code.
Date Sun, 23 Feb 2014 11:58:54 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 03a0a7e6f -> 1bc073112


Add MessageCallback interface for WS-RM message accept and acknowledge
notifications, MessageCountingCallback simple implementation, test code.

Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c03492dc
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c03492dc
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c03492dc

Branch: refs/heads/master
Commit: c03492dca099f5b5827ee4a27b01b5a1c9fbf00d
Parents: 09f8068
Author: dsosnoski <dsosnoski@apache.org>
Authored: Mon Feb 24 00:34:01 2014 +1300
Committer: dsosnoski <dsosnoski@apache.org>
Committed: Mon Feb 24 00:34:01 2014 +1300

----------------------------------------------------------------------
 .../org/apache/cxf/ws/rm/MessageCallback.java   |  28 ++
 .../cxf/ws/rm/MessageCountingCallback.java      |  82 ++++
 .../java/org/apache/cxf/ws/rm/RMEndpoint.java   |  30 +-
 .../apache/cxf/ws/rm/RMMessageConstants.java    |   3 +
 .../apache/cxf/ws/rm/RetransmissionQueue.java   |   9 +-
 .../cxf/ws/rm/soap/RetransmissionQueueImpl.java |  20 +-
 .../cxf/ws/rm/MessageCountingCallbackTest.java  |  52 +++
 .../ws/rm/soap/RetransmissionQueueImplTest.java |  53 ++-
 .../ws/rm/MessageCallbackOnewayTest.java        | 426 +++++++++++++++++++
 9 files changed, 667 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCallback.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCallback.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCallback.java
new file mode 100644
index 0000000..c8a0cd8
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+/**
+ * Called by RM code when messages accepted for sending and when acknowledged. 
+ */
+public interface MessageCallback {
+    void messageAccepted(String seqId, long msgNum);
+    void messageAcknowledged(String seqId, long msgNum);
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCountingCallback.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCountingCallback.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCountingCallback.java
new file mode 100644
index 0000000..acc8fce
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/MessageCountingCallback.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+/**
+ * Implementation just counts the number of messages accepted for sending and the number
acknowledged, allows checking /
+ * waiting for completion.
+ */
+public class MessageCountingCallback implements MessageCallback {
+    
+    /** Internal lock (rather than using this, so we can prevent any other access). */
+    private Object lock = new Object();
+    private volatile int countOutstanding;
+    
+    @Override
+    public void messageAccepted(String seqId, long msgNum) {
+        synchronized (lock) {
+            countOutstanding++;
+        }
+    }
+
+    @Override
+    public void messageAcknowledged(String seqId, long msgNum) {
+        synchronized (lock) {
+            countOutstanding--;
+            if (countOutstanding == 0) {
+                lock.notifyAll();
+            }
+        }
+    }
+    
+    /**
+     * Get the number of messages accepted for sending which have not yet been acknowledged.
+     * 
+     * @return count
+     */
+    public int getCountOutstanding() {
+        return countOutstanding;
+    }
+    
+    /**
+     * Wait for all accepted messages to be acknowledged.
+     * 
+     * @param timeout maximum time to wait, in milliseconds (no timeout if 0)
+     * @return <code>true</code> if all accepted messages acknowledged, <code>false</code>
if timed out
+     */
+    public boolean waitComplete(long timeout) {
+        long start = System.currentTimeMillis();
+        synchronized (lock) {
+            while (countOutstanding > 0) {
+                long remain = 0;
+                if (timeout != 0) {
+                    remain = start + timeout - System.currentTimeMillis();
+                    if (remain <= 0) {
+                        return false; 
+                    }
+                }
+                try {
+                    lock.wait(remain);
+                } catch (InterruptedException e) { /* ignored */ }
+            }
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
index 3d857f7..2ef43c7 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
@@ -289,25 +289,41 @@ public class RMEndpoint {
     }
     
     /**
-     * Handle message acknowledgement for source sequence. This generates a notification
of the acknowledgement if JMX
-     * is being used.
+     * Handle message accepted for source sequence. This generates a callback if a receiver
is set on the message.
+     * @param ssid
+     * @param number
+     * @param msg
+     */
+    public void handleAccept(String ssid, long number, Message msg) {
+        Object value = msg.get(RMMessageConstants.RM_CLIENT_CALLBACK);
+        if (value instanceof MessageCallback) {
+            ((MessageCallback)value).messageAccepted(ssid, number);
+        }
+    }
+    
+    /**
+     * Handle message acknowledgment for source sequence. This generates a notification of
the acknowledgment if JMX
+     * is being used, and also generates a callback if a receiver is set on the message.
      * 
      * @param ssid
      * @param number
+     * @param msg
      */
-    public void handleAcknowledgement(String ssid, long number) {
+    public void handleAcknowledgment(String ssid, long number, Message msg) {
         if (modelMBean != null) {
             int seq = acknowledgementSequence.incrementAndGet();
             try {
                 modelMBean.sendNotification(new AcknowledgementNotification(this, seq, ssid,
number));
             } catch (RuntimeOperationsException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
+                LOG.log(Level.WARNING, "Error handling JMX notification", e);
             } catch (MBeanException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
+                LOG.log(Level.WARNING, "Error handling JMX notification", e);
             }
         }
+        Object value = msg.get(RMMessageConstants.RM_CLIENT_CALLBACK);
+        if (value instanceof MessageCallback) {
+            ((MessageCallback)value).messageAcknowledged(ssid, number);
+        }
     }
 
     void initialise(RMConfiguration config, Conduit c, EndpointReferenceType r,

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
index d24a6e9..8cf5af9 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMMessageConstants.java
@@ -45,6 +45,9 @@ public final class RMMessageConstants {
     /** Boolean property TRUE for a chain used only to capture (not send) a message. */
     public static final String MESSAGE_CAPTURE_CHAIN = "org.apache.cxf.rm.captureOnly";
     
+    /** Client callback (must be instance of {@link MessageCallback}). */
+    public static final String RM_CLIENT_CALLBACK = "org.apache.cxf.rm.clientCallback";
+    
     static final String RM_PROTOCOL_VARIATION = "org.apache.cxf.ws.rm.protocol";
 
     // keep this constant in the ws-rm package until it finds a general use outside of ws-rm

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
index 84da83c..6f0630e 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
@@ -47,20 +47,23 @@ public interface RetransmissionQueue {
     boolean isEmpty();
     
     /**
-     * Accepts a new message for possible future retransmission. 
+     * Accepts a new message for possible future retransmission. Implementations must call
the
+     * RMEndpoint.handleAccepted() method for each accepted message.
+     * 
      * @param message the message context.
      */
     void addUnacknowledged(Message message);
     
     /**
-     * Purge all candidates for the given sequence that have been acknowledged.
+     * Purge all candidates for the given sequence that have been acknowledged. Implementations
must call the
+     * RMEndpoint.handleAcknowledgment() method for each acknowledged message.
      * 
      * @param seq the sequence object.
      */
     void purgeAcknowledged(SourceSequence seq);
     
     /**
-     * Purge all candiates for the given sequence.
+     * Purge all candidates for the given sequence.
      * 
      * @param seq the sequence object
      */

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
index 75269e9..48281c4 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
@@ -84,6 +84,7 @@ import org.apache.cxf.ws.rm.RMCaptureOutInterceptor;
 import org.apache.cxf.ws.rm.RMConfiguration;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMEndpoint;
+import org.apache.cxf.ws.rm.RMException;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
@@ -172,6 +173,8 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
     
     private void purgeCandidates(SourceSequence seq, boolean any) {
         Collection<Long> purged = new ArrayList<Long>();
+        Collection<ResendCandidate> resends = new ArrayList<ResendCandidate>();
+        Identifier sid = seq.getIdentifier();
         synchronized (this) {
             LOG.fine("Start purging resend candidates.");
             List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
@@ -184,10 +187,11 @@ public class RetransmissionQueueImpl implements RetransmissionQueue
{
                         candidate.resolved();
                         unacknowledgedCount--;
                         purged.add(m);
+                        resends.add(candidate);
                     }
                 }
                 if (sequenceCandidates.isEmpty()) {
-                    candidates.remove(seq.getIdentifier().getValue());
+                    candidates.remove(sid.getValue());
                 }
             }
             LOG.fine("Completed purging resend candidates.");
@@ -195,11 +199,11 @@ public class RetransmissionQueueImpl implements RetransmissionQueue
{
         if (purged.size() > 0) {
             RMStore store = manager.getStore();
             if (null != store) {
-                store.removeMessages(seq.getIdentifier(), purged, true);
+                store.removeMessages(sid, purged, true);
             }
-            for (Long number : purged) {
-                RMEndpoint rmEndpoint = seq.getSource().getReliableEndpoint();
-                rmEndpoint.handleAcknowledgement(seq.getIdentifier().getValue(), number);
+            RMEndpoint rmEndpoint = seq.getSource().getReliableEndpoint();
+            for (ResendCandidate resend: resends) {
+                rmEndpoint.handleAcknowledgment(sid.getValue(), resend.getNumber(), resend.getMessage());
             }
         }
     }
@@ -348,6 +352,12 @@ public class RetransmissionQueueImpl implements RetransmissionQueue {
             unacknowledgedCount++;
         }
         LOG.fine("Cached unacknowledged message.");
+        try {
+            RMEndpoint rme = manager.getReliableEndpoint(message);
+            rme.handleAccept(key, st.getMessageNumber(), message);
+        } catch (RMException e) {
+            LOG.log(Level.WARNING, "Could not find reliable endpoint for message");
+        }
         return candidate;
     }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/MessageCountingCallbackTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/MessageCountingCallbackTest.java
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/MessageCountingCallbackTest.java
new file mode 100644
index 0000000..1f3e122
--- /dev/null
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/MessageCountingCallbackTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MessageCountingCallbackTest {
+
+    @Test
+    public void test() {
+        final MessageCountingCallback ccb = new MessageCountingCallback();
+        ccb.messageAccepted("123", 1);
+        assertFalse(ccb.waitComplete(1));
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                assertTrue(ccb.waitComplete(1000));
+            }
+        });
+        try {
+            Thread.sleep(20);
+        } catch (InterruptedException e) { /* ignore */ }
+        ccb.messageAcknowledged("123", 1);
+        try {
+            thread.join(100);
+        } catch (InterruptedException e) {
+            fail("Thread did not complete");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
index ab1b12f..1c5b0e5 100644
--- a/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
+++ b/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
@@ -29,6 +29,7 @@ import org.apache.cxf.binding.soap.SoapMessage;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.rm.RMConfiguration;
 import org.apache.cxf.ws.rm.RMEndpoint;
+import org.apache.cxf.ws.rm.RMException;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
@@ -51,11 +52,13 @@ import org.junit.Test;
  * Test resend logic.
  */
 public class RetransmissionQueueImplTest extends Assert {
-    private static final Long ONE = new Long(1);
-    private static final Long TEN = new Long(10);
+    private static final Long ONE = Long.valueOf(1);
+    private static final Long TWO = Long.valueOf(2);
+    private static final Long TEN = Long.valueOf(10);
 
     private IMocksControl control;
     private RMManager manager;
+    private RMEndpoint endpoint;
     private Executor executor;
     private RetransmissionQueueImpl queue;
     private TestResender resender;
@@ -66,9 +69,11 @@ public class RetransmissionQueueImplTest extends Assert {
     private List<Object> mocks = new ArrayList<Object>();
     
     @Before
-    public void setUp() {
+    public void setUp() throws RMException {
         control = EasyMock.createNiceControl();
         manager = createMock(RMManager.class);
+        endpoint = createMock(RMEndpoint.class);
+        EasyMock.expect(manager.getReliableEndpoint(EasyMock.anyObject(Message.class))).andReturn(endpoint).anyTimes();
         queue = new RetransmissionQueueImpl(manager);
         resender = new TestResender();
         queue.replaceResender(resender);
@@ -160,14 +165,21 @@ public class RetransmissionQueueImplTest extends Assert {
     
     @Test
     public void testCacheUnacknowledged() {
-        SoapMessage message1 = setUpMessage("sequence1");
-        SoapMessage message2 = setUpMessage("sequence2");
-        SoapMessage message3 = setUpMessage("sequence1");
+        SoapMessage message1 = setUpMessage("sequence1", ONE);
+        SoapMessage message2 = setUpMessage("sequence2", ONE);
+        SoapMessage message3 = setUpMessage("sequence1", TWO);
         
         setupMessagePolicies(message1);
         setupMessagePolicies(message2);
         setupMessagePolicies(message3);
         
+        endpoint.handleAccept("sequence1", 1, message1);
+        EasyMock.expectLastCall();
+        endpoint.handleAccept("sequence2", 1, message2);
+        EasyMock.expectLastCall();
+        endpoint.handleAccept("sequence1", 2, message3);
+        EasyMock.expectLastCall();
+        
         ready(false);
         
         assertNotNull("expected resend candidate",
@@ -220,6 +232,9 @@ public class RetransmissionQueueImplTest extends Assert {
         setupMessagePolicies(message1);        
         SoapMessage message2 = setUpMessage("sequence1", messageNumbers[1]);
         setupMessagePolicies(message2);
+        
+        endpoint.handleAcknowledgment("sequence1", TEN, message1);
+        EasyMock.expectLastCall();
         ready(false);
         
         sequenceList.add(queue.createResendCandidate(message1));
@@ -274,6 +289,11 @@ public class RetransmissionQueueImplTest extends Assert {
         setupMessagePolicies(message1);
         SoapMessage message2 = setUpMessage("sequence1", messageNumbers[1]);
         setupMessagePolicies(message2);
+        
+        endpoint.handleAcknowledgment("sequence1", TEN, message1);
+        EasyMock.expectLastCall();
+        endpoint.handleAcknowledgment("sequence1", ONE, message2);
+        EasyMock.expectLastCall();
         ready(false);
 
         sequenceList.add(queue.createResendCandidate(message1));
@@ -336,10 +356,6 @@ public class RetransmissionQueueImplTest extends Assert {
         control.replay();
         queue.start();
     }
-    
-    private SoapMessage setUpMessage(String sid) {
-        return setUpMessage(sid, null);
-    }
 
     private SoapMessage setUpMessage(String sid, Long messageNumber) {
         return setUpMessage(sid, messageNumber, true);
@@ -391,16 +407,12 @@ public class RetransmissionQueueImplTest extends Assert {
             EasyMock.expectLastCall().andReturn(sequence);
         }
         if (messageNumber != null) {
-            sequence.getMessageNumber();
-            EasyMock.expectLastCall().andReturn(messageNumber);
-        } else {
-            Identifier id = createMock(Identifier.class);
-            sequence.getIdentifier();
-            EasyMock.expectLastCall().andReturn(id);
-            id.getValue();
-            EasyMock.expectLastCall().andReturn(sid);
-            identifiers.add(id);
+            EasyMock.expect(sequence.getMessageNumber()).andReturn(messageNumber).anyTimes();
         }
+        Identifier id = createMock(Identifier.class);
+        EasyMock.expect(sequence.getIdentifier()).andReturn(id).anyTimes();
+        EasyMock.expect(id.getValue()).andReturn(sid).anyTimes();
+        identifiers.add(id);
         sequences.add(sequence);
         return sequence;
     }
@@ -416,9 +428,8 @@ public class RetransmissionQueueImplTest extends Assert {
         Source source = createMock(Source.class);
         sequence.getSource();
         EasyMock.expectLastCall().andReturn(source).anyTimes();
-        RMEndpoint rme = createMock(RMEndpoint.class);
         source.getReliableEndpoint();
-        EasyMock.expectLastCall().andReturn(rme).anyTimes();
+        EasyMock.expectLastCall().andReturn(endpoint).anyTimes();
         boolean includesAcked = false;
         for (int i = 0; isAcked != null && i < isAcked.length; i++) {
             sequence.isAcknowledged(messageNumbers[i]);

http://git-wip-us.apache.org/repos/asf/cxf/blob/c03492dc/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageCallbackOnewayTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageCallbackOnewayTest.java
b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageCallbackOnewayTest.java
new file mode 100644
index 0000000..4643026
--- /dev/null
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/MessageCallbackOnewayTest.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.ws.rm;
+
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+import javax.jws.WebService;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Provider;
+import javax.xml.ws.Service.Mode;
+import javax.xml.ws.ServiceMode;
+import javax.xml.xpath.XPathConstants;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.GreeterService;
+import org.apache.cxf.helpers.XPathUtils;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.systest.ws.util.ConnectionHelper;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.ws.rm.MessageCallback;
+import org.apache.cxf.ws.rm.RMManager;
+import org.apache.cxf.ws.rm.RMMessageConstants;
+
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests the operation of MessageCallback for one-way messages to the server.
+ */
+public class MessageCallbackOnewayTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = allocatePort(MessageCallbackOnewayTest.class);
+    private static final String GREETER_ADDRESS 
+        = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+
+    private static final Logger LOG = LogUtils.getLogger(MessageCallbackOnewayTest.class);
+
+    private Bus serverBus;
+    private Endpoint endpoint;
+    private Bus greeterBus;
+    private Greeter greeter;
+    private RecordingMessageCallback callback;
+    
+    @After
+    public void tearDown() throws Exception {
+        try {
+            stopClient();
+        } catch (Throwable t) {
+            //ignore
+        }
+        try {
+            stopServer();
+        } catch (Throwable t) {
+            //ignore
+        }
+        Thread.sleep(100);
+    }
+    
+    private void verifyCallback(int index, boolean accept, long mnum) {
+        Callback cb = callback.getCallbacks().get(index);
+        assertEquals(accept, cb.isAccept());
+        assertEquals(mnum, cb.getMsgNumber());
+    }
+
+    @Test    
+    public void testAtLeastOnce() throws Exception {
+        testOnewayAtLeastOnce(null);
+    }
+    
+    @Test    
+    public void testAtLeastOnceAsyncExecutor() throws Exception {
+        testOnewayAtLeastOnce(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayAtLeastOnce(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/atleastonce.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getConfiguration().setBaseRetransmissionInterval(new Long(2000));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        callback.waitDone(6, 3000, 60000);
+        verifyCallback(0, true, 1);
+        verifyCallback(1, true, 2);
+        verifyCallback(2, true, 3);
+        verifyCallback(3, false, 3);
+        verifyCallback(4, false, 1);
+        verifyCallback(5, true, 4);
+    }
+
+    @Test    
+    public void testAtMostOnce() throws Exception {
+        testOnewayAtMostOnce(null);
+    }
+    
+    @Test    
+    public void testAtMostOnceAsyncExecutor() throws Exception {
+        testOnewayAtMostOnce(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayAtMostOnce(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/atmostonce.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getConfiguration().setBaseRetransmissionInterval(new Long(2000));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        
+        callback.waitDone(6, 3000, 60000);
+        verifyCallback(0, true, 1);
+        verifyCallback(1, true, 2);
+        verifyCallback(2, true, 3);
+        verifyCallback(3, false, 3);
+        verifyCallback(4, false, 1);
+        verifyCallback(5, true, 4);
+    }
+
+    @Test    
+    public void testExactlyOnce() throws Exception {
+        testOnewayExactlyOnce(null);
+    }
+    
+    @Test    
+    public void testExactlyOnceAsyncExecutor() throws Exception {
+        testOnewayExactlyOnce(Executors.newSingleThreadExecutor());
+    } 
+
+    private void testOnewayExactlyOnce(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/exactlyonce.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getConfiguration().setBaseRetransmissionInterval(new Long(2000));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        
+        callback.waitDone(6, 3000, 60000);
+        verifyCallback(0, true, 1);
+        verifyCallback(1, true, 2);
+        verifyCallback(2, true, 3);
+        verifyCallback(3, false, 3);
+        verifyCallback(4, false, 1);
+        verifyCallback(5, true, 4);
+    }
+
+    @Test    
+    public void testExactlyOnceInOrder() throws Exception {
+        testOnewayExactlyOnceInOrder(null);
+    }
+    
+    @Test    
+    public void testExactlyOnceInOrderAsyncExecutor() throws Exception {
+        testOnewayExactlyOnceInOrder(Executors.newSingleThreadExecutor());
+    }
+
+    private void testOnewayExactlyOnceInOrder(Executor executor) throws Exception {
+        init("org/apache/cxf/systest/ws/rm/exactlyonce-inorder.xml", executor);
+        
+        greeterBus.getOutInterceptors().add(new MessageLossSimulator());
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        manager.getConfiguration().setBaseRetransmissionInterval(new Long(2000));
+        String[] callArgs = new String[] {"one", "two", "three", "four"};
+        for (int i = 0; i < callArgs.length; i++) {
+            greeter.greetMeOneWay(callArgs[i]);
+        }
+        
+        callback.waitDone(8, 3000, 60000);
+        verifyCallback(0, true, 1);
+        verifyCallback(1, true, 2);
+        verifyCallback(2, true, 3);
+        verifyCallback(3, false, 2);
+        verifyCallback(4, false, 1);
+        verifyCallback(5, true, 4);
+        verifyCallback(6, false, 4);
+        verifyCallback(7, false, 3);
+    }
+
+    // --- test utilities ---
+
+    private void init(String cfgResource, Executor executor) {
+        SpringBusFactory bf = new SpringBusFactory();
+        initServer(bf, cfgResource);
+        initGreeterBus(bf, cfgResource);
+        initProxy(executor);
+    }
+    
+    private void initServer(SpringBusFactory bf, String cfgResource) {
+        String derbyHome = System.getProperty("derby.system.home"); 
+        try {
+            synchronized (GreeterProvider.CALL_ARGS) {
+                GreeterProvider.CALL_ARGS.clear();
+            }
+            System.setProperty("derby.system.home", derbyHome + "-server");   
+            serverBus = bf.createBus(cfgResource);
+            BusFactory.setDefaultBus(serverBus);
+            LOG.info("Initialised bus " + serverBus + " with cfg file resource: " + cfgResource);
+            LOG.info("serverBus inInterceptors: " + serverBus.getInInterceptors());
+            endpoint = Endpoint.publish(GREETER_ADDRESS, new GreeterProvider());
+        } finally {
+            if (derbyHome != null) {
+                System.setProperty("derby.system.home", derbyHome);
+            } else {
+                System.clearProperty("derby.system.home");
+            }
+        }
+    }
+    
+    private void initGreeterBus(SpringBusFactory bf,
+                                String cfgResource) {
+        greeterBus = bf.createBus(cfgResource);
+        BusFactory.setDefaultBus(greeterBus);
+        LOG.fine("Initialised greeter bus with configuration: " + cfgResource);
+    }
+
+    private void initProxy(Executor executor) {        
+        GreeterService gs = new GreeterService();
+
+        if (null != executor) {
+            gs.setExecutor(executor);
+        }
+   
+        greeter = gs.getGreeterPort();
+        try {
+            updateAddressPort(greeter, PORT);
+        } catch (Exception e) {
+            //ignore
+        }
+        LOG.fine("Created greeter client.");
+
+        ConnectionHelper.setKeepAliveConnection(greeter, false);
+        
+        callback = new RecordingMessageCallback();
+        ((BindingProvider)greeter).getRequestContext().put(RMMessageConstants.RM_CLIENT_CALLBACK,
callback);
+    }
+    
+    private void stopClient() {
+        if (null != greeterBus) {
+            
+            //ensure we close the decoupled destination of the conduit,
+            //so that release the port if the destination reference count hit zero
+            if (greeter != null) {
+                ClientProxy.getClient(greeter).getConduit().close();
+            }
+            greeterBus.shutdown(true);
+            greeter = null;
+            greeterBus = null;
+        }
+    }
+    
+    private void stopServer() {
+        if (null != endpoint) {
+            LOG.info("Stopping Greeter endpoint");
+            endpoint.stop();
+        } else {
+            LOG.info("No endpoint active.");
+        }
+        endpoint = null;
+        if (null != serverBus) {
+            serverBus.shutdown(true);
+            serverBus = null;
+        }
+    }
+
+    @WebService(serviceName = "GreeterService",
+                portName = "GreeterPort",
+                targetNamespace = "http://cxf.apache.org/greeter_control",
+                wsdlLocation = "/wsdl/greeter_control.wsdl")
+    @ServiceMode(Mode.PAYLOAD)
+    public static class GreeterProvider implements Provider<Source> {
+        
+        public static final List<String> CALL_ARGS = new ArrayList<String>();
+
+        public Source invoke(Source obj) {
+
+            Node el;
+            try {
+                el = StaxUtils.read(obj);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            if (el instanceof Document) {
+                el = ((Document)el).getDocumentElement();
+            }
+            
+            Map<String, String> ns = new HashMap<String, String>();
+            ns.put("ns", "http://cxf.apache.org/greeter_control/types");
+            XPathUtils xp = new XPathUtils(ns);
+            String s = (String)xp.getValue("/ns:greetMe/ns:requestType",
+                                           el,
+                                           XPathConstants.STRING);
+
+            if (s == null || "".equals(s)) {
+                s = (String)xp.getValue("/ns:greetMeOneWay/ns:requestType",
+                                        el,
+                                        XPathConstants.STRING);
+                synchronized (CALL_ARGS) {
+                    CALL_ARGS.add(s);
+                }
+                return null;
+            } else {
+                synchronized (CALL_ARGS) {
+                    CALL_ARGS.add(s);
+                }
+                String resp =
+                    "<greetMeResponse "
+                        + "xmlns=\"http://cxf.apache.org/greeter_control/types\">"
+                        + "<responseType>" + s.toUpperCase() + "</responseType>"
+                    + "</greetMeResponse>";
+                return new StreamSource(new StringReader(resp));
+            }
+        }
+    }
+    
+    private static class RecordingMessageCallback implements MessageCallback {
+        
+        private List<Callback> callbacks = new ArrayList<Callback>();
+        
+        @Override
+        public void messageAccepted(String seqId, long msgNum) {
+            synchronized (callbacks) {
+                callbacks.add(new Callback(true, msgNum));
+                callbacks.notifyAll();
+            }
+        }
+
+        @Override
+        public void messageAcknowledged(String seqId, long msgNum) {
+            synchronized (callbacks) {
+                callbacks.add(new Callback(false, msgNum));
+                callbacks.notifyAll();
+            }
+        }
+        
+        public List<Callback> getCallbacks() {
+            return callbacks;
+        }
+        
+        /**
+         * Wait for expected number of callbacks.
+         * 
+         * @param count expected number of callbacks
+         * @param delay extra time to wait after expected number received (in case more are
coming)
+         * @param timeout maximum time to wait, in milliseconds
+         */
+        public void waitDone(int count, int delay, long timeout) {
+            long start = System.currentTimeMillis();
+            synchronized (callbacks) {
+                while (callbacks.size() < count) {
+                    long remain = start + timeout - System.currentTimeMillis();
+                    if (remain <= 0) {
+                        fail("Expected " + count + " callbacks, only got " + callbacks.size());

+                    }
+                    try {
+                        callbacks.wait(remain);
+                    } catch (InterruptedException e) { /* ignored */ }
+                }
+                try {
+                    callbacks.wait((long)delay);
+                } catch (InterruptedException e) { /* ignored */ }
+                if (callbacks.size() > count) {
+                    fail("Expected " + count + " callbacks, got " + callbacks.size()); 
+                }
+            }
+        }
+    }
+    
+    private static class Callback {
+        private final boolean accept;
+        private final long msgNumber;
+        
+        public Callback(boolean acc, long msgNum) {
+            accept = acc;
+            msgNumber = msgNum;
+        }
+
+        public boolean isAccept() {
+            return accept;
+        }
+
+        public long getMsgNumber() {
+            return msgNumber;
+        }
+    }
+}


Mime
View raw message