cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsosno...@apache.org
Subject svn commit: r1522516 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/ systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/
Date Thu, 12 Sep 2013 10:35:50 GMT
Author: dsosnoski
Date: Thu Sep 12 10:35:50 2013
New Revision: 1522516

URL: http://svn.apache.org/r1522516
Log:
CXF-2118 Add support and test for source sequence termination via JMX
call
CXF-5274 Add support and test for source sequence message acknowledgment
notifications via JMX

Added:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AcknowledgementNotification.java
Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
    cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java

Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AcknowledgementNotification.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AcknowledgementNotification.java?rev=1522516&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AcknowledgementNotification.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AcknowledgementNotification.java Thu Sep 12 10:35:50 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import javax.management.Notification;
+
+/**
+ * Notification of a message acknowledgment for a source sequence.
+ */
+public class AcknowledgementNotification extends Notification {
+    private static final long serialVersionUID = 7809325584426123035L;
+    private final String sequenceId;
+    private final long messageNumber;
+    
+    public AcknowledgementNotification(Object source, long seq, String sid, long msgnum) {
+        super(ManagedRMEndpoint.ACKNOWLEDGEMENT_NOTIFICATION, source, seq);
+        sequenceId = sid;
+        messageNumber = msgnum;
+    }
+
+    public String getSequenceId() {
+        return sequenceId;
+    }
+
+    public long getMessageNumber() {
+        return messageNumber;
+    }
+}
\ No newline at end of file

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1522516&r1=1522515&r2=1522516&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Thu Sep 12 10:35:50 2013
@@ -24,10 +24,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
-import javax.management.AttributeChangeNotification;
 import javax.management.JMException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -38,6 +35,8 @@ import javax.management.openmbean.Simple
 
 import org.apache.cxf.management.ManagedComponent;
 import org.apache.cxf.management.annotation.ManagedAttribute;
+import org.apache.cxf.management.annotation.ManagedNotification;
+import org.apache.cxf.management.annotation.ManagedNotifications;
 import org.apache.cxf.management.annotation.ManagedOperation;
 import org.apache.cxf.management.annotation.ManagedOperationParameter;
 import org.apache.cxf.management.annotation.ManagedOperationParameters;
@@ -52,8 +51,12 @@ import org.apache.cxf.ws.rm.v200702.Sequ
  *
  */
 @ManagedResource(componentName = "RMEndpoint", 
-                 description = "Responsible for Sources and Destinations.")
-public class ManagedRMEndpoint extends NotificationBroadcasterSupport implements ManagedComponent {
+    description = "Responsible for Sources and Destinations.")
+@ManagedNotifications({@ManagedNotification(name = "org.apache.ws.rm.acknowledgement",
+    notificationTypes = {"org.apache.cxf.ws.rm.AcknowledgementNotification" }) })
+public class ManagedRMEndpoint implements ManagedComponent {
+
+    public static final String ACKNOWLEDGEMENT_NOTIFICATION = "org.apache.ws.rm.acknowledgement";
 
     private static final String[] SOURCE_SEQUENCE_NAMES = 
     {"sequenceId", "currentMessageNumber", "expires", "lastMessage", "queuedMessageCount",

@@ -88,7 +91,6 @@ public class ManagedRMEndpoint extends N
     private static CompositeType retryStatusType;
 
     private RMEndpoint endpoint;
-    
 
     static {
         try {
@@ -510,9 +512,27 @@ public class ManagedRMEndpoint extends N
         }
     }
 
+    @ManagedOperation(description = "Terminate Source Sequence")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")

+    })
+    public void terminateSourceSequence(String sid) throws JMException {
+        SourceSequence ss = getSourceSeq(sid);
+        if (null == ss) {
+            throw new JMException("no source sequence");
+        }
+        Proxy proxy = endpoint.getProxy();
+        try {
+            proxy.terminate(ss);
+            ss.getSource().removeSequence(ss);
+        } catch (RMException e) {
+            throw new JMException("Error terminating sequence: " + e.getMessage());
+        }
+    }
+
     @ManagedOperation(description = "Terminate Destination Sequence")
     @ManagedOperationParameters({
-        @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier")

+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")

     })
     public void terminateDestinationSequence(String sid) throws JMException {
         DestinationSequence ds = getDestinationSeq(sid);
@@ -684,15 +704,4 @@ public class ManagedRMEndpoint extends N
     public int getCompletedDestinationSequenceCount() {
         return endpoint.getCompletedDestinationSequenceCount();
     }
-    
-    @Override
-    public MBeanNotificationInfo[] getNotificationInfo() {
-        String[] types = new String[] {
-            AttributeChangeNotification.ATTRIBUTE_CHANGE
-        };
-        String name = AttributeChangeNotification.class.getName();
-        String description = "Message acknowledged";
-        MBeanNotificationInfo info =  new MBeanNotificationInfo(types, name, description);
-        return new MBeanNotificationInfo[] {info};
-    }
 }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=1522516&r1=1522515&r2=1522516&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Thu Sep 12 10:35:50 2013
@@ -28,6 +28,13 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.RuntimeOperationsException;
+import javax.management.modelmbean.InvalidTargetObjectTypeException;
+import javax.management.modelmbean.ModelMBeanInfo;
+import javax.management.modelmbean.RequiredModelMBean;
 import javax.wsdl.extensions.ExtensibilityElement;
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
@@ -47,6 +54,7 @@ import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.jaxb.JAXBDataBinding;
 import org.apache.cxf.management.InstrumentationManager;
+import org.apache.cxf.management.jmx.export.runtime.ModelMBeanAssembler;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.factory.ServiceConstructionException;
@@ -106,8 +114,10 @@ public class RMEndpoint {
     private AtomicInteger applicationMessageCount;
     private AtomicInteger controlMessageCount;
     private InstrumentationManager instrumentationManager;
-    private ManagedRMEndpoint managedEndpoint;
     private RMConfiguration configuration;
+    private ManagedRMEndpoint managedEndpoint;
+    private RequiredModelMBean modelMBean;
+    private AtomicInteger acknowledgementSequence;
     
     /**
      * Constructor.
@@ -127,6 +137,7 @@ public class RMEndpoint {
         endpoints = new HashMap<ProtocolVariation, Endpoint>();
         applicationMessageCount = new AtomicInteger();
         controlMessageCount = new AtomicInteger();
+        acknowledgementSequence = new AtomicInteger();
     }
 
     /**
@@ -277,6 +288,28 @@ public class RMEndpoint {
     EndpointReferenceType getReplyTo() {
         return replyTo;
     }
+    
+    /**
+     * Handle message acknowledgement for source sequence. This generates a notification of the acknowledgement if JMX
+     * is being used.
+     * 
+     * @param ssid
+     * @param number
+     */
+    public void handleAcknowledgement(String ssid, long number) {
+        if (modelMBean != null) {
+            int seq = acknowledgementSequence.incrementAndGet();
+            try {
+                modelMBean.sendNotification(new AcknowledgementNotification(this, seq, ssid, number));
+            } catch (RuntimeOperationsException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            } catch (MBeanException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+    }
 
     void initialise(RMConfiguration config, Conduit c, EndpointReferenceType r,
         org.apache.cxf.transport.Destination d) {
@@ -290,10 +323,27 @@ public class RMEndpoint {
             managedEndpoint = new ManagedRMEndpoint(this);
             instrumentationManager = manager.getBus().getExtension(InstrumentationManager.class);
       
             if (instrumentationManager != null) {   
-                try {
-                    instrumentationManager.register(managedEndpoint);
-                } catch (JMException jmex) {
-                    LOG.log(Level.WARNING, "Registering ManagedRMEndpoint failed.", jmex);
+                ModelMBeanAssembler assembler = new ModelMBeanAssembler();
+                ModelMBeanInfo mbi = assembler.getModelMbeanInfo(managedEndpoint.getClass());
+                MBeanServer mbs = instrumentationManager.getMBeanServer();
+                if (mbs == null) {
+                    LOG.log(Level.WARNING, "MBeanServer not available.");
+                } else {
+                    try {
+                        RequiredModelMBean rtMBean = 
+                            (RequiredModelMBean)mbs.instantiate("javax.management.modelmbean.RequiredModelMBean");
+                        rtMBean.setModelMBeanInfo(mbi);
+                        try {
+                            rtMBean.setManagedResource(managedEndpoint, "ObjectReference");
+                        } catch (InvalidTargetObjectTypeException itotex) {
+                            throw new JMException(itotex.getMessage());
+                        }
+                        ObjectName name = managedEndpoint.getObjectName();
+                        instrumentationManager.register(rtMBean, name);
+                        modelMBean = rtMBean;
+                    } catch (JMException jmex) {
+                        LOG.log(Level.WARNING, "Registering ManagedRMEndpoint failed.", jmex);
+                    }
                 }
             }
         }

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1522516&r1=1522515&r2=1522516&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu Sep 12 10:35:50 2013
@@ -58,6 +58,7 @@ import org.apache.cxf.ws.policy.builder.
 import org.apache.cxf.ws.rm.ProtocolVariation;
 import org.apache.cxf.ws.rm.RMConfiguration;
 import org.apache.cxf.ws.rm.RMContextUtils;
+import org.apache.cxf.ws.rm.RMEndpoint;
 import org.apache.cxf.ws.rm.RMException;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
@@ -172,6 +173,10 @@ public class RetransmissionQueueImpl imp
             if (null != store) {
                 store.removeMessages(seq.getIdentifier(), purged, true);
             }
+            for (Long number : purged) {
+                RMEndpoint rmEndpoint = seq.getSource().getReliableEndpoint();
+                rmEndpoint.handleAcknowledgement(seq.getIdentifier().getValue(), number);
+            }
         }
     }
 

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?rev=1522516&r1=1522515&r2=1522516&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Thu Sep 12 10:35:50 2013
@@ -27,9 +27,11 @@ import java.util.concurrent.Executor;
 
 import org.apache.cxf.message.Message;
 import org.apache.cxf.ws.rm.RMConfiguration;
+import org.apache.cxf.ws.rm.RMEndpoint;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMMessageConstants;
 import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.Source;
 import org.apache.cxf.ws.rm.SourceSequence;
 import org.apache.cxf.ws.rm.manager.RetryPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
@@ -421,10 +423,16 @@ public class RetransmissionQueueImplTest
         SourceSequence sequence = createMock(SourceSequence.class);
         Identifier id = createMock(Identifier.class);
         sequence.getIdentifier();
-        EasyMock.expectLastCall().andReturn(id);
+        EasyMock.expectLastCall().andReturn(id).anyTimes();
         id.getValue();
-        EasyMock.expectLastCall().andReturn(sid);
+        EasyMock.expectLastCall().andReturn(sid).anyTimes();
         identifiers.add(id);
+        Source source = createMock(Source.class);
+        sequence.getSource();
+        EasyMock.expectLastCall().andReturn(source).anyTimes();
+        RMEndpoint rme = createMock(RMEndpoint.class);
+        source.getReliableEndpoint();
+        EasyMock.expectLastCall().andReturn(rme).anyTimes();
         boolean includesAcked = false;
         for (int i = 0; isAcked != null && i < isAcked.length; i++) {
             sequence.isAcknowledged(messageNumbers[i]);
@@ -434,16 +442,6 @@ public class RetransmissionQueueImplTest
             }
         }
         if (includesAcked) {
-            // Will be called once or twice depending on whether any more
-            // unacknowledged messages are left for this sequence
-            sequence.getIdentifier();
-            EasyMock.expectLastCall().andReturn(id).times(1, 2);
-
-            // Would be called only when there are no more
-            // unacknowledged messages left for this sequence
-            id.getValue();
-            EasyMock.expectLastCall().andReturn(sid).times(0, 1);
-
             RMStore store = createMock(RMStore.class);
             manager.getStore();
             EasyMock.expectLastCall().andReturn(store);

Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java?rev=1522516&r1=1522515&r2=1522516&view=diff
==============================================================================
--- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java (original)
+++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java Thu Sep 12 10:35:50 2013
@@ -24,8 +24,13 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.logging.Logger;
 
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
 import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationListener;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
 import javax.management.openmbean.CompositeData;
 import javax.xml.ws.Endpoint;
 
@@ -39,6 +44,7 @@ import org.apache.cxf.greeter_control.Gr
 import org.apache.cxf.management.InstrumentationManager;
 import org.apache.cxf.management.ManagementConstants;
 import org.apache.cxf.testutil.common.AbstractClientServerTestBase;
+import org.apache.cxf.ws.rm.AcknowledgementNotification;
 import org.apache.cxf.ws.rm.RM11Constants;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMUtils;
@@ -145,10 +151,68 @@ public class ManagedEndpointsTest extend
         // we need to find out serverEndpointName by using the query name
         ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
         
+        String sseqId = getSingleSourceSequenceId(mbs, clientEndpointName);
+        
+        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
+        assertTrue("Expected sequence identifier", o instanceof String && sseqId.equals(o));
+        
+        o = mbs.invoke(serverEndpointName, "getDestinationSequenceIds", null, null);
+        verifyArray("Expected sequence identifier", o, new String[]{sseqId}, false); 
+        
+        String dseqId = getSingleDestinationSequenceId(mbs, clientEndpointName);
+
+        testOperation(mbs, greeter, clientEndpointName, serverEndpointName, sseqId, dseqId);
+        
+        mbs.invoke(clientEndpointName, "terminateSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
         o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
-        String sseqId = ((String[])o)[0];
+            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("Source sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+        
+        mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", 
+            new Object[]{}, EMPTY_SIGNATURE);
+        assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+        
+    }
+
+    @Test
+    public void testManagedEndpointsOneway12() throws Exception {
+        prepareClient();
+        
+        RMManager clientManager = clientBus.getExtension(RMManager.class);
+        RMManager serverManager = serverBus.getExtension(RMManager.class);
+        
+        InstrumentationManager serverIM = serverBus.getExtension(InstrumentationManager.class);
+        MBeanServer mbs = serverIM.getMBeanServer();
+        assertNotNull("MBeanServer must be available.", mbs);
+    
+        ObjectName clientManagerName = RMUtils.getManagedObjectName(clientManager);
+        ObjectName serverManagerName = RMUtils.getManagedObjectName(serverManager);
+    
+        Object o;
+        GreeterService gs = new GreeterService();
+        final Greeter greeter = gs.getGreeterPort();
+        updateAddressPort(greeter, ManagedEndpointsTest.PORT);
+        LOG.fine("Created greeter client.");
+    
+        ClientProxy.getClient(greeter).getRequestContext().put(RMManager.WSRM_VERSION_PROPERTY,
+            RM11Constants.NAMESPACE_URI);
+    
+        org.apache.cxf.endpoint.Endpoint ep = ClientProxy.getClient(greeter).getEndpoint();
+        String epId = RMUtils.getEndpointIdentifier(ep, clientBus);
+        greeter.greetMeOneWay("one"); // sent
+    
+        o = mbs.invoke(clientManagerName, "getEndpointIdentifiers", null, null);
+        verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
+    
+        o = mbs.invoke(serverManagerName, "getEndpointIdentifiers", null, null);
+        verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
+        
+        ObjectName clientEndpointName = RMUtils.getManagedObjectName(clientManager, ep);
+        // we need to find out serverEndpointName by using the query name
+        ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
+        
+        String sseqId = getSingleSourceSequenceId(mbs, clientEndpointName);
         
         o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
         assertTrue("Expected sequence identifier", o instanceof String && sseqId.equals(o));
@@ -156,10 +220,33 @@ public class ManagedEndpointsTest extend
         o = mbs.invoke(serverEndpointName, "getDestinationSequenceIds", null, null);
         verifyArray("Expected sequence identifier", o, new String[]{sseqId}, false); 
         
-        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", null, null);
-        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
-        String dseqId = ((String[])o)[0];
+        String dseqId = getSingleDestinationSequenceId(mbs, clientEndpointName);
+    
+        testOperation(mbs, greeter, clientEndpointName, serverEndpointName, sseqId, dseqId);
+        
+        mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
+            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        verifyArray("Expected sequence identifier", o, new String[]{sseqId}, true); 
+        
+        mbs.invoke(clientEndpointName, "terminateSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
+            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("Source sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+       
+        mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", 
+            new Object[]{}, EMPTY_SIGNATURE);
+        assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
+        
+    }
 
+    private void testOperation(MBeanServer mbs, final Greeter greeter, ObjectName clientEndpointName,
+        ObjectName serverEndpointName, String sseqId, String dseqId) throws ReflectionException,
+        InstanceNotFoundException, MBeanException, InterruptedException {
+        AcknowledgementListener listener = new AcknowledgementListener();
+        mbs.addNotificationListener(clientEndpointName, listener, null, null);
+        Object o;
         o = mbs.invoke(serverEndpointName, "getSourceSequenceIds", 
                        new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         verifyArray("Expected sequence identifier", o, new String[]{dseqId}, false); 
@@ -198,6 +285,7 @@ public class ManagedEndpointsTest extend
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
                        new Object[]{sseqId}, ONESTRING_SIGNATURE);
         verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
+        assertEquals(3L, listener.lastAcknowledgement);
         
         o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
                        new Object[]{sseqId}, ONESTRING_SIGNATURE);
@@ -218,6 +306,7 @@ public class ManagedEndpointsTest extend
         o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
                        new Object[]{true}, ONEBOOLEAN_SIGNATURE);
         assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
+        assertEquals(2L, listener.lastAcknowledgement);
 
         o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
                        new Object[]{sseqId}, ONESTRING_SIGNATURE);
@@ -230,17 +319,23 @@ public class ManagedEndpointsTest extend
         o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
                        new Object[]{sseqId}, ONESTRING_SIGNATURE);
         assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
-        
-        mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
+    }
+
+    private String getSingleDestinationSequenceId(MBeanServer mbs, ObjectName clientEndpointName)
+        throws ReflectionException, InstanceNotFoundException, MBeanException {
+        Object o;
+        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", null, null);
+        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+        return ((String[])o)[0];
+    }
+
+    private String getSingleSourceSequenceId(MBeanServer mbs, ObjectName clientEndpointName)
+        throws ReflectionException, InstanceNotFoundException, MBeanException {
+        Object o;
         o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
-            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("Source sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
-        
-        mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
-        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", 
-            new Object[]{}, EMPTY_SIGNATURE);
-        assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
-        
+                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
+        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
+        return ((String[])o)[0];
     }
     
     @Test
@@ -305,140 +400,6 @@ public class ManagedEndpointsTest extend
         assertTrue("No queued messages", o instanceof Integer && 0 == ((Integer)o).intValue());
     }
     
-    @Test
-    public void testManagedEndpointsOneway12() throws Exception {
-        prepareClient();
-        
-        RMManager clientManager = clientBus.getExtension(RMManager.class);
-        RMManager serverManager = serverBus.getExtension(RMManager.class);
-        
-        InstrumentationManager serverIM = serverBus.getExtension(InstrumentationManager.class);
-        MBeanServer mbs = serverIM.getMBeanServer();
-        assertNotNull("MBeanServer must be available.", mbs);
-
-        ObjectName clientManagerName = RMUtils.getManagedObjectName(clientManager);
-        ObjectName serverManagerName = RMUtils.getManagedObjectName(serverManager);
-
-        Object o;
-        GreeterService gs = new GreeterService();
-        final Greeter greeter = gs.getGreeterPort();
-        updateAddressPort(greeter, ManagedEndpointsTest.PORT);
-        LOG.fine("Created greeter client.");
-
-        ClientProxy.getClient(greeter).getRequestContext().put(RMManager.WSRM_VERSION_PROPERTY,
-            RM11Constants.NAMESPACE_URI);
-
-        org.apache.cxf.endpoint.Endpoint ep = ClientProxy.getClient(greeter).getEndpoint();
-        String epId = RMUtils.getEndpointIdentifier(ep, clientBus);
-        greeter.greetMeOneWay("one"); // sent
-
-        o = mbs.invoke(clientManagerName, "getEndpointIdentifiers", null, null);
-        verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
-
-        o = mbs.invoke(serverManagerName, "getEndpointIdentifiers", null, null);
-        verifyArray("Expected endpoint identifier", o, new String[]{epId}, true);
-        
-        ObjectName clientEndpointName = RMUtils.getManagedObjectName(clientManager, ep);
-        // we need to find out serverEndpointName by using the query name
-        ObjectName serverEndpointName = getEndpointName(mbs, serverManager);
-        
-        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
-        String sseqId = ((String[])o)[0];
-        
-        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequenceId", null, null);
-        assertTrue("Expected sequence identifier", o instanceof String && sseqId.equals(o));
-        
-        o = mbs.invoke(serverEndpointName, "getDestinationSequenceIds", null, null);
-        verifyArray("Expected sequence identifier", o, new String[]{sseqId}, false); 
-        
-        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", null, null);
-        assertTrue("One sequence expected", o instanceof String[] && 1 == ((String[])o).length);
-        String dseqId = ((String[])o)[0];
-
-        o = mbs.invoke(serverEndpointName, "getSourceSequenceIds", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        verifyArray("Expected sequence identifier", o, new String[]{dseqId}, false); 
-        
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
-
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
-                       new Object[]{sseqId, true}, new String[]{"java.lang.String", "boolean"});
-        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
-
-        o = mbs.invoke(clientEndpointName, "getCurrentSourceSequence", null, null);
-        verifySourceSequence(o, sseqId, 1, 0);
-
-        o = mbs.invoke(clientEndpointName, "getSourceSequences", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("One sequence message", o instanceof CompositeData[] && 1 == ((CompositeData[])o).length);
-        verifySourceSequence(((CompositeData[])o)[0], sseqId, 1, 0);
-
-        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        verifyArray("Expected range", o, new Long[]{1L, 1L}, true);
-        
-        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
-        
-        greeter.greetMeOneWay("two"); // getting lost
-        greeter.greetMeOneWay("three"); // sent
-        
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
-
-        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
-        
-        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        assertTrue("One unacknowledged message", o instanceof Long[] && 1 == ((Long[])o).length);
-                
-        o = mbs.invoke(clientEndpointName, "getRetransmissionStatus", 
-                       new Object[]{sseqId, 2}, new String[]{"java.lang.String", "long"});
-        verifyRetransmissionStatus(o, 2L, 0);
-
-        o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        verifyArray("Expected range", o, new Long[]{1L, 1L, 3L, 3L}, true);
-
-        // 7 sec retry interval + 5 sec
-        LOG.info("waiting for 12 secs for the retry to complete ...");
-        Thread.sleep(12000);
-
-        o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", 
-                       new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
-
-        o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
-        
-        o = mbs.invoke(serverEndpointName, "getDestinationSequenceAcknowledgedRange", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        verifyArray("Expected range", o, new Long[]{1L, 3L}, true);
-
-        o = mbs.invoke(clientEndpointName, "getUnAcknowledgedMessageIdentifiers", 
-                       new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        assertTrue("No unacknowledged message", o instanceof Long[] && 0 == ((Long[])o).length);
-        
-        mbs.invoke(clientEndpointName, "closeSourceSequence", new Object[]{sseqId}, ONESTRING_SIGNATURE);
-        o = mbs.invoke(clientEndpointName, "getSourceSequenceIds", 
-            new Object[]{true}, ONEBOOLEAN_SIGNATURE);
-        
-        mbs.invoke(clientEndpointName, "terminateDestinationSequence", new Object[]{dseqId}, ONESTRING_SIGNATURE);
-        o = mbs.invoke(clientEndpointName, "getDestinationSequenceIds", 
-            new Object[]{}, EMPTY_SIGNATURE);
-        assertTrue("Destination sequence terminated", o instanceof String[] && 0 == ((String[])o).length);
-        
-    }
-    
     private void prepareClient() {
         checkServerReady(30000);
         
@@ -531,5 +492,18 @@ public class ManagedEndpointsTest extend
         Set<?> s = mbs.queryNames(serviceEndpointQueryName, null);
         Iterator<?> it = s.iterator();
         return (ObjectName)it.next();
-    }    
+    }  
+    
+    private class AcknowledgementListener implements NotificationListener {
+        private volatile long lastAcknowledgement;
+        
+        @Override
+        public void handleNotification(Notification notification, Object handback) {
+            if (notification instanceof AcknowledgementNotification) {
+                AcknowledgementNotification ack = (AcknowledgementNotification)notification;
+                lastAcknowledgement = ack.getMessageNumber();
+            }
+        }
+        
+    }
 }



Mime
View raw message