cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r911624 - in /cxf/trunk: api/src/main/java/org/apache/cxf/transport/ api/src/main/java/org/apache/cxf/ws/addressing/ rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/ma...
Date Thu, 18 Feb 2010 22:37:11 GMT
Author: dkulp
Date: Thu Feb 18 22:37:11 2010
New Revision: 911624

URL: http://svn.apache.org/viewvc?rev=911624&view=rev
Log:
Pull the back channel stuff out of the conduit and put it in the ws-addr
layer so the ws-addressing client side stuff is transport agnostic in
regards to ws-addressing.

Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/transport/Conduit.java
    cxf/trunk/api/src/main/java/org/apache/cxf/transport/Observable.java
    cxf/trunk/api/src/main/java/org/apache/cxf/ws/addressing/WSAContextUtils.java
    cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaConduit.java
    cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaServerConduit.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java
    cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
    cxf/trunk/rt/transports/http/pom.xml
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
    cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
    cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java
    cxf/trunk/rt/ws/addr/src/test/java/org/apache/cxf/ws/addressing/MAPAggregatorTest.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/transport/Conduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/transport/Conduit.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/transport/Conduit.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/transport/Conduit.java Thu Feb 18 22:37:11 2010
@@ -65,14 +65,6 @@
     EndpointReferenceType getTarget();
     
     /**
-     * Retreive the back-channel Destination.
-     * 
-     * @return the backchannel Destination (or null if the backchannel is
-     * built-in)
-     */
-    Destination getBackChannel();
-        
-    /**
      * Close the conduit
      */
     void close();

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/transport/Observable.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/transport/Observable.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/transport/Observable.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/transport/Observable.java Thu Feb 18 22:37:11 2010
@@ -29,4 +29,11 @@
      * @param observer the observer to notify on receipt of incoming
      */
     void setMessageObserver(MessageObserver observer);
+    
+    
+    /**
+     * Retrieves the message observer for incoming messages
+     * @return
+     */
+    MessageObserver getMessageObserver();
 }

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/ws/addressing/WSAContextUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/ws/addressing/WSAContextUtils.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/ws/addressing/WSAContextUtils.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/ws/addressing/WSAContextUtils.java Thu Feb 18 22:37:11 2010
@@ -28,11 +28,11 @@
  * Holder for utility methods relating to contexts.
  */
 public final class WSAContextUtils {
+    public static final String REPLYTO_PROPERTY =
+        "org.apache.cxf.ws.addressing.replyto";
 
     private static final String TO_PROPERTY =
         "org.apache.cxf.ws.addressing.to";
-    private static final String REPLYTO_PROPERTY =
-        "org.apache.cxf.ws.addressing.replyto";
     private static final String USING_PROPERTY =
         "org.apache.cxf.ws.addressing.using";    
 
@@ -91,29 +91,5 @@
         }
         return to;
     }
-    
-    /**
-     * Store ReplyTo EPR in the context
-     *
-     * @param replyTo the ReplyTo EPR
-     * @param message the current message
-     */   
-    public static void storeReplyTo(EndpointReferenceType replyTo,
-                                    Message message) {
-        message.put(REPLYTO_PROPERTY, replyTo);
-    }
 
-    /**
-     * Retrieve ReplyTo EPR from the context.
-     *
-     * @param conduit the Conduit if available
-     * @param message the current message
-     * @return the retrieved EPR
-     */
-    public static EndpointReferenceType retrieveReplyTo(Conduit conduit,
-                                                        Message message) {
-        return conduit != null
-               ? conduit.getBackChannel().getAddress()
-               : (EndpointReferenceType)message.get(REPLYTO_PROPERTY); 
-    }
 }

Modified: cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaConduit.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaConduit.java (original)
+++ cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaConduit.java Thu Feb 18 22:37:11 2010
@@ -50,7 +50,6 @@
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -156,10 +155,6 @@
         return target;
     }
 
-    public Destination getBackChannel() {
-        return null;
-    }
-
     public void close() {
         
     }
@@ -401,4 +396,9 @@
         }
 
     }
+
+    @Override
+    public MessageObserver getMessageObserver() {
+        return this.incomingObserver;
+    }
 }

Modified: cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaServerConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaServerConduit.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaServerConduit.java (original)
+++ cxf/trunk/rt/bindings/corba/src/main/java/org/apache/cxf/binding/corba/CorbaServerConduit.java Thu Feb 18 22:37:11 2010
@@ -31,7 +31,6 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -81,15 +80,14 @@
         return target;
     }
 
-    public Destination getBackChannel() {
-        return null;
-    }
-
     public void close() {
     }
 
     public void setMessageObserver(MessageObserver observer) {
     }
+    public MessageObserver getMessageObserver() {
+        return null;
+    }
 
     public final EndpointReferenceType getTargetReference(EndpointReferenceType t) {
         EndpointReferenceType ref = null;

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientLifeCycleManagerImpl.java Thu Feb 18 22:37:11 2010
@@ -35,7 +35,9 @@
     }
 
     public void registerListener(ClientLifeCycleListener listener) {
-        listeners.add(listener);
+        if (!listeners.contains(listener)) {
+            listeners.add(listener);
+        }
     }
 
     public void clientCreated(Client client) {

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java Thu Feb 18 22:37:11 2010
@@ -48,16 +48,6 @@
     }
 
     /**
-     * Retrieve the back-channel Destination.
-     * 
-     * @return the backchannel Destination (or null if the backchannel is
-     * built-in)
-     */
-    public Destination getBackChannel() {
-        return null;
-    }
-
-    /**
      * @param message for which content should be closed.
      */    
     public void close(Message msg) throws IOException {

Modified: cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java Thu Feb 18 22:37:11 2010
@@ -399,8 +399,6 @@
         Conduit backChannel = destination.getBackChannel(inMessage, null, null);
         
         assertNotNull("expected back channel", backChannel);
-        assertNull("unexpected backchannel-backchannel",
-                   backChannel.getBackChannel());
         assertEquals("unexpected target",
                      EndpointReferenceUtils.ANONYMOUS_ADDRESS,
                      backChannel.getTarget().getAddress().getValue());

Modified: cxf/trunk/rt/transports/http/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/pom.xml?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/pom.xml (original)
+++ cxf/trunk/rt/transports/http/pom.xml Thu Feb 18 22:37:11 2010
@@ -89,6 +89,7 @@
                                     <catalog>${basedir}/src/main/build-resources/catalog.cat</catalog>
                                     <extensionArgs>
                                         <extensionArg>-Xdv</extensionArg>
+                                        <extensionArg>-Xproperty-listener</extensionArg>
                                     </extensionArgs>
                                     <deleteDirs>
                                         <deleteDir>${basedir}/target/generated/src/main/java/org/apache/cxf/wsdl</deleteDir>
@@ -103,6 +104,7 @@
                 </executions>
                 <configuration>
                      <extensions>
+                         <extension>org.apache.cxf.xjcplugins:cxf-xjc-pl:${cxf.xjc-utils.version}</extension>
                          <extension>org.apache.cxf.xjcplugins:cxf-xjc-dv:${cxf.xjc-utils.version}</extension>
                      </extensions>
                  </configuration>

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Thu Feb 18 22:37:11 2010
@@ -19,6 +19,8 @@
 package org.apache.cxf.transport.http;
 
 
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -69,9 +71,6 @@
 import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.policy.PolicyUtils;
 import org.apache.cxf.transport.https.CertConstraints;
@@ -84,7 +83,6 @@
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.policy.Assertor;
 import org.apache.cxf.ws.policy.PolicyEngine;
-import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
 import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
 
@@ -147,7 +145,7 @@
 @NoJSR250Annotations
 public class HTTPConduit 
     extends AbstractConduit 
-    implements Configurable, Assertor {  
+    implements Configurable, Assertor, PropertyChangeListener {  
 
     /**
      *  This constant is the Message(Map) key for the HttpURLConnection that
@@ -209,10 +207,6 @@
     private URL defaultEndpointURL;
     private boolean fromEndpointReferenceType;
 
-    private Destination decoupledDestination;
-    private MessageObserver decoupledObserver;
-    private int decoupledDestinationRefCount;
-    
     // Configurable values
     
     /**
@@ -431,6 +425,14 @@
         // We have finalized the configuration. Any configurable entity
         // set now, must make changes dynamically.
         configFinalized = true;
+        
+        if (getClient().getDecoupledEndpoint() != null) {
+            this.endpointInfo.setProperty("org.apache.cxf.ws.addressing.replyto",
+                                          getClient().getDecoupledEndpoint());
+        }
+        if (clientSidePolicy != null) {
+            clientSidePolicy.addPropertyChangeListener(this);
+        }
     }
     
     /**
@@ -762,20 +764,7 @@
         }        
         return new URL(result);    
     }
-    
-    /**
-     * Retreive the back-channel Destination.
-     * 
-     * @return the backchannel Destination (or null if the backchannel is
-     * built-in)
-     */
-    public synchronized Destination getBackChannel() {
-        if (decoupledDestination == null
-            &&  getClient().getDecoupledEndpoint() != null) {
-            setUpDecoupledDestination(); 
-        }
-        return decoupledDestination;
-    }
+
 
     /**
      * Close the conduit
@@ -793,13 +782,6 @@
             //defaultEndpointURL = null;
         }
     
-        // in decoupled case, close response Destination if reference count
-        // hits zero
-        //
-        if (decoupledDestination != null) {
-            releaseDecoupledDestination();
-            
-        }
     }
 
     /**
@@ -960,67 +942,6 @@
         
     }
     
-    /**
-     * Set up the decoupled Destination if necessary.
-     */
-    private void setUpDecoupledDestination() {        
-        EndpointReferenceType reference =
-            EndpointReferenceUtils.getEndpointReference(
-                getClient().getDecoupledEndpoint());
-        if (reference != null) {
-            String decoupledAddress = reference.getAddress().getValue();
-            LOG.info("creating decoupled endpoint: " + decoupledAddress);
-            try {
-                decoupledDestination = getDestination(decoupledAddress);
-                duplicateDecoupledDestination();
-            } catch (Exception e) {
-                // REVISIT move message to localizable Messages.properties
-                LOG.log(Level.WARNING, 
-                        "decoupled endpoint creation failed: ", e);
-            }
-        }
-    }
-
-    /**
-     * @param address the address
-     * @return a Destination for the address
-     */
-    private Destination getDestination(String address) throws IOException {
-        Destination destination = null;
-        DestinationFactoryManager factoryManager =
-            bus.getExtension(DestinationFactoryManager.class);
-        DestinationFactory factory =
-            factoryManager.getDestinationFactoryForUri(address);
-        if (factory != null) {
-            EndpointInfo ei = new EndpointInfo();
-            ei.setAddress(address);
-            destination = factory.getDestination(ei);
-            decoupledObserver = new InterposedMessageObserver();
-            destination.setMessageObserver(decoupledObserver);
-        }
-        return destination;
-    }
-    
-    /**
-     * @return the decoupled observer
-     */
-    protected MessageObserver getDecoupledObserver() {
-        return decoupledObserver;
-    }
-    
-    private synchronized void duplicateDecoupledDestination() {
-        decoupledDestinationRefCount++;
-    }
-    
-    private synchronized void releaseDecoupledDestination() {
-        if (--decoupledDestinationRefCount == 0) {
-            LOG.log(Level.FINE, "shutting down decoupled destination");
-            decoupledDestination.shutdown();
-
-            //this way we can release the port of decoupled destination
-            decoupledDestination.setMessageObserver(null);
-        }
-    }
     
     /**
      * This predicate returns true iff the exchange indicates 
@@ -1031,14 +952,7 @@
     private boolean isOneway(Exchange exchange) {
         return exchange != null && exchange.isOneWay();
     }
-    
-    /**
-     * @return true if expecting a decoupled response
-     */
-    private boolean isDecoupled() {
-        return decoupledDestination != null;
-    }
-    
+
     /**
      * Get an input stream containing the partial response if one is present.
      * 
@@ -1344,6 +1258,8 @@
      */
     public void setClient(HTTPClientPolicy client) {
         this.clientSidePolicy = client;
+        client.addPropertyChangeListener(this);
+        endpointInfo.setProperty("org.apache.cxf.ws.addressing.replyto", client.getDecoupledEndpoint());
     }
 
     /**
@@ -2191,7 +2107,7 @@
             Exchange exchange = outMessage.getExchange();
 
             InputStream in = null;
-            if (isOneway(exchange) || isDecoupled()) {
+            if (isOneway(exchange)) {
                 in = getPartialResponse(connection, responseCode);
                 if (in == null) {
                     // oneway operation or decoupled MEP without 
@@ -2257,7 +2173,6 @@
             
         }
 
-
     }
     
     /**
@@ -2299,7 +2214,6 @@
             } catch (IOException e) {
                 e.printStackTrace();
             }
-
         }
     }
     
@@ -2315,5 +2229,14 @@
     public void setBasicAuthSupplier(HttpBasicAuthSupplier basicAuthSupplier) {
         setAuthSupplier(basicAuthSupplier);
     }
+
+    @Override
+    public void propertyChange(PropertyChangeEvent evt) {
+        if (evt.getSource() == clientSidePolicy
+            && "decoupledEndpoint".equals(evt.getPropertyName())) {
+            this.endpointInfo.setProperty("org.apache.cxf.ws.addressing.replyto",
+                                          evt.getNewValue());
+        }
+    }
     
 }

Modified: cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java (original)
+++ cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java Thu Feb 18 22:37:11 2010
@@ -21,7 +21,6 @@
 
 
 import java.io.OutputStream;
-import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -36,25 +35,15 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
-import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
-import org.easymock.classextension.EasyMock;
-import org.easymock.classextension.IMocksControl;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
-
 
 public class HTTPConduitTest extends Assert {
-    private Message inMessage;
-    private IMocksControl control;
 
     @Before
     public void setUp() throws Exception {
@@ -250,82 +239,5 @@
                 headers.get("Authorization").get(0));
     }
 
-    public void testDecoupledEndpoint() throws Exception {
-        control = EasyMock.createNiceControl();
-
-        Bus bus = new CXFBusImpl();
-
-        URL decoupledURL = new URL("http://nowhere.com/response");
-        DestinationFactoryManager mgr =
-            control.createMock(DestinationFactoryManager.class);
-        DestinationFactory factory =
-            control.createMock(DestinationFactory.class);
-        Destination destination =
-            control.createMock(Destination.class);
-
-        bus.setExtension(mgr, DestinationFactoryManager.class);
-        mgr.getDestinationFactoryForUri(decoupledURL.toString());
-        EasyMock.expectLastCall().andReturn(factory);
-        factory.getDestination(EasyMock.isA(EndpointInfo.class));
-        EasyMock.expectLastCall().andReturn(destination);
-        destination.setMessageObserver(
-                EasyMock.isA(HTTPConduit.InterposedMessageObserver.class));
-
-        control.replay();
-
-        EndpointInfo ei = new EndpointInfo();
-        ei.setAddress("http://nowhere.com/bar/foo");
-        HTTPConduit conduit = new HTTPConduit(bus, ei, null);
-        conduit.finalizeConfig();
-
-        // Test call
-        conduit.getClient().setDecoupledEndpoint(decoupledURL.toString());
-
-        assertNotNull("expected back channel", conduit.getBackChannel());
-
-        MessageObserver observer = new MessageObserver() {
-            public void onMessage(Message m) {
-                inMessage = m;
-            }
-        };
-
-        // Test call
-        conduit.setMessageObserver(observer);
-
-        Message incoming = new MessageImpl();
-        conduit.getDecoupledObserver().onMessage(incoming);
-
-        assertSame("expected pass thru onMessage() notification",
-                   inMessage,
-                   incoming);
-        assertEquals("unexpected response code",
-                     HttpURLConnection.HTTP_OK,
-                     inMessage.get(Message.RESPONSE_CODE));
-        assertEquals("expected DECOUPLED_CHANNEL_MESSAGE flag set",
-                     Boolean.TRUE,
-                     inMessage.get(DECOUPLED_CHANNEL_MESSAGE));
-        assertEquals("unexpected HTTP_REQUEST set",
-                     false,
-                     inMessage.containsKey(AbstractHTTPDestination.HTTP_REQUEST));
-        assertEquals("unexpected HTTP_RESPONSE set",
-                     false,
-                     inMessage.containsKey(AbstractHTTPDestination.HTTP_RESPONSE));
-        assertEquals("unexpected Message.ASYNC_POST_RESPONSE_DISPATCH set",
-                     false,
-                     inMessage.containsKey(Message.ASYNC_POST_RESPONSE_DISPATCH));
-
-        // avoid intermittent spurious failures on EasyMock detecting finalize
-        // calls by mocking up only class data members (no local variables)
-        // and explicitly making available for GC post-verify
-        finalVerify();
-        inMessage = null;
-    }
-
-    private void finalVerify() {
-        if (control != null) {
-            control.verify();
-            control = null;
-        }
-    }
 
 }

Modified: cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java (original)
+++ cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java Thu Feb 18 22:37:11 2010
@@ -41,9 +41,6 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.DestinationFactory;
-import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.easymock.classextension.EasyMock;
@@ -54,8 +51,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
-
 /**
  */
 public class HTTPConduitURLEasyMockTest extends Assert {
@@ -127,7 +122,7 @@
     @Test
     public void testSend() throws Exception {
         control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, false);
+        HTTPConduit conduit = setUpConduit(true, false);
         Message message = new MessageImpl();
         conduit.prepare(message);
         verifySentMessage(conduit, message);
@@ -137,7 +132,7 @@
     @Test
     public void testSendWithHeaders() throws Exception {
         control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, false);
+        HTTPConduit conduit = setUpConduit(true, false);
         Message message = new MessageImpl();
         message.put("Content-Type", "text/xml;charset=utf8");
         setUpHeaders(message);
@@ -149,7 +144,7 @@
     @Test
     public void testSendHttpConnection() throws Exception {
         control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, false);
+        HTTPConduit conduit = setUpConduit(true, false);
         Message message = new MessageImpl();
         conduit.prepare(message);
         verifySentMessage(conduit, message);
@@ -159,48 +154,18 @@
     @Test
     public void testSendHttpConnectionAutoRedirect() throws Exception {
         control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, true, false);
+        HTTPConduit conduit = setUpConduit(true, true);
         Message message = new MessageImpl();
         conduit.prepare(message);
         verifySentMessage(conduit, message);
         finalVerify();
     }
-
-    @Test
-    public void testSendOnewayExplicitLenghtPartialResponse()
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);        
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.NONE,
-                          ResponseDelimiter.LENGTH,
-                          false); // non-empty response
-        finalVerify();
-    }
-        
-    @Test
-    public void testSendOnewayChunkedPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.NONE,
-                          ResponseDelimiter.CHUNKED,
-                          false); // non-empty response
-        finalVerify();
-    }
-    
+            
     @Test
     public void testSendOnewayChunkedEmptyPartialResponse() 
         throws Exception {
         control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
+        HTTPConduit conduit = setUpConduit(true, false);
         Message message = new MessageImpl();
         conduit.prepare(message);
         verifySentMessage(conduit, 
@@ -210,111 +175,6 @@
                           true);  // empty response
         finalVerify();
     }
-
-    @Test
-    public void testSendOnewayEOFTerminatedPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.NONE,
-                          ResponseDelimiter.EOF,
-                          false); // non-empty response
-        finalVerify();
-    }
-    
-    @Test
-    public void testSendOnewayEOFTerminatedEmptyPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.NONE,
-                          ResponseDelimiter.EOF,
-                          true); // empty response
-        finalVerify();
-    }
-    
-    @Test
-    public void testSendDecoupledExplicitLenghtPartialResponse()
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.DECOUPLED,
-                          ResponseDelimiter.LENGTH,
-                          false); // non-empty response
-        finalVerify();
-    }
-
-    @Test
-    public void testSendDecoupledChunkedPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.DECOUPLED,
-                          ResponseDelimiter.CHUNKED,
-                          false); // non-empty response
-        finalVerify();
-    }
-    
-    @Test
-    public void testSendDecoupledChunkedEmptyPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.DECOUPLED,
-                          ResponseDelimiter.CHUNKED,
-                          true);  // empty response
-        finalVerify();
-    }
-
-    @Test
-    public void testSendDecoupledEOFTerminatedPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.DECOUPLED,
-                          ResponseDelimiter.EOF,
-                          false); // non-empty response
-        finalVerify();
-    }
-    
-    @Test
-    public void testSendDecoupledEOFTerminatedEmptyPartialResponse() 
-        throws Exception {
-        control = EasyMock.createNiceControl();
-        HTTPConduit conduit = setUpConduit(true, false, true);
-        Message message = new MessageImpl();
-        conduit.prepare(message);
-        verifySentMessage(conduit, 
-                          message, 
-                          ResponseStyle.DECOUPLED,
-                          ResponseDelimiter.EOF,
-                          true); // empty response
-        finalVerify();
-    }
     
     private void setUpHeaders(Message message) {
         Map<String, List<String>> headers = new HashMap<String, List<String>>();
@@ -346,8 +206,7 @@
 
     private HTTPConduit setUpConduit(
         boolean send,
-        boolean autoRedirect,
-        boolean decoupled
+        boolean autoRedirect
     ) throws Exception {
         endpointInfo = new EndpointInfo();
         endpointInfo.setAddress(NOWHERE + "bar/foo");
@@ -390,23 +249,6 @@
         }
 
         CXFBusImpl bus = new CXFBusImpl();
-        URL decoupledURL = null;
-        if (decoupled) {
-            decoupledURL = new URL(NOWHERE + "response");
-            DestinationFactoryManager mgr =
-                control.createMock(DestinationFactoryManager.class);
-            DestinationFactory factory =
-                control.createMock(DestinationFactory.class);
-            Destination destination =
-                control.createMock(Destination.class);
-
-            bus.setExtension(mgr, DestinationFactoryManager.class);
-            mgr.getDestinationFactoryForUri(decoupledURL.toString());
-            EasyMock.expectLastCall().andReturn(factory);
-            factory.getDestination(EasyMock.isA(EndpointInfo.class));
-            EasyMock.expectLastCall().andReturn(destination);
-            destination.setMessageObserver(EasyMock.isA(HTTPConduit.InterposedMessageObserver.class));
-        }
         
         control.replay();
         
@@ -426,13 +268,6 @@
             }
         }
 
-        if (decoupled) {
-            conduit.getClient().setDecoupledEndpoint(decoupledURL.toString());
-            assertNotNull("expected back channel", conduit.getBackChannel());
-        } else {
-            assertNull("unexpected back channel", conduit.getBackChannel());
-        }
-
         observer = new MessageObserver() {
             public void onMessage(Message m) {
                 inMessage = m;
@@ -537,10 +372,6 @@
             }
         }
         
-        if (style == ResponseStyle.DECOUPLED) {
-            verifyDecoupledResponse(conduit);
-        }
-        
         conduit.close();
         
         finalVerify();
@@ -644,30 +475,6 @@
         }
     }
     
-    private void verifyDecoupledResponse(HTTPConduit conduit)
-        throws IOException {
-        Message incoming = new MessageImpl();
-        conduit.getDecoupledObserver().onMessage(incoming);
-        assertSame("expected pass thru onMessage() notification",
-                   inMessage,
-                   incoming);
-        assertEquals("unexpected response code",
-                     HttpURLConnection.HTTP_OK,
-                     inMessage.get(Message.RESPONSE_CODE));
-        assertEquals("expected DECOUPLED_CHANNEL_MESSAGE flag set",
-                     Boolean.TRUE,
-                     inMessage.get(DECOUPLED_CHANNEL_MESSAGE));
-        assertEquals("unexpected HTTP_REQUEST set",
-                     false,
-                     inMessage.containsKey(AbstractHTTPDestination.HTTP_REQUEST));
-        assertEquals("unexpected HTTP_RESPONSE set",
-                     false,
-                     inMessage.containsKey(AbstractHTTPDestination.HTTP_RESPONSE));
-        assertEquals("unexpected Message.ASYNC_POST_RESPONSE_DISPATCH set",
-                     false,
-                     inMessage.containsKey(Message.ASYNC_POST_RESPONSE_DISPATCH));
-    }
-
     private void finalVerify() {
         if (control != null) {
             control.verify();

Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/MAPAggregator.java Thu Feb 18 22:37:11 2010
@@ -19,6 +19,9 @@
 
 package org.apache.cxf.ws.addressing;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.Iterator;
@@ -33,15 +36,22 @@
 import javax.xml.namespace.QName;
 import javax.xml.ws.WebFault;
 
+import org.apache.cxf.Bus;
 import org.apache.cxf.binding.soap.SoapBindingConstants;
 import org.apache.cxf.binding.soap.SoapFault;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientLifeCycleListener;
+import org.apache.cxf.endpoint.ClientLifeCycleManager;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.OneWayProcessorInterceptor;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.FaultMode;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.AbstractPhaseInterceptor;
@@ -54,10 +64,15 @@
 import org.apache.cxf.service.model.UnwrappedOperationInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.Observable;
 import org.apache.cxf.ws.addressing.VersionTransformer.Names200408;
 import org.apache.cxf.ws.addressing.policy.MetadataConstants;
 import org.apache.cxf.ws.policy.AssertionInfo;
 import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
 
 /**
@@ -66,12 +81,30 @@
  */
 public class MAPAggregator extends AbstractPhaseInterceptor<Message> {
     public static final String USING_ADDRESSING = MAPAggregator.class.getName() + ".usingAddressing";
+    public static final String DECOUPLED_DESTINATION = MAPAggregator.class.getName() 
+        + ".decoupledDestination";
 
     private static final Logger LOG = 
         LogUtils.getL7dLogger(MAPAggregator.class);
     private static final ResourceBundle BUNDLE = LOG.getResourceBundle();
 
-    
+    private static final ClientLifeCycleListener DECOUPLED_DEST_CLEANER = new ClientLifeCycleListener() {
+        @Override
+        public void clientCreated(Client client) {
+            //ignore
+        }
+
+        @Override
+        public void clientDestroyed(Client client) {
+            Destination dest = client.getEndpoint().getEndpointInfo()
+                .getProperty(DECOUPLED_DESTINATION, Destination.class);
+            if (dest != null) {
+                dest.setMessageObserver(null);
+                dest.shutdown();
+            }
+        }
+        
+    };
 
     /**
      * REVISIT: map usage implies that the *same* interceptor instance 
@@ -582,10 +615,10 @@
             // add request-specific MAPs
             boolean isOneway = exchange.isOneWay();
             boolean isOutbound = ContextUtils.isOutbound(message);
-            Conduit conduit = null;
             
             // To
             if (maps.getTo() == null) {
+                Conduit conduit = null;
                 if (isOutbound) {
                     conduit = ContextUtils.getConduit(conduit, message);
                 }
@@ -600,13 +633,7 @@
             // current invocation
             EndpointReferenceType replyTo = maps.getReplyTo();
             if (ContextUtils.isGenericAddress(replyTo)) {
-                conduit = ContextUtils.getConduit(conduit, message);
-                if (conduit != null) {
-                    Destination backChannel = conduit.getBackChannel();
-                    if (backChannel != null) {
-                        replyTo = backChannel.getAddress();
-                    }
-                }
+                replyTo = getReplyTo(message);
                 if (replyTo == null
                     || (isOneway
                         && (replyTo.getAddress() == null
@@ -663,6 +690,122 @@
         }
     }
 
+    private EndpointReferenceType getReplyTo(Message message) {
+        Exchange exchange = message.getExchange();
+        Endpoint info = exchange.get(Endpoint.class);
+        if (info == null) {
+            return null;
+        }
+        synchronized (info) {
+            EndpointInfo ei = info.getEndpointInfo();
+            Destination dest = ei.getProperty(DECOUPLED_DESTINATION, Destination.class);
+            if (dest == null) {
+                dest = createDecoupledDestination(message);
+                if (dest != null) {
+                    info.getEndpointInfo().setProperty(DECOUPLED_DESTINATION, dest);
+                }
+            }
+            if (dest != null) {
+                return dest.getAddress();
+            }
+        }
+        return null;
+    }
+
+    private Destination createDecoupledDestination(Message message) {
+        String replyToAddress = (String)message.getContextualProperty(WSAContextUtils.REPLYTO_PROPERTY);
+        if (replyToAddress != null) {
+            return setUpDecoupledDestination(message.getExchange().get(Bus.class),
+                                             replyToAddress, 
+                                             message);
+        }
+        return null;
+    }
+    /**
+     * Set up the decoupled Destination if necessary.
+     */
+    private Destination setUpDecoupledDestination(Bus bus, String replyToAddress, Message message) {        
+        EndpointReferenceType reference =
+            EndpointReferenceUtils.getEndpointReference(replyToAddress);
+        if (reference != null) {
+            String decoupledAddress = reference.getAddress().getValue();
+            LOG.info("creating decoupled endpoint: " + decoupledAddress);
+            try {
+                Destination dest = getDestination(bus, replyToAddress, message);
+                bus.getExtension(ClientLifeCycleManager.class).registerListener(DECOUPLED_DEST_CLEANER);
+                return dest;
+            } catch (Exception e) {
+                // REVISIT move message to localizable Messages.properties
+                LOG.log(Level.WARNING, 
+                        "decoupled endpoint creation failed: ", e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param address the address
+     * @return a Destination for the address
+     */
+    private Destination getDestination(Bus bus, String address, Message message) throws IOException {
+        Destination destination = null;
+        DestinationFactoryManager factoryManager =
+            bus.getExtension(DestinationFactoryManager.class);
+        DestinationFactory factory =
+            factoryManager.getDestinationFactoryForUri(address);
+        if (factory != null) {
+            EndpointInfo ei = new EndpointInfo();
+            ei.setAddress(address);
+            destination = factory.getDestination(ei);
+            Conduit conduit = ContextUtils.getConduit(null, message);
+            if (conduit instanceof Observable) {
+                MessageObserver ob = ((Observable)conduit).getMessageObserver();
+                ob = new InterposedMessageObserver(bus, ob);
+                destination.setMessageObserver(ob);
+            }
+        }
+        return destination;
+    }
+    protected static class InterposedMessageObserver implements MessageObserver {
+        Bus bus;
+        MessageObserver observer;
+        public InterposedMessageObserver(Bus b, MessageObserver o) {
+            bus = b;
+            observer = o;
+        }
+        
+        /**
+         * Called for an incoming message.
+         * 
+         * @param inMessage
+         */
+        public void onMessage(Message inMessage) {
+            // disposable exchange, swapped with real Exchange on correlation
+            inMessage.setExchange(new ExchangeImpl());
+            inMessage.getExchange().put(Bus.class, bus);
+            inMessage.put(Message.DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
+            inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
+
+            // remove server-specific properties
+            //inMessage.remove(AbstractHTTPDestination.HTTP_REQUEST);
+            //inMessage.remove(AbstractHTTPDestination.HTTP_RESPONSE);
+            inMessage.remove(Message.ASYNC_POST_RESPONSE_DISPATCH);
+
+            //cache this inputstream since it's defer to use in case of async
+            try {
+                InputStream in = inMessage.getContent(InputStream.class);
+                if (in != null) {
+                    CachedOutputStream cos = new CachedOutputStream();
+                    IOUtils.copy(in, cos);
+                    inMessage.setContent(InputStream.class, cos.getInputStream());
+                }
+                observer.onMessage(inMessage);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    
     /**
      * Get the starting point MAPs (either empty or those set explicitly
      * by the application on the binding provider request context).

Modified: cxf/trunk/rt/ws/addr/src/test/java/org/apache/cxf/ws/addressing/MAPAggregatorTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/test/java/org/apache/cxf/ws/addressing/MAPAggregatorTest.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/test/java/org/apache/cxf/ws/addressing/MAPAggregatorTest.java (original)
+++ cxf/trunk/rt/ws/addr/src/test/java/org/apache/cxf/ws/addressing/MAPAggregatorTest.java Thu Feb 18 22:37:11 2010
@@ -464,7 +464,7 @@
         setUpExchangeGet(exchange, Endpoint.class, endpoint);
         EndpointInfo endpointInfo = control.createMock(EndpointInfo.class);
         endpoint.getEndpointInfo();
-        EasyMock.expectLastCall().andReturn(endpointInfo);
+        EasyMock.expectLastCall().andReturn(endpointInfo).anyTimes();
         List<ExtensibilityElement> endpointExts =
             new ArrayList<ExtensibilityElement>();
         endpointInfo.getExtensors(EasyMock.eq(ExtensibilityElement.class));

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java Thu Feb 18 22:37:11 2010
@@ -55,6 +55,7 @@
 import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.JAXWSAConstants;
+import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.addressing.Names;
 import org.apache.cxf.ws.policy.EffectivePolicy;
 import org.apache.cxf.ws.policy.EndpointPolicy;
@@ -228,11 +229,13 @@
         return replyTo;
     }
 
-    void initialise(Conduit c, org.apache.cxf.ws.addressing.EndpointReferenceType r) {
+    void initialise(Conduit c, 
+                    org.apache.cxf.ws.addressing.EndpointReferenceType r,
+                    org.apache.cxf.transport.Destination d) {
         conduit = c;
         replyTo = r;
         createService();
-        createEndpoint();
+        createEndpoint(d);
         setPolicies();
     }
 
@@ -281,12 +284,15 @@
         return rmSchema;
     }
 
-    void createEndpoint() {
+    void createEndpoint(org.apache.cxf.transport.Destination d) {
         ServiceInfo si = service.getServiceInfo();
         buildBindingInfo(si);
         EndpointInfo aei = applicationEndpoint.getEndpointInfo();
         String transportId = aei.getTransportId();
         EndpointInfo ei = new EndpointInfo(si, transportId);
+        if (d != null) {
+            ei.setProperty(MAPAggregator.DECOUPLED_DESTINATION, d);
+        }
 
         ei.setAddress(aei.getAddress());
 

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu Feb 18 22:37:11 2010
@@ -54,6 +54,7 @@
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
+import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.addressing.RelatesToType;
 import org.apache.cxf.ws.addressing.VersionTransformer;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
@@ -270,7 +271,12 @@
                 AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
                 replyTo = maps.getReplyTo();
             }
-            rme.initialise(message.getExchange().getConduit(message), replyTo);
+            Endpoint ei = message.getExchange().get(Endpoint.class);
+            org.apache.cxf.transport.Destination dest 
+                = ei == null ? null : ei.getEndpointInfo()
+                    .getProperty(MAPAggregator.DECOUPLED_DESTINATION, 
+                             org.apache.cxf.transport.Destination.class);
+            rme.initialise(message.getExchange().getConduit(message), replyTo, dest);
             reliableEndpoints.put(endpoint, rme);
             LOG.fine("Created new RMEndpoint.");
         }
@@ -320,8 +326,11 @@
                 to = RMUtils.createReference(maps.getTo().getValue());
                 acksTo = VersionTransformer.convert(maps.getReplyTo());
                 if (RMConstants.getNoneAddress().equals(acksTo.getAddress().getValue())) {
-                    org.apache.cxf.transport.Destination dest = message.getExchange().getConduit(message)
-                        .getBackChannel();
+                    Endpoint ei = message.getExchange().get(Endpoint.class);
+                    org.apache.cxf.transport.Destination dest 
+                        = ei == null ? null : ei.getEndpointInfo()
+                                .getProperty(MAPAggregator.DECOUPLED_DESTINATION, 
+                                         org.apache.cxf.transport.Destination.class);
                     if (null == dest) {
                         acksTo = RMUtils.createAnonymousReference2004();
                     } else {
@@ -408,7 +417,7 @@
                 LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
                         new Object[] {null == conduit ? "client" : "server", id});
                 rme = createReliableEndpoint(endpoint);
-                rme.initialise(conduit, null);
+                rme.initialise(conduit, null, null);
                 reliableEndpoints.put(endpoint, rme);
             }
             rme.getSource().addSequence(ss, false);

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMEndpointTest.java Thu Feb 18 22:37:11 2010
@@ -132,13 +132,14 @@
     @Test
     public void testInitialise() throws NoSuchMethodException {
         Method m1 = RMEndpoint.class.getDeclaredMethod("createService", new Class[] {});
-        Method m2 = RMEndpoint.class.getDeclaredMethod("createEndpoint", new Class[] {});
+        Method m2 = RMEndpoint.class
+            .getDeclaredMethod("createEndpoint", new Class[] {org.apache.cxf.transport.Destination.class});
         Method m3 = RMEndpoint.class.getDeclaredMethod("setPolicies", new Class[] {});
 
         rme = control.createMock(RMEndpoint.class, new Method[] {m1, m2, m3});
         rme.createService();
         EasyMock.expectLastCall();
-        rme.createEndpoint();
+        rme.createEndpoint(null);
         EasyMock.expectLastCall();
         rme.setPolicies();
         EasyMock.expectLastCall();
@@ -146,7 +147,7 @@
         org.apache.cxf.ws.addressing.EndpointReferenceType epr = control
             .createMock(org.apache.cxf.ws.addressing.EndpointReferenceType.class);
         control.replay();
-        rme.initialise(c, epr);
+        rme.initialise(c, epr, null);
         assertSame(c, rme.getConduit());
         assertSame(epr, rme.getReplyTo());
     }
@@ -188,7 +189,7 @@
         EasyMock.expect(rme.getUsingAddressing(aei)).andReturn(ua);
         control.replay();
         rme.createService();
-        rme.createEndpoint();
+        rme.createEndpoint(null);
         Endpoint e = rme.getEndpoint();
         WrappedEndpoint we = (WrappedEndpoint)e;
         assertSame(ae, we.getWrappedEndpoint());

Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=911624&r1=911623&r2=911624&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Thu Feb 18 22:37:11 2010
@@ -207,7 +207,7 @@
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
-        EasyMock.expect(message.getExchange()).andReturn(exchange).times(3);
+        EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
         WrappedEndpoint wre = control.createMock(WrappedEndpoint.class);
         EasyMock.expect(exchange.get(Endpoint.class)).andReturn(wre);
         EndpointInfo ei = control.createMock(EndpointInfo.class);
@@ -227,7 +227,7 @@
         EndpointReferenceType replyTo = RMUtils.createAnonymousReference();
         EasyMock.expect(maps.getReplyTo()).andReturn(replyTo);
         EasyMock.expect(exchange.getConduit(message)).andReturn(null);
-        rme.initialise(null, replyTo);
+        rme.initialise(null, replyTo, null);
         EasyMock.expectLastCall();
 
         control.replay();
@@ -254,7 +254,7 @@
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         Message message = control.createMock(Message.class);
         Exchange exchange = control.createMock(Exchange.class);
-        EasyMock.expect(message.getExchange()).andReturn(exchange).times(3);
+        EasyMock.expect(message.getExchange()).andReturn(exchange).anyTimes();
         Endpoint endpoint = control.createMock(Endpoint.class);
         EasyMock.expect(exchange.get(Endpoint.class)).andReturn(endpoint);
         EndpointInfo ei = control.createMock(EndpointInfo.class);
@@ -266,7 +266,7 @@
         EasyMock.expect(exchange.getDestination()).andReturn(null);
         Conduit conduit = control.createMock(Conduit.class);
         EasyMock.expect(exchange.getConduit(message)).andReturn(conduit);
-        rme.initialise(conduit, null);
+        rme.initialise(conduit, null, null);
         EasyMock.expectLastCall();
 
         control.replay();
@@ -380,7 +380,6 @@
         EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null).anyTimes();
         Conduit conduit = control.createMock(Conduit.class);
         EasyMock.expect(exchange.getConduit(message)).andReturn(conduit).anyTimes();
-        EasyMock.expect(conduit.getBackChannel()).andReturn(null).anyTimes();
         Identifier inSid = control.createMock(Identifier.class);        
         AddressingProperties maps = control.createMock(AddressingProperties.class);
         Source source = control.createMock(Source.class);



Mime
View raw message