cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andreasm...@apache.org
Subject svn commit: r541364 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/resources/META-INF/c...
Date Thu, 24 May 2007 17:33:13 GMT
Author: andreasmyth
Date: Thu May 24 10:33:11 2007
New Revision: 541364

URL: http://svn.apache.org/viewvc?view=rev&rev=541364
Log:
[JIRA CXF-139] Client-side recovery. 
Added client lifecycle management interface to be notified of client creation so that recovery can take place before client sends any requests.
Fixed minor bug in handling TerminateSequence invocation.

Added:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java   (with props)
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java   (with props)
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java   (with props)
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMUtilsTest.java   (with props)
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/ClientPersistenceTest.java   (with props)
Removed:
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/PersistenceTest.java
Modified:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java
    incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java
    incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/oneway-client-crash.xml

Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java Thu May 24 10:33:11 2007
@@ -92,4 +92,11 @@
      * @param selector the ConduitSelector to use
      */
     void setConduitSelector(ConduitSelector selector);
+    
+    /**
+     * Indicates that the client is no longer needed and that any resources it holds
+     * can now be freed.
+     *
+     */
+    void destroy();
 }

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java?view=auto&rev=541364
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java Thu May 24 10:33:11 2007
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+public interface ClientLifeCycleListener {
+    void clientCreated(Client client);
+    void clientDestroyed(Client client); 
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleListener.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java?view=auto&rev=541364
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java (added)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java Thu May 24 10:33:11 2007
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+
+public interface ClientLifeCycleManager { 
+    void clientCreated(Client client);
+    void clientDestroyed(Client client);
+    void registerListener(ClientLifeCycleListener listener);
+    void unRegisterListener(ClientLifeCycleListener listener);
+}

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java (original)
+++ incubator/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/SoapHeaderInterceptor.java Thu May 24 10:33:11 2007
@@ -63,6 +63,10 @@
         }
 
         BindingOperationInfo bop = exchange.get(BindingOperationInfo.class);
+        if (null == bop) {
+            return;
+        }
+
         if (bop.isUnwrapped()) {
             bop = bop.getWrappedOperation();
         }

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Thu May 24 10:33:11 2007
@@ -85,6 +85,7 @@
         bus = b;
         outFaultObserver = new ClientOutFaultObserver(bus);
         getConduitSelector(sc).setEndpoint(e);
+        notifyLifecycleManager();
     }
 
     public ClientImpl(URL wsdlUrl) {
@@ -109,6 +110,24 @@
         } catch (EndpointException epex) {
             throw new IllegalStateException("Unable to create endpoint: " + epex.getMessage(), epex);
         }
+        notifyLifecycleManager();
+    }
+    
+    public void destroy() {
+        
+        // TODO: also inform the conduit so it can shutdown any response listeners
+        
+        ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
+        if (null != mgr) {
+            mgr.clientDestroyed(this);
+        }
+    }
+    
+    private void notifyLifecycleManager() {
+        ClientLifeCycleManager mgr = bus.getExtension(ClientLifeCycleManager.class);
+        if (null != mgr) {
+            mgr.clientCreated(this);
+        }
     }
 
     private EndpointInfo findEndpoint(Service svc, QName port) {
@@ -226,7 +245,7 @@
         // setup conduit selector
         prepareConduitSelector(message);
         
-        // execute chain
+        // execute chain        
         chain.doIntercept(message);
 
         getConduitSelector().complete(exchange);

Added: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java?view=auto&rev=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java (added)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java Thu May 24 10:33:11 2007
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.endpoint;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cxf.extension.BusExtension;
+
+public class ClientLifeCycleManagerImpl implements ClientLifeCycleManager, BusExtension {
+    
+    private List<ClientLifeCycleListener> listeners = new ArrayList<ClientLifeCycleListener>(); 
+
+    public Class<?> getRegistrationType() {
+        return ClientLifeCycleManager.class;
+    }
+
+    public void registerListener(ClientLifeCycleListener listener) {
+        listeners.add(listener);
+    }
+
+    public void clientCreated(Client client) {
+        if (null != listeners) {
+            for (ClientLifeCycleListener listener : listeners) {
+                listener.clientCreated(client);
+            }
+        }
+    }
+
+    public void clientDestroyed(Client client) {
+        if (null != listeners) {
+            for (ClientLifeCycleListener listener : listeners) {
+                listener.clientDestroyed(client);
+            }
+        } 
+    }
+
+    public void unRegisterListener(ClientLifeCycleListener listener) {
+        listeners.remove(listener);
+    }
+
+}

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ServerLifeCycleManagerImpl.java Thu May 24 10:33:11 2007
@@ -22,15 +22,15 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
+import org.apache.cxf.extension.BusExtension;
 
-import org.apache.cxf.Bus;
-
-public class ServerLifeCycleManagerImpl implements ServerLifeCycleManager {
+public class ServerLifeCycleManagerImpl implements ServerLifeCycleManager, BusExtension {
     
     private List<ServerLifeCycleListener> listeners = new ArrayList<ServerLifeCycleListener>();
-    private Bus bus;
+
+    public Class<?> getRegistrationType() {
+        return ServerLifeCycleManager.class;
+    }
 
     public synchronized void registerListener(ServerLifeCycleListener listener) {
         listeners.add(listener);
@@ -62,21 +62,5 @@
 
     public synchronized void unRegisterListener(ServerLifeCycleListener listener) {
         listeners.remove(listener);
-    }
-    
-    public Bus getBus() {
-        return bus;
-    }
-    
-    @Resource
-    public void setBus(Bus bus) {        
-        this.bus = bus;        
-    }
-    
-    @PostConstruct
-    public void register() {
-        if (null != bus) {
-            bus.setExtension(this, ServerLifeCycleManager.class);
-        }
     }
 }

Modified: incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml (original)
+++ incubator/cxf/trunk/rt/core/src/main/resources/META-INF/cxf/cxf.xml Thu May 24 10:33:11 2007
@@ -88,9 +88,9 @@
         <property name="bus" ref="cxf"/>
     </bean>
 
-    <bean id="org.apache.cxf.endpoint.ServerLifeCycleManager" class="org.apache.cxf.endpoint.ServerLifeCycleManagerImpl">
-        <property name="bus" ref="cxf"/>
-    </bean>
+    <bean id="org.apache.cxf.endpoint.ServerLifeCycleManager" class="org.apache.cxf.endpoint.ServerLifeCycleManagerImpl"/>
+    <bean id="org.apache.cxf.endpoint.ClientLifeCycleManager" class="org.apache.cxf.endpoint.ClientLifeCycleManagerImpl"/>
+        
 
     <bean id="org.apache.cxf.transports.http.QueryHandlerRegistry" class="org.apache.cxf.transport.http.QueryHandlerRegistryImpl">
         <property name="bus" ref="cxf"/>

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/AbstractEndpoint.java Thu May 24 10:33:11 2007
@@ -30,7 +30,7 @@
     }
     
     public String getName() {
-        return reliableEndpoint.getName();
+        return RMUtils.getEndpointIdentifier(getEndpoint());
     }
     
     /** 

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Proxy.java Thu May 24 10:33:11 2007
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -184,7 +185,10 @@
         
         OperationInfo oi = reliableEndpoint.getEndpoint().getEndpointInfo().getService().getInterface()
             .getOperation(RMConstants.getLastMessageOperationName());
-        invoke(oi, new Object[] {}, null);
+        // pass reference to source sequence in invocation context
+        Map<String, Object> context = Collections.singletonMap(SourceSequence.class.getName(), (Object)s);
+
+        invoke(oi, new Object[] {}, context);
     }
     
     void ackRequested(SourceSequence s) throws RMException {

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMContextUtils.java Thu May 24 10:33:11 2007
@@ -149,7 +149,7 @@
         ((AddressingPropertiesImpl)maps).exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
     }
 
-    private static String getRMPropertiesKey(boolean outbound) {
+    public static String getRMPropertiesKey(boolean outbound) {
         return outbound
             ? RMMessageConstants.RM_PROPERTIES_OUTBOUND : RMMessageConstants.RM_PROPERTIES_INBOUND;
     }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Thu May 24 10:33:11 2007
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.List;
 import java.util.logging.Level;
@@ -58,27 +57,25 @@
 import org.apache.neethi.Policy;
 
 public class RMEndpoint {
-    
+
     private static final Logger LOG = LogUtils.getL7dLogger(RMEndpoint.class);
-    
-    private static final QName SERVICE_NAME = 
-        new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractService");
-    private static final QName INTERFACE_NAME = 
-         new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractPortType");
-    private static final QName BINDING_NAME = 
-        new QName(RMConstants.getWsdlNamespace(), "SequenceAbstractSoapBinding");
-
-    private static final QName CREATE_PART_NAME =
-        new QName(RMConstants.getWsdlNamespace(), "create");
-    private static final QName CREATE_RESPONSE_PART_NAME =
-        new QName(RMConstants.getWsdlNamespace(), "createResponse");
-    private static final QName TERMINATE_PART_NAME =
-        new QName(RMConstants.getWsdlNamespace(), "terminate");
-        
+
+    private static final QName SERVICE_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                        "SequenceAbstractService");
+    private static final QName INTERFACE_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                          "SequenceAbstractPortType");
+    private static final QName BINDING_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                        "SequenceAbstractSoapBinding");
+
+    private static final QName CREATE_PART_NAME = new QName(RMConstants.getWsdlNamespace(), "create");
+    private static final QName CREATE_RESPONSE_PART_NAME = new QName(RMConstants.getWsdlNamespace(),
+                                                                     "createResponse");
+    private static final QName TERMINATE_PART_NAME = new QName(RMConstants.getWsdlNamespace(), "terminate");
+
     private RMManager manager;
     private Endpoint applicationEndpoint;
     private Conduit conduit;
-    private org.apache.cxf.ws.addressing.EndpointReferenceType replyTo; 
+    private org.apache.cxf.ws.addressing.EndpointReferenceType replyTo;
     private Source source;
     private Destination destination;
     private WrappedService service;
@@ -87,7 +84,7 @@
     private Servant servant;
     private long lastApplicationMessage;
     private long lastControlMessage;
-     
+
     public RMEndpoint(RMManager m, Endpoint ae) {
         manager = m;
         applicationEndpoint = ae;
@@ -96,56 +93,49 @@
         proxy = new Proxy(this);
         servant = new Servant(this);
     }
-    
-    public String getName() {
-        return MessageFormat.format("{0}.{1}", new Object[] {
-            applicationEndpoint.getEndpointInfo().getService().getName(),
-            applicationEndpoint.getEndpointInfo().getName()
-        });
-    }
-    
+
     /**
      * @return Returns the bus.
      */
     public RMManager getManager() {
         return manager;
     }
-      
+
     /**
      * @return Returns the application endpoint.
      */
     public Endpoint getApplicationEndpoint() {
         return applicationEndpoint;
     }
-    
+
     /**
      * @return Returns the RM protocol endpoint.
      */
     public Endpoint getEndpoint() {
         return endpoint;
     }
-    
+
     /**
      * @return Returns the RM protocol service.
      */
     public Service getService() {
         return service;
     }
-    
+
     /**
      * @return Returns the RM protocol binding info.
      */
     public BindingInfo getBindingInfo() {
         return service.getServiceInfo().getBinding(BINDING_NAME);
     }
-    
+
     /**
      * @return Returns the proxy.
      */
     public Proxy getProxy() {
         return proxy;
     }
-    
+
     /**
      * @return Returns the servant.
      */
@@ -153,35 +143,35 @@
         return servant;
     }
 
-    /** 
+    /**
      * @return Returns the destination.
      */
     public Destination getDestination() {
         return destination;
     }
-    
+
     /**
      * @param destination The destination to set.
      */
     public void setDestination(Destination destination) {
         this.destination = destination;
     }
-    
-    /** 
+
+    /**
      * @return Returns the source.
      */
     public Source getSource() {
         return source;
     }
-    
+
     /**
      * @param source The source to set.
      */
     public void setSource(Source source) {
         this.source = source;
-    } 
-    
-    /** 
+    }
+
+    /**
      * @return The time when last application message was received.
      */
     public long getLastApplicationMessage() {
@@ -195,7 +185,7 @@
         lastApplicationMessage = System.currentTimeMillis();
     }
 
-    /** 
+    /**
      * @return The time when last RM protocol message was received.
      */
     public long getLastControlMessage() {
@@ -208,46 +198,44 @@
     public void receivedControlMessage() {
         lastControlMessage = System.currentTimeMillis();
     }
-    
-    /** 
+
+    /**
      * @return Returns the conduit.
      */
     public Conduit getConduit() {
         return conduit;
     }
 
-    /** 
-     * Returns the replyTo address of the first application request, i.e. the target address to which to 
-     * send CreateSequence, CreateSequenceResponse and TerminateSequence messages originating from the
-     * from the server.
+    /**
+     * Returns the replyTo address of the first application request, i.e. the
+     * target address to which to send CreateSequence, CreateSequenceResponse
+     * and TerminateSequence messages originating from the from the server.
+     * 
      * @return the replyTo address
      */
     org.apache.cxf.ws.addressing.EndpointReferenceType getReplyTo() {
         return replyTo;
     }
-    
-    
-    void initialise(Conduit c, org.apache.cxf.ws.addressing.EndpointReferenceType r) {  
+
+    void initialise(Conduit c, org.apache.cxf.ws.addressing.EndpointReferenceType r) {
         conduit = c;
         replyTo = r;
         createService();
         createEndpoint();
         setPolicies();
     }
-    
+
     void createService() {
         ServiceInfo si = new ServiceInfo();
         si.setName(SERVICE_NAME);
         buildInterfaceInfo(si);
-        
+
         service = new WrappedService(applicationEndpoint.getService(), SERVICE_NAME, si);
-        
+
         DataBinding dataBinding = null;
         try {
-            dataBinding = new JAXBDataBinding(CreateSequenceType.class,
-                                              CreateSequenceResponseType.class,
-                                              TerminateSequenceType.class,
-                                              SequenceFaultType.class);
+            dataBinding = new JAXBDataBinding(CreateSequenceType.class, CreateSequenceResponseType.class,
+                                              TerminateSequenceType.class, SequenceFaultType.class);
         } catch (JAXBException e) {
             throw new ServiceConstructionException(e);
         }
@@ -261,57 +249,57 @@
         EndpointInfo aei = applicationEndpoint.getEndpointInfo();
         String transportId = aei.getTransportId();
         EndpointInfo ei = new EndpointInfo(si, transportId);
-        
+
         ei.setAddress(aei.getAddress());
-        
+
         ei.setName(RMConstants.getPortName());
         ei.setBinding(si.getBinding(BINDING_NAME));
 
-        // if addressing was enabled on the application endpoint by means 
-        // of the UsingAddressing element extensor, use this for the 
+        // if addressing was enabled on the application endpoint by means
+        // of the UsingAddressing element extensor, use this for the
         // RM endpoint also
-        
+
         Object ua = getUsingAddressing(aei);
         if (null != ua) {
             ei.addExtensor(ua);
-        } 
+        }
         si.addEndpoint(ei);
-        
+
         endpoint = new WrappedEndpoint(applicationEndpoint, ei, service);
         service.setEndpoint(endpoint);
     }
-    
+
     void setPolicies() {
         // use same WS-policies as for application endpoint
-        PolicyEngine engine = manager.getBus().getExtension(PolicyEngine.class);  
+        PolicyEngine engine = manager.getBus().getExtension(PolicyEngine.class);
         if (null == engine || !engine.isEnabled()) {
             return;
         }
-        
+
         EndpointInfo ei = getEndpoint().getEndpointInfo();
-                
-        PolicyInterceptorProviderRegistry reg = 
-            manager.getBus().getExtension(PolicyInterceptorProviderRegistry.class);
-        EndpointPolicy ep = null == conduit
-            ? engine.getServerEndpointPolicy(applicationEndpoint.getEndpointInfo(), null)
-            : engine.getClientEndpointPolicy(applicationEndpoint.getEndpointInfo(), conduit);
-        
+
+        PolicyInterceptorProviderRegistry reg = manager.getBus()
+            .getExtension(PolicyInterceptorProviderRegistry.class);
+        EndpointPolicy ep = null == conduit ? engine.getServerEndpointPolicy(applicationEndpoint
+            .getEndpointInfo(), null) : engine.getClientEndpointPolicy(applicationEndpoint.getEndpointInfo(),
+                                                                       conduit);
+
         engine.setEndpointPolicy(ei, ep);
-        
+
         EffectivePolicy effectiveOutbound = new EffectivePolicyImpl(ep, reg, true, false);
         EffectivePolicy effectiveInbound = new EffectivePolicyImpl(ep, reg, false, false);
-        
+
         BindingInfo bi = ei.getBinding();
         Collection<BindingOperationInfo> bois = bi.getOperations();
-        
+
         for (BindingOperationInfo boi : bois) {
             engine.setEffectiveServerRequestPolicy(ei, boi, effectiveInbound);
             engine.setEffectiveServerResponsePolicy(ei, boi, effectiveOutbound);
 
             engine.setEffectiveClientRequestPolicy(ei, boi, effectiveOutbound);
-            engine.setEffectiveClientResponsePolicy(ei, boi, effectiveInbound);            
+            engine.setEffectiveClientResponsePolicy(ei, boi, effectiveInbound);
         }
-        
+
         // TODO: FaultPolicy (SequenceFault)
     }
 
@@ -326,12 +314,12 @@
         buildSequenceAckOperationInfo(ii);
         buildLastMessageOperationInfo(ii);
         buildAckRequestedOperationInfo(ii);
-        
+
         // TODO: FaultInfo (SequenceFault)
     }
 
     void buildCreateSequenceOperationInfo(InterfaceInfo ii) {
-        
+
         OperationInfo operationInfo = null;
         MessagePartInfo partInfo = null;
         MessageInfo messageInfo = null;
@@ -343,7 +331,7 @@
         partInfo.setElementQName(RMConstants.getCreateSequenceOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceType.class);
-        
+
         messageInfo = operationInfo.createMessage(RMConstants.getCreateSequenceResponseOperationName());
         operationInfo.setOutput(messageInfo.getName().getLocalPart(), messageInfo);
         partInfo = messageInfo.addMessagePart(CREATE_RESPONSE_PART_NAME);
@@ -351,7 +339,7 @@
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceResponseType.class);
         partInfo.setIndex(-1);
-        
+
         operationInfo = ii.addOperation(RMConstants.getCreateSequenceOnewayOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getCreateSequenceOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -359,7 +347,7 @@
         partInfo.setElementQName(RMConstants.getCreateSequenceOperationName());
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceType.class);
-        
+
         operationInfo = ii.addOperation(RMConstants.getCreateSequenceResponseOnewayOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getCreateSequenceResponseOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -368,13 +356,13 @@
         partInfo.setElement(true);
         partInfo.setTypeClass(CreateSequenceResponseType.class);
     }
-    
+
     void buildTerminateSequenceOperationInfo(InterfaceInfo ii) {
-        
+
         OperationInfo operationInfo = null;
         MessagePartInfo partInfo = null;
         MessageInfo messageInfo = null;
-        
+
         operationInfo = ii.addOperation(RMConstants.getTerminateSequenceOperationName());
         messageInfo = operationInfo.createMessage(RMConstants.getTerminateSequenceOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
@@ -393,7 +381,7 @@
         messageInfo = operationInfo.createMessage(RMConstants.getSequenceAckOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
-    
+
     void buildLastMessageOperationInfo(InterfaceInfo ii) {
 
         OperationInfo operationInfo = null;
@@ -403,7 +391,7 @@
         messageInfo = operationInfo.createMessage(RMConstants.getLastMessageOperationName());
         operationInfo.setInput(messageInfo.getName().getLocalPart(), messageInfo);
     }
-    
+
     void buildAckRequestedOperationInfo(InterfaceInfo ii) {
 
         OperationInfo operationInfo = null;
@@ -423,51 +411,50 @@
             BindingOperationInfo boi = null;
             SoapOperationInfo soi = null;
 
-            boi = bi.buildOperation(RMConstants.getCreateSequenceOperationName(), 
-                RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
+            boi = bi.buildOperation(RMConstants.getCreateSequenceOperationName(), RMConstants
+                .getCreateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getCreateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), 
-                RMConstants.getTerminateSequenceOperationName().getLocalPart(), null);
+
+            boi = bi.buildOperation(RMConstants.getTerminateSequenceOperationName(), RMConstants
+                .getTerminateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getTerminateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getSequenceAckOperationName(), 
-                null, null);
+
+            boi = bi.buildOperation(RMConstants.getSequenceAckOperationName(), null, null);
             assert null != boi;
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getSequenceAckAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
+
             boi = bi.buildOperation(RMConstants.getLastMessageOperationName(), null, null);
             assert null != boi;
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getLastMessageAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
+
             boi = bi.buildOperation(RMConstants.getAckRequestedOperationName(), null, null);
             assert null != boi;
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getAckRequestedAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
-            
-            boi = bi.buildOperation(RMConstants.getCreateSequenceOnewayOperationName(), 
-                RMConstants.getCreateSequenceOperationName().getLocalPart(), null);
+
+            boi = bi.buildOperation(RMConstants.getCreateSequenceOnewayOperationName(), RMConstants
+                .getCreateSequenceOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getCreateSequenceAction());
             boi.addExtensor(soi);
             bi.addOperation(boi);
 
-            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOnewayOperationName(), 
-                RMConstants.getCreateSequenceResponseOperationName().getLocalPart(), null);
+            boi = bi.buildOperation(RMConstants.getCreateSequenceResponseOnewayOperationName(), RMConstants
+                .getCreateSequenceResponseOperationName().getLocalPart(), null);
             soi = new SoapOperationInfo();
             soi.setAction(RMConstants.getCreateSequenceResponseAction());
             boi.addExtensor(soi);
@@ -475,10 +462,10 @@
 
             si.addBinding(bi);
         }
-        
+
         // TODO: BindingFaultInfo (SequenceFault)
     }
-    
+
     Object getUsingAddressing(EndpointInfo endpointInfo) {
         if (null == endpointInfo) {
             return null;
@@ -489,21 +476,21 @@
         if (null != ua) {
             return ua;
         }
-        exts = endpointInfo.getBinding() != null
-            ? endpointInfo.getBinding().getExtensors(ExtensibilityElement.class) : null;
+        exts = endpointInfo.getBinding() != null ? endpointInfo.getBinding()
+            .getExtensors(ExtensibilityElement.class) : null;
         ua = getUsingAddressing(exts);
         if (null != ua) {
             return ua;
         }
-        exts = endpointInfo.getService() != null
-            ? endpointInfo.getService().getExtensors(ExtensibilityElement.class) : null;
+        exts = endpointInfo.getService() != null ? endpointInfo.getService()
+            .getExtensors(ExtensibilityElement.class) : null;
         ua = getUsingAddressing(exts);
         if (null != ua) {
             return ua;
         }
-        return ua;        
+        return ua;
     }
-    
+
     Object getUsingAddressing(List<ExtensibilityElement> exts) {
         Object ua = null;
         if (exts != null) {
@@ -515,25 +502,25 @@
         }
         return ua;
     }
-    
+
     void setAplicationEndpoint(Endpoint ae) {
         applicationEndpoint = ae;
     }
-    
+
     void setManager(RMManager m) {
         manager = m;
     }
-    
+
     void shutdown() {
         // cancel outstanding timer tasks (deferred acknowledgements)
         // and scheduled termination for all
         // destination sequences of this endpoint
-        
+
         for (DestinationSequence ds : getDestination().getAllSequences()) {
             ds.cancelDeferredAcknowledgments();
             ds.cancelTermination();
         }
-        
+
         // try terminating sequences
         SourcePolicyType sp = manager.getSourcePolicy();
         SequenceTerminationPolicyType stp = null;
@@ -541,47 +528,45 @@
             stp = sp.getSequenceTerminationPolicy();
         }
         if (null != stp && stp.isTerminateOnShutdown()) {
-            
+
             Collection<SourceSequence> seqs = source.getAllUnacknowledgedSequences();
             LOG.log(Level.FINE, "Trying to terminate {0} sequences", seqs.size());
             for (SourceSequence seq : seqs) {
                 try {
-                    // destination MUST respond with a 
+                    // destination MUST respond with a
                     // sequence acknowledgement
                     if (seq.isLastMessage()) {
                         // REVISIT: this may be non-standard
                         // getProxy().ackRequested(seq);
                     } else {
-                        
+
                         getProxy().lastMessage(seq);
                     }
                 } catch (RMException ex) {
                     // already logged
                 }
             }
-        }     
-        
+        }
+
         // cancel outstanding resends for all source sequences
         // of this endpoint
-        
+
         for (SourceSequence ss : getSource().getAllSequences()) {
             manager.getRetransmissionQueue().stop(ss);
         }
     }
-    
-    
-    
+
     class EffectivePolicyImpl implements EffectivePolicy {
-        
+
         private EndpointPolicy endpointPolicy;
         private List<Interceptor> interceptors;
 
-        EffectivePolicyImpl(EndpointPolicy ep, PolicyInterceptorProviderRegistry reg, 
-                            boolean outbound, boolean fault) {
+        EffectivePolicyImpl(EndpointPolicy ep, PolicyInterceptorProviderRegistry reg, boolean outbound,
+                            boolean fault) {
             endpointPolicy = ep;
             interceptors = reg.getInterceptors(endpointPolicy.getChosenAlternative(), outbound, fault);
         }
-        
+
         public Collection<Assertion> getChosenAlternative() {
             return endpointPolicy.getChosenAlternative();
         }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu May 24 10:33:11 2007
@@ -20,6 +20,7 @@
 package org.apache.cxf.ws.rm;
 
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
@@ -34,10 +35,22 @@
 import org.apache.cxf.Bus;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientLifeCycleListener;
+import org.apache.cxf.endpoint.ClientLifeCycleManager;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.endpoint.ServerLifeCycleListener;
+import org.apache.cxf.endpoint.ServerLifeCycleManager;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
 import org.apache.cxf.ws.addressing.RelatesToType;
@@ -46,6 +59,7 @@
 import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
 import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.apache.cxf.ws.rm.policy.RMAssertion.BaseRetransmissionInterval;
@@ -55,7 +69,7 @@
 /**
  * 
  */
-public class RMManager {
+public class RMManager implements ServerLifeCycleListener, ClientLifeCycleListener {
 
     private static final Logger LOG = LogUtils.getL7dLogger(RMManager.class);
 
@@ -65,30 +79,33 @@
     private RetransmissionQueue retransmissionQueue;
     private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
     private Timer timer = new Timer(true);
-    private final ServerLifeCycleListener serverLifeCycleListener;
     private RMAssertion rmAssertion;
     private DeliveryAssuranceType deliveryAssurance;
     private SourcePolicyType sourcePolicy;
     private DestinationPolicyType destinationPolicy;
     
+    // ServerLifeCycleListener
+    
+    public void startServer(Server server) {
+        recoverReliableEndpoint(server.getEndpoint(), null);
+    }
 
-    public RMManager() {
-        serverLifeCycleListener = new ServerLifeCycleListener() {
-
-            public void startServer(Server server) {
-            }
-
-            public void stopServer(Server server) {
-                RMManager.this.shutdownReliableEndpoint(server.getEndpoint());
-            }
-            
-        };        
+    public void stopServer(Server server) {
+        shutdownReliableEndpoint(server.getEndpoint());
     }
     
-    public ServerLifeCycleListener getServerLifeCycleListener() {
-        return serverLifeCycleListener;
+    // ClientLifeCycleListener
+    
+    public void clientCreated(Client client) {
+        recoverReliableEndpoint(client.getEndpoint(), client.getConduit());
+    }
+    
+    public void clientDestroyed(Client client) {
+        shutdownReliableEndpoint(client.getEndpoint());
     }
 
+    // Configuration
+    
     public Bus getBus() {
         return bus;
     }
@@ -136,6 +153,85 @@
     public BindingFaultFactory getBindingFaultFactory(Binding binding) {
         return new SoapFaultFactory(binding);
     }
+    
+    /**  
+     * @return Returns the deliveryAssurance.
+     */
+    public DeliveryAssuranceType getDeliveryAssurance() {
+        return deliveryAssurance;
+    }
+
+    /**
+     * @param deliveryAssurance The deliveryAssurance to set.
+     */
+    public void setDeliveryAssurance(DeliveryAssuranceType deliveryAssurance) {
+        this.deliveryAssurance = deliveryAssurance;
+    }
+
+    /**
+     * @return Returns the destinationPolicy.
+     */
+    public DestinationPolicyType getDestinationPolicy() {
+        return destinationPolicy;
+    }
+
+    /**
+     * @param destinationPolicy The destinationPolicy to set.
+     */
+    public void setDestinationPolicy(DestinationPolicyType destinationPolicy) {
+        this.destinationPolicy = destinationPolicy;
+    }
+
+    /** 
+     * @return Returns the rmAssertion.
+     */
+    public RMAssertion getRMAssertion() {
+        return rmAssertion;
+    }
+
+    /**
+     * @param rma The rmAssertion to set.
+     */
+    public void setRMAssertion(RMAssertion rma) {
+        org.apache.cxf.ws.rm.policy.ObjectFactory factory = new org.apache.cxf.ws.rm.policy.ObjectFactory();
+        if (null == rma) {
+            rma = factory.createRMAssertion();
+            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
+        }
+        BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+        if (null == bri) {
+            bri = factory.createRMAssertionBaseRetransmissionInterval();
+            rma.setBaseRetransmissionInterval(bri);
+        }
+        if (null == bri.getMilliseconds()) {
+            bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
+        }
+
+        rmAssertion = rma;
+    }
+
+    /** 
+     * @return Returns the sourcePolicy.
+     */
+    public SourcePolicyType getSourcePolicy() {
+        return sourcePolicy;
+    }
+    
+    /**
+     * @param sp The sourcePolicy to set.
+     */
+    public void setSourcePolicy(SourcePolicyType sp) {
+        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
+        if (null == sp) {
+            sp = factory.createSourcePolicyType();
+        }
+        if (!sp.isSetSequenceTerminationPolicy()) {
+            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
+        }
+        sourcePolicy = sp;
+    }
+    
+    // The real stuff ...
 
     public synchronized RMEndpoint getReliableEndpoint(Message message) {
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
@@ -149,7 +245,7 @@
         }
         RMEndpoint rme = reliableEndpoints.get(endpoint);
         if (null == rme) {
-            rme = createReliableEndpoint(this, endpoint);
+            rme = createReliableEndpoint(endpoint);
             org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
             org.apache.cxf.ws.addressing.EndpointReferenceType replyTo = null;
             if (null != destination) {
@@ -232,10 +328,11 @@
 
     @PreDestroy
     public void shutdown() {
-        
         // shutdown remaining endpoints 
         
-        for (RMEndpoint rme : reliableEndpoints.values()) {
+        LOG.log(Level.FINE, "Shutting down RMManager with {0} remaining endpoints.",
+                reliableEndpoints.size());
+        for (RMEndpoint rme : reliableEndpoints.values()) {            
             rme.shutdown();
         }
 
@@ -261,12 +358,72 @@
         reliableEndpoints.remove(e);
     }
     
-    RMEndpoint createReliableEndpoint(RMManager manager, Endpoint endpoint) {
-        return new RMEndpoint(manager, endpoint);
+    void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
+        if (null == store || null == retransmissionQueue) {
+            return;
+        }
+        
+        RMEndpoint rme = createReliableEndpoint(endpoint);        
+        rme.initialise(conduit, null);
+        reliableEndpoints.put(endpoint, rme);
+        
+        String id = RMUtils.getEndpointIdentifier(endpoint);
+        LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+                new Object[] {null == conduit ? "client" : "server", id});
+        Collection<SourceSequence> sss = store.getSourceSequences(id);
+        if (null == sss || 0 == sss.size()) {                        
+            return;
+        }
+        LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
+        for (SourceSequence ss : sss) {
+            rme.getSource().addSequence(ss, false);
+ 
+            Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
+            if (null == ms) {
+                continue;
+            }
+            LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
+            for (RMMessage m : ms) {
+                
+                Message message = new MessageImpl();
+                Exchange exchange = new ExchangeImpl();
+                message.setExchange(exchange);
+                if (null != conduit) {
+                    exchange.setConduit(conduit);
+                    message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+                }
+                exchange.put(Endpoint.class, endpoint);
+                exchange.put(Service.class, endpoint.getService());
+                if (endpoint.getEndpointInfo().getService() != null) {
+                    exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
+                    exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
+                }
+                exchange.put(Binding.class, endpoint.getBinding());
+                exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
+                exchange.put(Bus.class, bus);
+                
+                SequenceType st = RMUtils.getWSRMFactory().createSequenceType();
+                st.setIdentifier(ss.getIdentifier());
+                st.setMessageNumber(m.getMessageNumber());
+                if (ss.isLastMessage() && ss.getCurrentMessageNr().equals(m.getMessageNumber())) {
+                    st.setLastMessage(RMUtils.getWSRMFactory().createSequenceTypeLastMessage());
+                }
+                RMProperties rmps = new RMProperties();
+                rmps.setSequence(st);
+                RMContextUtils.storeRMProperties(message, rmps, true);
+                                    
+                message.setContent(byte[].class, m.getContent());
+                          
+                retransmissionQueue.addUnacknowledged(message);
+            }
+        }
+        retransmissionQueue.start();
+        
     }
     
-    // configuration
-    
+    RMEndpoint createReliableEndpoint(Endpoint endpoint) {
+        return new RMEndpoint(this, endpoint);
+    }  
    
     @PostConstruct
     void initialise() {
@@ -295,6 +452,21 @@
             idGenerator = new DefaultSequenceIdentifierGenerator();
         }
     }
+    
+    @PostConstruct
+    void registerListeners() {
+        if (null == bus) { 
+            return;
+        }
+        ServerLifeCycleManager slm = bus.getExtension(ServerLifeCycleManager.class);
+        if (null != slm) {
+            slm.registerListener(this);
+        }
+        ClientLifeCycleManager clm = bus.getExtension(ClientLifeCycleManager.class);
+        if (null != clm) {
+            clm.registerListener(this);
+        }
+    }
 
    
     Map<Endpoint, RMEndpoint> getReliableEndpointsMap() {
@@ -315,81 +487,6 @@
         }
     }
 
-    /**  
-     * @return Returns the deliveryAssurance.
-     */
-    public DeliveryAssuranceType getDeliveryAssurance() {
-        return deliveryAssurance;
-    }
-
-    /**
-     * @param deliveryAssurance The deliveryAssurance to set.
-     */
-    public void setDeliveryAssurance(DeliveryAssuranceType deliveryAssurance) {
-        this.deliveryAssurance = deliveryAssurance;
-    }
-
-    /**
-     * @return Returns the destinationPolicy.
-     */
-    public DestinationPolicyType getDestinationPolicy() {
-        return destinationPolicy;
-    }
-
-    /**
-     * @param destinationPolicy The destinationPolicy to set.
-     */
-    public void setDestinationPolicy(DestinationPolicyType destinationPolicy) {
-        this.destinationPolicy = destinationPolicy;
-    }
-
-    /** 
-     * @return Returns the rmAssertion.
-     */
-    public RMAssertion getRMAssertion() {
-        return rmAssertion;
-    }
-
-    /**
-     * @param rma The rmAssertion to set.
-     */
-    public void setRMAssertion(RMAssertion rma) {
-        org.apache.cxf.ws.rm.policy.ObjectFactory factory = new org.apache.cxf.ws.rm.policy.ObjectFactory();
-        if (null == rma) {
-            rma = factory.createRMAssertion();
-            rma.setExponentialBackoff(factory.createRMAssertionExponentialBackoff());
-        }
-        BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
-        if (null == bri) {
-            bri = factory.createRMAssertionBaseRetransmissionInterval();
-            rma.setBaseRetransmissionInterval(bri);
-        }
-        if (null == bri.getMilliseconds()) {
-            bri.setMilliseconds(new BigInteger(RetransmissionQueue.DEFAULT_BASE_RETRANSMISSION_INTERVAL));
-        }
-
-        rmAssertion = rma;
-    }
-
-    /** 
-     * @return Returns the sourcePolicy.
-     */
-    public SourcePolicyType getSourcePolicy() {
-        return sourcePolicy;
-    }
     
-    /**
-     * @param sp The sourcePolicy to set.
-     */
-    public void setSourcePolicy(SourcePolicyType sp) {
-        org.apache.cxf.ws.rm.manager.ObjectFactory factory = new org.apache.cxf.ws.rm.manager.ObjectFactory();
-        if (null == sp) {
-            sp = factory.createSourcePolicyType();
-        }
-        if (!sp.isSetSequenceTerminationPolicy()) {
-            sp.setSequenceTerminationPolicy(factory.createSequenceTerminationPolicyType());
-        }
-        sourcePolicy = sp;
-    }
 
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMOutInterceptor.java Thu May 24 10:33:11 2007
@@ -21,6 +21,7 @@
 
 import java.math.BigInteger;
 import java.util.Collection;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -112,7 +113,6 @@
         
         if ((isApplicationMessage || isLastMessage)
             && !isPartialResponse) {
-      
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue()));
             }
@@ -120,7 +120,13 @@
             // get the current sequence, requesting the creation of a new one if necessary
             
             synchronized (source) {
-                SourceSequence seq = getManager().getSequence(inSeqId, message, maps);
+                SourceSequence seq = null;
+                if (isLastMessage) {
+                    Map<?, ?> invocationContext = (Map)message.get(Message.INVOCATION_CONTEXT);
+                    seq = (SourceSequence)invocationContext.get(SourceSequence.class.getName());
+                } else {
+                    seq = getManager().getSequence(inSeqId, message, maps);
+                }
                 assert null != seq;
 
                 // increase message number and store a sequence type object in

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMUtils.java Thu May 24 10:33:11 2007
@@ -19,6 +19,9 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.text.MessageFormat;
+
+import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.ws.addressing.AddressingConstants;
 import org.apache.cxf.ws.addressing.AddressingConstantsImpl;
 import org.apache.cxf.ws.addressing.VersionTransformer;
@@ -87,4 +90,11 @@
         epr.setAddress(uri);
         return epr;
     } 
+    
+    public static String getEndpointIdentifier(Endpoint endpoint) {
+        return MessageFormat.format("{0}.{1}", new Object[] {
+            endpoint.getEndpointInfo().getService().getName(),
+            endpoint.getEndpointInfo().getName()
+        });
+    }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu May 24 10:33:11 2007
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.ws.rm;
 
-import java.util.Collection;
-
 import org.apache.cxf.message.Message;
 
 public interface RetransmissionQueue {
@@ -39,10 +37,9 @@
      * @return true if there are no unacknowledged messages in the queue
      */
     boolean isEmpty();
-   
+    
     /**
-     * Accepts a new context for posible future retransmission.
-     * 
+     * Accepts a new context for posible future retransmission. 
      * @param ctx the message context.
      */
     void addUnacknowledged(Message message);
@@ -64,12 +61,6 @@
      * Stops retransmission queue.
      */
     void stop(SourceSequence seq);
-    
-    /**
-     * Populates the retransmission queue with messages recovered from
-     * persistent store.
-     */
-    void populate(Collection<SourceSequence> sss);
     
     
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Thu May 24 10:33:11 2007
@@ -58,6 +58,9 @@
             LOG.fine("No operation info."); 
             return null;
         }
+        
+        // TODO: throw Fault, see AbstractRMInterceptor
+        
         if (RMConstants.getCreateSequenceOperationName().equals(oi.getName())
             || RMConstants.getCreateSequenceOnewayOperationName().equals(oi.getName())) {
             try {
@@ -73,6 +76,14 @@
             try {
                 createSequenceResponse(createResponse);
             } catch (SequenceFault ex) {
+                ex.printStackTrace();
+            }
+        } else if (RMConstants.getTerminateSequenceOperationName().equals(oi.getName())) {            
+            try {
+                terminateSequence(exchange.getInMessage());
+            } catch (SequenceFault ex) {
+                ex.printStackTrace();
+            } catch (RMException ex) {
                 ex.printStackTrace();
             }
         }

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu May 24 10:33:11 2007
@@ -107,10 +107,6 @@
         return 0 == getUnacknowledged().size();
     }
 
-    public void populate(Collection<SourceSequence> sss) {
-        // TODO Auto-generated method stub
-    }
-
     /**
      * Purge all candidates for the given sequence that have been acknowledged.
      * 
@@ -199,14 +195,16 @@
      * 
      * @param ctx the message context.
      * @return ResendCandidate
-     */
+     */    
     protected ResendCandidate cacheUnacknowledged(Message message) {
-        ResendCandidate candidate = null;
         RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
         SequenceType st = rmps.getSequence();
         Identifier sid = st.getIdentifier();
+        String key = sid.getValue();
+        
+        ResendCandidate candidate = null;
+        
         synchronized (this) {
-            String key = sid.getValue();
             List<ResendCandidate> sequenceCandidates = getSequenceCandidates(key);
             if (null == sequenceCandidates) {
                 sequenceCandidates = new ArrayList<ResendCandidate>();
@@ -327,7 +325,15 @@
             }
             ByteArrayOutputStream savedOutputStream = (ByteArrayOutputStream)message
                 .get(RMMessageConstants.SAVED_OUTPUT_STREAM);
-            ByteArrayInputStream bis = new ByteArrayInputStream(savedOutputStream.toByteArray());
+            byte[] content = null;
+            if (null == savedOutputStream) {                
+                content = message.getContent(byte[].class); 
+                LOG.fine("Using saved byte array: " + content);
+            } else {
+                content = savedOutputStream.toByteArray();
+                LOG.fine("Using saved output stream: " + savedOutputStream);
+            }
+            ByteArrayInputStream bis = new ByteArrayInputStream(content);
 
             // copy saved output stream to new output stream in chunks of 1024
             AbstractCachedOutputStream.copyStream(bis, os, 1024);

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/AbstractEndpointTest.java Thu May 24 10:33:11 2007
@@ -48,15 +48,12 @@
     
     @Test
     public void testAccessors() {
-        String n = "abc";
-        EasyMock.expect(rme.getName()).andReturn(n);
         Endpoint ae = control.createMock(Endpoint.class);
         EasyMock.expect(rme.getApplicationEndpoint()).andReturn(ae);
         RMManager mgr = control.createMock(RMManager.class);
         EasyMock.expect(rme.getManager()).andReturn(mgr);
         control.replay();
         AbstractEndpoint tested = new AbstractEndpoint(rme);
-        assertSame(n, tested.getName());
         assertSame(rme, tested.getReliableEndpoint());
         assertSame(ae, tested.getEndpoint());
         assertSame(mgr, tested.getManager());

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ProxyTest.java Thu May 24 10:33:11 2007
@@ -164,7 +164,7 @@
         EasyMock.expect(si.getInterface()).andReturn(ii);
         OperationInfo oi = control.createMock(OperationInfo.class);
         EasyMock.expect(ii.getOperation(RMConstants.getLastMessageOperationName())).andReturn(oi);
-        expectInvoke(proxy, oi, null);
+        expectInvokeWithContext(proxy, oi, null);
         control.replay();
         
         proxy.lastMessage(ss);
@@ -350,5 +350,12 @@
     private void expectInvoke(Proxy proxy, OperationInfo oi, Object expectedReturn) throws RMException {
         EasyMock.expect(proxy.invoke(EasyMock.same(oi), EasyMock.isA(Object[].class), 
             (Map)EasyMock.isNull())).andReturn(expectedReturn);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void expectInvokeWithContext(Proxy proxy, OperationInfo oi, Object expectedReturn) 
+        throws RMException {
+        EasyMock.expect(proxy.invoke(EasyMock.same(oi), EasyMock.isA(Object[].class), 
+            EasyMock.isA(Map.class))).andReturn(expectedReturn);
     }
 }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Thu May 24 10:33:11 2007
@@ -84,20 +84,6 @@
     }
 
     @Test
-    public void testGetName() {
-        EndpointInfo aei = control.createMock(EndpointInfo.class);
-        EasyMock.expect(ae.getEndpointInfo()).andReturn(aei).times(2);
-        QName eqn = new QName("ns2", "endpoint");
-        EasyMock.expect(aei.getName()).andReturn(eqn);
-        ServiceInfo asi = control.createMock(ServiceInfo.class);
-        EasyMock.expect(aei.getService()).andReturn(asi);
-        QName sqn = new QName("ns1", "service");
-        EasyMock.expect(asi.getName()).andReturn(sqn);
-        control.replay();
-        assertEquals("{ns1}service.{ns2}endpoint", rme.getName());
-    }
-
-    @Test
     public void testGetManager() {
         control.replay();
         assertSame(manager, rme.getManager());

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=541364&r1=541363&r2=541364
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Thu May 24 10:33:11 2007
@@ -21,21 +21,29 @@
 
 import java.lang.reflect.Method;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.TimerTask;
 
 import javax.xml.namespace.QName;
 
 import org.apache.cxf.Bus;
+import org.apache.cxf.binding.Binding;
 import org.apache.cxf.binding.soap.SoapBinding;
 import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.endpoint.Server;
-import org.apache.cxf.endpoint.ServerLifeCycleListener;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
@@ -45,6 +53,7 @@
 import org.apache.cxf.ws.addressing.RelatesToType;
 import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
 import org.apache.cxf.ws.rm.manager.SourcePolicyType;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMAssertion;
 import org.easymock.classextension.EasyMock;
@@ -55,22 +64,23 @@
 
 public class RMManagerTest extends Assert {
     
-    private IMocksControl control;
+    private MyControl control;
+    private RMManager manager;
     
     @Before
     public void setUp() {
-        control = EasyMock.createNiceControl();
+        // control = EasyMock.createNiceControl();
+        control = new MyControl();
     }
    
     @Test
     public void testAccessors() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         assertNull(manager.getBus());
         assertNull(manager.getStore());
         assertNull(manager.getRetransmissionQueue());
         assertNotNull(manager.getTimer());
-        assertNotNull(manager.getServerLifeCycleListener());
-        
+
         Bus bus = control.createMock(Bus.class);
         RMStore store = control.createMock(RMStore.class);
         RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
@@ -83,12 +93,11 @@
         assertSame(queue, manager.getRetransmissionQueue());
         control.replay();
         control.verify();
-        
     }
     
     @Test
     public void testInitialisation() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         assertNull("RMAssertion is set.", manager.getRMAssertion());
         assertNull("sourcePolicy is set.", manager.getSourcePolicy());
         assertNull("destinationPolicy is set.", manager.getDestinationPolicy());
@@ -120,25 +129,64 @@
     } 
     
     @Test
-    public void testServerLifecycleLister() {
-        RMManager manager = new RMManager();
+    public void testStartServer() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("recoverReliableEndpoint", new Class[] {Endpoint.class, Conduit.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Server s = control.createMock(Server.class);
         Endpoint e = control.createMock(Endpoint.class);
         EasyMock.expect(s.getEndpoint()).andReturn(e);
+        manager.recoverReliableEndpoint(e, null);
+        EasyMock.expectLastCall();
         control.replay();
-        ServerLifeCycleListener sl = manager.getServerLifeCycleListener();
-        sl.startServer(s);
-        sl.stopServer(s);
+        manager.startServer(s);
         control.verify();
-        
-        control.reset();
+    }
+    
+    @Test
+    public void testStopServer() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("shutdownReliableEndpoint", new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
+        Server s = control.createMock(Server.class);
+        Endpoint e = control.createMock(Endpoint.class);
         EasyMock.expect(s.getEndpoint()).andReturn(e);
-        RMEndpoint rme = control.createMock(RMEndpoint.class);
-        manager.getReliableEndpointsMap().put(e, rme);
-        rme.shutdown();
+        manager.shutdownReliableEndpoint(e);
         EasyMock.expectLastCall();
         control.replay();
-        sl.stopServer(s);
+        manager.stopServer(s);
+        control.verify();
+    }
+    
+    @Test
+    public void testClientCreated() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("recoverReliableEndpoint", new Class[] {Endpoint.class, Conduit.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
+        Client client = control.createMock(Client.class);
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        EasyMock.expect(client.getEndpoint()).andReturn(endpoint);
+        Conduit conduit = control.createMock(Conduit.class);
+        EasyMock.expect(client.getConduit()).andReturn(conduit);
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        EasyMock.expectLastCall();
+        control.replay();
+        manager.clientCreated(client);
+        control.verify();
+    }
+    
+    @Test
+    public void testClientDestroyed() throws NoSuchMethodException {
+        Method m = RMManager.class
+            .getDeclaredMethod("shutdownReliableEndpoint", new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
+        Client c = control.createMock(Client.class);
+        Endpoint e = control.createMock(Endpoint.class);
+        EasyMock.expect(c.getEndpoint()).andReturn(e);
+        manager.shutdownReliableEndpoint(e);
+        EasyMock.expectLastCall();
+        control.replay();
+        manager.clientDestroyed(c);
         control.verify();
     }
     
@@ -151,8 +199,8 @@
     @Test
     public void testGetReliableEndpointServerSideCreate() throws NoSuchMethodException {
         Method m = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
-            new Class[] {RMManager.class, Endpoint.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
@@ -166,7 +214,7 @@
         Endpoint e = control.createMock(Endpoint.class);
         EasyMock.expect(wre.getWrappedEndpoint()).andReturn(e);        
         RMEndpoint rme = control.createMock(RMEndpoint.class);
-        EasyMock.expect(manager.createReliableEndpoint(manager, e)).andReturn(rme);
+        EasyMock.expect(manager.createReliableEndpoint(e)).andReturn(rme);
         org.apache.cxf.transport.Destination destination = control
             .createMock(org.apache.cxf.transport.Destination.class);
         EasyMock.expect(exchange.getDestination()).andReturn(destination);
@@ -198,8 +246,8 @@
     @Test
     public void testGetReliableEndpointClientSideCreate() throws NoSuchMethodException {
         Method m = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
-            new Class[] {RMManager.class, Endpoint.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
@@ -211,7 +259,7 @@
         QName name = new QName("http://x.y.z/a", "GreeterPort");
         EasyMock.expect(ei.getName()).andReturn(name);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
-        EasyMock.expect(manager.createReliableEndpoint(manager, endpoint)).andReturn(rme);
+        EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
         EasyMock.expect(exchange.getDestination()).andReturn(null);
         Conduit conduit = control.createMock(Conduit.class);
         EasyMock.expect(exchange.getConduit(message)).andReturn(conduit);
@@ -236,8 +284,8 @@
     @Test
     public void testGetReliableEndpointExisting() throws NoSuchMethodException {
         Method m = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
-            new Class[] {RMManager.class, Endpoint.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+            new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
@@ -260,7 +308,7 @@
     public void testGetDestination() throws NoSuchMethodException {
         Method  m = RMManager.class
             .getDeclaredMethod("getReliableEndpoint", new Class[] {Message.class});        
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);    
@@ -282,7 +330,7 @@
     public void testGetSource() throws NoSuchMethodException {
         Method m = RMManager.class
             .getDeclaredMethod("getReliableEndpoint", new Class[] {Message.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         EasyMock.expect(manager.getReliableEndpoint(message)).andReturn(rme);
@@ -304,7 +352,7 @@
     public void testGetExistingSequence() throws NoSuchMethodException, SequenceFault, RMException {
         Method m = RMManager.class
            .getDeclaredMethod("getSource", new Class[] {Message.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         Identifier inSid = control.createMock(Identifier.class);
         
@@ -320,7 +368,7 @@
     @Test
     public void testGetNewSequence() throws NoSuchMethodException, SequenceFault, RMException {
         Method m = RMManager.class.getDeclaredMethod("getSource", new Class[] {Message.class});
-        RMManager manager = control.createMock(RMManager.class, new Method[] {m});
+        manager = control.createMock(RMManager.class, new Method[] {m});
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
         EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
@@ -366,7 +414,7 @@
     @Test
     public void testShutdown() {
         Bus bus = new SpringBusFactory().createBus("org/apache/cxf/ws/rm/rmmanager.xml", false);
-        RMManager manager = bus.getExtension(RMManager.class);        
+        manager = bus.getExtension(RMManager.class);        
         Endpoint e = control.createMock(Endpoint.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         manager.getReliableEndpointsMap().put(e, rme);
@@ -385,11 +433,12 @@
         } catch (IllegalStateException ex) {
             // expected
         }
+        control.verify();
     }
     
     @Test
     public void testShutdownReliableEndpoint() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         Endpoint e = control.createMock(Endpoint.class);
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         control.replay();
@@ -402,12 +451,131 @@
         EasyMock.expectLastCall();
         control.replay();
         manager.shutdownReliableEndpoint(e);
-        assertNull(manager.getReliableEndpointsMap().get(e));        
+        assertNull(manager.getReliableEndpointsMap().get(e));  
+        control.verify();
+    }
+    
+    @Test
+    public void testRecoverReliableEndpoint() {
+        manager = new RMManager();
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        Conduit conduit = control.createMock(Conduit.class);
+                
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        control.reset();
+        
+        RMStore store = control.createMock(RMStore.class);
+        manager.setStore(store);
+       
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();           
+    }
+    
+    @Test
+    public void testRecoverReliableClientEndpoint() throws NoSuchMethodException {
+        Method method = RMManager.class.getDeclaredMethod("createReliableEndpoint", 
+                                                          new Class[] {Endpoint.class});
+        manager = control.createMock(RMManager.class, new Method[] {method});
+        Endpoint endpoint = control.createMock(Endpoint.class);
+        EndpointInfo ei = control.createMock(EndpointInfo.class);
+        ServiceInfo si = control.createMock(ServiceInfo.class);  
+        BindingInfo bi = control.createMock(BindingInfo.class);
+        InterfaceInfo ii = control.createMock(InterfaceInfo.class);
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);          
+        Conduit conduit = control.createMock(Conduit.class);        
+        setUpRecoverReliableEndpoint(endpoint, conduit, null, null);
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        control.reset();
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
+        SourceSequence ss = control.createMock(SourceSequence.class);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, null);
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();
+        
+        control.reset();
+        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);  
+        RMMessage m = control.createMock(RMMessage.class);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, m);        
+        control.replay();
+        manager.recoverReliableEndpoint(endpoint, conduit);
+        control.verify();        
+    }
+    
+    Endpoint setUpEndpointForRecovery(Endpoint endpoint, 
+                                      EndpointInfo ei, 
+                                    ServiceInfo si,
+                                    BindingInfo bi,
+                                    InterfaceInfo ii) {   
+        EasyMock.expect(endpoint.getEndpointInfo()).andReturn(ei).anyTimes();     
+        EasyMock.expect(ei.getService()).andReturn(si).anyTimes();
+        EasyMock.expect(si.getName()).andReturn(new QName("S", "s")).anyTimes();
+        EasyMock.expect(ei.getName()).andReturn(new QName("P", "p")).anyTimes();
+        EasyMock.expect(si.getInterface()).andReturn(ii).anyTimes();
+        return endpoint;
+    }
+    
+    void setUpRecoverReliableEndpoint(Endpoint endpoint,
+                                      Conduit conduit, 
+                                      SourceSequence ss, 
+                                      RMMessage m)  {                
+        RMStore store = control.createMock(RMStore.class);
+        RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
+        manager.setStore(store);
+        manager.setRetransmissionQueue(queue);
+        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
+        Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
+        if (null != ss) {
+            sss.add(ss);            
+        }
+        EasyMock.expect(store.getSourceSequences("{S}s.{P}p")).andReturn(sss);
+        if (null == ss) {
+            return;
+        } 
+        Source source = control.createMock(Source.class);
+        EasyMock.expect(rme.getSource()).andReturn(source);
+        source.addSequence(ss, false);
+        EasyMock.expectLastCall();
+        
+        Collection<RMMessage> ms = new ArrayList<RMMessage>();
+        if (null != m) {
+            ms.add(m);
+        }
+        Identifier id = new Identifier();
+        id.setValue("S1");
+        EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
+        EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
+        if (null == m) {
+            return;
+        }
+        Service service = control.createMock(Service.class);
+        EasyMock.expect(endpoint.getService()).andReturn(service);
+        Binding binding = control.createMock(Binding.class);
+        EasyMock.expect(endpoint.getBinding()).andReturn(binding);
+       
+        EasyMock.expect(ss.isLastMessage()).andReturn(true);
+        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN);
+        EasyMock.expect(m.getMessageNumber()).andReturn(BigInteger.TEN).times(2);
+        byte[] content = new byte[] {'x', '9'};
+        EasyMock.expect(m.getContent()).andReturn(content);
+        queue.addUnacknowledged(EasyMock.isA(Message.class));
+        EasyMock.expectLastCall();
+        queue.start();
+        EasyMock.expectLastCall(); 
     }
     
     @Test
     public void testDefaultSequenceIdentifierGenerator() {
-        RMManager manager = new RMManager();
+        manager = new RMManager();
         assertNull(manager.getIdGenerator());
         SequenceIdentifierGenerator generator = manager.new DefaultSequenceIdentifierGenerator();
         manager.setIdGenerator(generator);
@@ -419,7 +587,41 @@
         assertTrue(id1 != id2);
         assertTrue(!id1.getValue().equals(id2.getValue()));     
         control.replay();
-    }
+    }   
     
-     
+    class MyControl {
+        private IMocksControl c;
+        private List<Object> mocks;
+        
+        MyControl() {
+            c = EasyMock.createNiceControl();
+            mocks = new ArrayList<Object>();
+        }
+        
+        void replay() {
+            c.replay();
+        }
+        
+        void reset() {
+            c.reset();
+        }
+        
+        void verify() {
+            c.verify();
+        }
+        
+        <T> T createMock(Class<T> cls) {
+            T mock = c.createMock(cls);
+            mocks.add(mock);
+            return mock;
+        }
+        
+        <T> T createMock(Class<T> cls, Method[] m) {
+            T mock = c.createMock(cls, m);
+            mocks.add(mock);
+            return mock;
+        }
+        
+         
+    }
 } 



Mime
View raw message