cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From egl...@apache.org
Subject svn commit: r501388 [1/2] - in /incubator/cxf/trunk: api/src/main/java/org/apache/cxf/wsdl/ integration/jbi/src/main/java/org/apache/cxf/jbi/transport/ rt/core/src/main/java/org/apache/cxf/transport/ rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws...
Date Tue, 30 Jan 2007 12:21:07 GMT
Author: eglynn
Date: Tue Jan 30 04:21:03 2007
New Revision: 501388

URL: http://svn.apache.org/viewvc?view=rev&rev=501388
Log:
Refactored non-HTTP transports to extend Abstract{Destination|Conduit}. 

Modified:
    incubator/cxf/trunk/api/src/main/java/org/apache/cxf/wsdl/EndpointReferenceUtils.java
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
    incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java
    incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractDestination.java
    incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/servlet/ServletDestination.java
    incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
    incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java
    incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java
    incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
    incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java
    incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportBase.java
    incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
    incubator/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    incubator/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
    incubator/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java

Modified: incubator/cxf/trunk/api/src/main/java/org/apache/cxf/wsdl/EndpointReferenceUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/api/src/main/java/org/apache/cxf/wsdl/EndpointReferenceUtils.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/api/src/main/java/org/apache/cxf/wsdl/EndpointReferenceUtils.java (original)
+++ incubator/cxf/trunk/api/src/main/java/org/apache/cxf/wsdl/EndpointReferenceUtils.java Tue Jan 30 04:21:03 2007
@@ -68,6 +68,8 @@
  */
 public final class EndpointReferenceUtils {
 
+    public static final String ANONYMOUS_ADDRESS = "http://www.w3.org/2005/08/addressing/anonymous";
+
     static WeakHashMap<ServiceInfo, Schema> schemaMap = new WeakHashMap<ServiceInfo, Schema>();
 
     private static final Logger LOG = LogUtils.getL7dLogger(EndpointReferenceUtils.class);
@@ -550,7 +552,7 @@
     }
     
     /**
-     * Create an endpoint reference for the provided .
+     * Create an endpoint reference for the provided address.
      * @param address - address URI
      * @return EndpointReferenceType - the endpoint reference
      */
@@ -560,6 +562,18 @@
         setAddress(reference, address);
         return reference;
     }
+    
+    /**
+     * Create an anonymous endpoint reference.
+     * @return EndpointReferenceType - the endpoint reference
+     */
+    public static EndpointReferenceType getAnonymousEndpointReference() {
+        
+        EndpointReferenceType reference = new EndpointReferenceType();
+        setAddress(reference, ANONYMOUS_ADDRESS);
+        return reference;
+    }
+
 
     /**
      * Get the WebService for the provided class.  If the class

Modified: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java (original)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIConduit.java Tue Jan 30 04:21:03 2007
@@ -30,58 +30,29 @@
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
 
-public class JBIConduit implements Conduit {
-    
+public class JBIConduit extends AbstractConduit {
+
     private static final Logger LOG = LogUtils.getL7dLogger(JBIConduit.class);
-       
-    private MessageObserver incomingObserver;
-    private EndpointReferenceType target;
+
     private DeliveryChannel channel;
            
-    
-    
     public JBIConduit(EndpointReferenceType target, DeliveryChannel dc) {           
-        this.target = target;
+        super(target);
         channel = dc;
     }
 
+    protected Logger getLogger() {
+        return LOG;
+    }
+    
     public void send(Message message) throws IOException {
-        LOG.log(Level.FINE, "JBIConduit send message");
+        getLogger().log(Level.FINE, "JBIConduit send message");
                 
         message.setContent(OutputStream.class,
                            new JBIConduitOutputStream(message, channel, target, this));
-    }
-
-    public void close(Message message) throws IOException {
-        message.getContent(OutputStream.class).close();        
-    }
-
-    public EndpointReferenceType getTarget() {
-        return target;
-    }
-
-    public Destination getBackChannel() {
-        return null;
-    }
-
-    public void close() {
-        
-    }
-
-    public void setMessageObserver(MessageObserver observer) {
-        incomingObserver = observer;     
-    }
-    
-    public MessageObserver getMessageObserver() {
-        return incomingObserver;
-    }
-
-    
-     
+    }    
 }

Modified: incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java (original)
+++ incubator/cxf/trunk/integration/jbi/src/main/java/org/apache/cxf/jbi/transport/JBIDestination.java Tue Jan 30 04:21:03 2007
@@ -37,22 +37,21 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.AbstractDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
-public class JBIDestination implements Destination {
-
+public class JBIDestination extends AbstractDestination {
+    
     private static final Logger LOG = LogUtils.getL7dLogger(JBIDestination.class);
+    
     private final DeliveryChannel channel;
     private final CXFServiceUnitManager suManager; 
     private ConduitInitiator conduitInitiator;
-    private EndpointInfo endpointInfo;
-    private EndpointReferenceType reference;
-    private MessageObserver incomingObserver;
     private JBIDispatcher dispatcher;
     private volatile boolean running; 
     
@@ -60,106 +59,51 @@
                           EndpointInfo info,
                           DeliveryChannel dc,
                           CXFServiceUnitManager sum) {
+        super(getTargetReference(info.getAddress()), info);
         this.conduitInitiator = ci;
-        this.endpointInfo = info;
         this.channel = dc;
         this.suManager = sum;
-        reference = new EndpointReferenceType();
-        AttributedURIType address = new AttributedURIType();
-        address.setValue(endpointInfo.getAddress());
-        reference.setAddress(address);        
-    }
-    
-    public EndpointReferenceType getAddress() {
-        return reference;
-    }
-
-    public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType address)
-        throws IOException {
-        Conduit backChannel = null;
-        if (address == null) {
-            backChannel = new BackChannelConduit(address, inMessage, this);
-        } else {
-            if (partialResponse != null) {
-                // just send back the partialResponse 
-                backChannel = new BackChannelConduit(address, inMessage , this);
-            } else {                
-                backChannel = conduitInitiator.getConduit(endpointInfo, address);
-                // ensure decoupled back channel input stream is closed
-                backChannel.setMessageObserver(new MessageObserver() {
-                    public void onMessage(Message m) {
-                        //need to set up the headers 
-                        if (m.getContentFormats().contains(InputStream.class)) {
-                            InputStream is = m.getContent(InputStream.class);
-                            try {
-                                is.close();
-                            } catch (Exception e) {
-                                // ignore
-                            }
-                        }
-                    }
-                });
-            }
-        }
-        return backChannel;
-
     }
-
-    public void shutdown() {
-        running = false;
+    
+    protected Logger getLogger() {
+        return LOG;
     }
-
-    public void setMessageObserver(MessageObserver observer) {
-        if (null != observer) {
-            try {
-                activate();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        } else {
-            LOG.log(Level.FINE, "JBIDestination shutdown()");
-            try {
-                deactivate();
-            } catch (IOException e) {
-                //Ignore for now.
-            }
-        }
-        incomingObserver = observer;
-
+    
+    /**
+     * @param inMessage the incoming message
+     * @return the inbuilt backchannel
+     */
+    protected Conduit getInbuiltBackChannel(Message inMessage) {
+        return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
+                                      inMessage);
     }
     
-    public MessageObserver getMessageObserver() {
-        return incomingObserver;
+    public void shutdown() {
+        running = false;
     }
-
-    private void deactivate() throws IOException {
+    
+    public void deactivate() {
         running = false;
     }
 
-    private void activate() throws IOException {
-        LOG.info(new org.apache.cxf.common.i18n.Message(
-            "ACTIVE.JBI.SERVER.TRANSPORT", LOG).toString());
+    public void activate()  {
+        getLogger().info(new org.apache.cxf.common.i18n.Message(
+            "ACTIVE.JBI.SERVER.TRANSPORT", getLogger()).toString());
         dispatcher = new JBIDispatcher();
         new Thread(dispatcher).start();
     }
     
     // this should deal with the cxf message 
-    protected class BackChannelConduit implements Conduit {
+    protected class BackChannelConduit extends AbstractConduit {
         
         protected Message inMessage;
-        protected EndpointReferenceType target;
         protected JBIDestination jbiDestination;
                 
-        BackChannelConduit(EndpointReferenceType ref, Message message, JBIDestination dest) {
+        BackChannelConduit(EndpointReferenceType ref, Message message) {
+            super(ref);
             inMessage = message;
-            target = ref;
-            jbiDestination = dest;
         }
         
-        public void close(Message msg) throws IOException {
-            msg.getContent(OutputStream.class).close();        
-        }
-
         /**
          * Register a message observer for incoming messages.
          * 
@@ -181,29 +125,10 @@
                 inMessage.get(JBIConstants.MESSAGE_EXCHANGE_PROPERTY));
             message.setContent(OutputStream.class,
                                new JBIDestinationOutputStream(inMessage, channel));
-        }
-        
-        /**
-         * @return the reference associated with the target Destination
-         */    
-        public EndpointReferenceType getTarget() {
-            return target;
-        }
-        
-        /**
-         * Retreive the back-channel Destination.
-         * 
-         * @return the backchannel Destination (or null if the backchannel is
-         * built-in)
-         */
-        public Destination getBackChannel() {
-            return null;
-        }
-        
-        /**
-         * Close the conduit
-         */
-        public void close() {
+        }        
+
+        protected Logger getLogger() {
+            return LOG;
         }
     }
 
@@ -213,8 +138,8 @@
             
             try { 
                 running = true;
-                LOG.info(new org.apache.cxf.common.i18n.Message(
-                    "RECEIVE.THREAD.START", LOG).toString());
+                getLogger().info(new org.apache.cxf.common.i18n.Message(
+                    "RECEIVE.THREAD.START", getLogger()).toString());
                 do {
                     MessageExchange exchange = null;
                     synchronized (channel) {
@@ -230,12 +155,12 @@
                         try { 
                             Thread.currentThread().setContextClassLoader(csu.getClassLoader());
                             if (csu != null) { 
-                                LOG.info(new org.apache.cxf.common.i18n.Message(
-                                    "DISPATCH.TO.SU", LOG).toString());
+                                getLogger().info(new org.apache.cxf.common.i18n.Message(
+                                    "DISPATCH.TO.SU", getLogger()).toString());
                                 dispatch(exchange);
                             } else {
-                                LOG.info(new org.apache.cxf.common.i18n.Message(
-                                    "NO.SU.FOUND", LOG).toString());
+                                getLogger().info(new org.apache.cxf.common.i18n.Message(
+                                    "NO.SU.FOUND", getLogger()).toString());
                             }
                         } finally { 
                             Thread.currentThread().setContextClassLoader(oldLoader);
@@ -243,18 +168,18 @@
                     } 
                 } while(running);
             } catch (Exception ex) {
-                LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
-                    "ERROR.DISPATCH.THREAD", LOG).toString(), ex);
+                getLogger().log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                    "ERROR.DISPATCH.THREAD", getLogger()).toString(), ex);
             } 
-            LOG.fine(new org.apache.cxf.common.i18n.Message(
-                "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT", LOG).toString());
+            getLogger().fine(new org.apache.cxf.common.i18n.Message(
+                "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT", getLogger()).toString());
         }
 
     }
     
     private void dispatch(MessageExchange exchange) throws IOException {
         QName opName = exchange.getOperation(); 
-        LOG.fine("dispatch method: " + opName);
+        getLogger().fine("dispatch method: " + opName);
         
         NormalizedMessage nm = exchange.getMessage("in");
         try {
@@ -274,8 +199,8 @@
                 getMessageObserver().onMessage(inMessage);
             
         } catch (Exception ex) {
-            LOG.log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
-                "ERROR.PREPARE.MESSAGE", LOG).toString(), ex);
+            getLogger().log(Level.SEVERE, new org.apache.cxf.common.i18n.Message(
+                "ERROR.PREPARE.MESSAGE", getLogger()).toString(), ex);
             throw new IOException(ex.getMessage());
         }
 

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractConduit.java Tue Jan 30 04:21:03 2007
@@ -40,8 +40,6 @@
         target = t;
     }
 
-    protected abstract Logger getLogger();
-    
     /**
      * @return the reference associated with the target Destination
      */    
@@ -65,14 +63,35 @@
     public void close(Message msg) throws IOException {
         msg.getContent(OutputStream.class).close();        
     }
+    
+    /**
+     * Close the conduit.
+     */
+    public void close() {
+        // nothing to do by default
+    }
 
     /**
      * Register a message observer for incoming messages.
      * 
      * @param observer the observer to notify on receipt of incoming
+     * message
      */
     public void setMessageObserver(MessageObserver observer) {
         incomingObserver = observer;
         getLogger().info("registering incoming observer: " + incomingObserver);
     }
+    
+    /**
+     * @return the observer to notify on receipt of incoming message
+     */
+    public MessageObserver getMessageObserver() {
+        return incomingObserver;
+    }
+
+    /**
+     * @return the logger to use
+     */
+    protected abstract Logger getLogger();
+
 }

Modified: incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractDestination.java (original)
+++ incubator/cxf/trunk/rt/core/src/main/java/org/apache/cxf/transport/AbstractDestination.java Tue Jan 30 04:21:03 2007
@@ -26,7 +26,9 @@
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
 /**
  * Abstract base class factoring out common Destination logic, 
@@ -45,8 +47,6 @@
         endpointInfo = ei;
     }
     
-    protected abstract Logger getLogger();
-    
     /**
      * @return the reference associated with this Destination
      */    
@@ -118,18 +118,25 @@
             if (observer != null) {
                 getLogger().info("registering incoming observer: " + observer);
                 if (old == null) {
-                    activateIncoming();
+                    activate();
                 }
             } else {
                 getLogger().info("unregistering incoming observer: " + incomingObserver);
                 if (old != null) {
-                    deactivateIncoming();
+                    deactivate();
                 }
             }
         }
     }
     
     /**
+     * @return the observer to notify on receipt of incoming message
+     */
+    public MessageObserver getMessageObserver() {
+        return incomingObserver;
+    }
+    
+    /**
      * Shutdown the Destination, i.e. stop accepting incoming messages.
      */
     public void shutdown() {
@@ -160,16 +167,62 @@
     /**
      * Activate receipt of incoming messages.
      */
-    protected abstract void activateIncoming();
+    protected void activate() {
+        // nothing to do by default
+    }
 
     /**
      * Deactivate receipt of incoming messages.
      */
-    protected abstract void deactivateIncoming();
+    protected void deactivate() {
+        // nothing to do by default        
+    }
+    
+    /**
+     * Get the exposed reference.
+     * 
+     * @param addr the address URI to set on the reference
+     * @return the actual reference
+     */
+    protected static EndpointReferenceType getTargetReference(String addr) {
+        EndpointReferenceType ref = new EndpointReferenceType();
+        AttributedURIType address = new AttributedURIType();
+        address.setValue(addr);
+        ref.setAddress(address);        
+        return ref;
+    }
     
     /**
+     * @return the logger to use
+     */
+    protected abstract Logger getLogger();
+
+    /**
      * @param inMessage the incoming message
      * @return the inbuilt backchannel
      */
     protected abstract Conduit getInbuiltBackChannel(Message inMessage);
+    
+    /**
+     * Backchannel conduit.
+     */
+    protected abstract class AbstractBackChannelConduit extends AbstractConduit {
+
+        public AbstractBackChannelConduit() {
+            super(EndpointReferenceUtils.getAnonymousEndpointReference());
+        }
+
+        /**
+         * Register a message observer for incoming messages.
+         * 
+         * @param observer the observer to notify on receipt of incoming messages
+         */
+        public void setMessageObserver(MessageObserver observer) {
+            // shouldn't be called for a back channel conduit
+        }
+        
+        protected Logger getLogger() {
+            return AbstractDestination.this.getLogger();
+        }
+    }
 }

Modified: incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/servlet/ServletDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/servlet/ServletDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/servlet/ServletDestination.java (original)
+++ incubator/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/servlet/ServletDestination.java Tue Jan 30 04:21:03 2007
@@ -20,7 +20,6 @@
 package org.apache.cxf.jaxws.servlet;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
@@ -44,15 +43,14 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
 
-public class ServletDestination implements Destination {
+public class ServletDestination extends AbstractDestination {
 
     public static final String HTTP_REQUEST =
         "HTTP_SERVLET_REQUEST";
@@ -65,11 +63,8 @@
 
     protected final Bus bus;
     protected final ConduitInitiator conduitInitiator;
-    protected final EndpointInfo endpointInfo;
-    protected final EndpointReferenceType reference;
     protected String name;
     protected URL nurl;
-    protected MessageObserver incomingObserver;
     
     
     /**
@@ -85,21 +80,9 @@
                               ConduitInitiator ci,
                               EndpointInfo ei)
         throws IOException {
+        super(getTargetReference(ei.getAddress()), ei);  
         bus = b;
         conduitInitiator = ci;
-        endpointInfo = ei;
-        
-        reference = new EndpointReferenceType();
-        AttributedURIType add = new AttributedURIType();
-        add.setValue(ei.getAddress());
-        reference.setAddress(add);
-    }
-
-    /**
-     * @return the reference associated with this Destination
-     */    
-    public EndpointReferenceType getAddress() {
-        return reference;
     }
 
     /**
@@ -147,68 +130,32 @@
         }
     }
     
-
-
-    /**
-     * Register a message observer for incoming messages.
-     * 
-     * @param observer the observer to notify on receipt of incoming
-     */
-    public synchronized void setMessageObserver(MessageObserver observer) {
-        LOG.info("set the observer for address " + getAddress().getAddress().getValue());
-        incomingObserver = observer;
+    protected Logger getLogger() {
+        return LOG;
     }
-    
+
     /**
-     * Retreive a back-channel Conduit, which must be policy-compatible
-     * with the current Message and associated Destination. For example
-     * compatible Quality of Protection must be asserted on the back-channel.
-     * This would generally only be an issue if the back-channel is decoupled.
-     * 
-     * @param inMessage the current inbound message (null to indicate a 
-     * disassociated back-channel)
-     * @param partialResponse in the decoupled case, this is expected to be the
-     * outbound Message to be sent over the in-built back-channel. 
-     * @param address the backchannel address (null to indicate anonymous)
-     * @return a suitable Conduit
+     * @param inMessage the incoming message
+     * @return the inbuilt backchannel
      */
-    public Conduit getBackChannel(Message inMessage,
-                                  Message partialResponse,
-                                  EndpointReferenceType address) throws IOException {
+    protected Conduit getInbuiltBackChannel(Message inMessage) {
         HttpServletResponse response = (HttpServletResponse)inMessage.get(HTTP_RESPONSE);
-        Conduit backChannel = null;
-        if (address == null) {
-            backChannel = new BackChannelConduit(address, response);
-        } else {
-            if (partialResponse != null) {
-                // setup the outbound message to for 202 Accepted
-                partialResponse.put(Message.RESPONSE_CODE,
-                                    HttpURLConnection.HTTP_ACCEPTED);
-                backChannel = new BackChannelConduit(address, response);
-            } else {
-                backChannel = conduitInitiator.getConduit(endpointInfo, address);
-                // ensure decoupled back channel input stream is closed
-                backChannel.setMessageObserver(new MessageObserver() {
-                    public void onMessage(Message m) {
-                        if (m.getContentFormats().contains(InputStream.class)) {
-                            InputStream is = m.getContent(InputStream.class);
-                            try {
-                                is.close();
-                            } catch (Exception e) {
-                                // ignore
-                            }
-                        }
-                    }
-                });
-            }
-        }
-        return backChannel;
+        return new BackChannelConduit(response);
     }
-
+   
     /**
-     * Shutdown the Destination, i.e. stop accepting incoming messages.
+     * Mark message as a partial message.
+     * 
+     * @param partialResponse the partial response message
+     * @param decoupledTarget the decoupled target
+     * @return true iff partial responses are supported
      */
-    public void shutdown() {  
+    protected boolean markPartialResponse(Message partialResponse,
+                                          EndpointReferenceType decoupledTarget) {
+        partialResponse.put(Message.RESPONSE_CODE,
+                            HttpURLConnection.HTTP_ACCEPTED);
+        partialResponse.getExchange().put(EndpointReferenceType.class, decoupledTarget);
+        return true;
     }
         
     /**
@@ -273,28 +220,14 @@
         }
         
     }
-
     
-    protected class BackChannelConduit implements Conduit {
+    protected class BackChannelConduit
+        extends AbstractDestination.AbstractBackChannelConduit {
         
         protected HttpServletResponse response;
-        protected EndpointReferenceType target;
         
-        BackChannelConduit(EndpointReferenceType ref, HttpServletResponse resp) {
+        BackChannelConduit(HttpServletResponse resp) {
             response = resp;
-            target = ref;
-        }
-        public void close(Message msg) throws IOException {
-            msg.getContent(OutputStream.class).close();        
-        }
-
-        /**
-         * Register a message observer for incoming messages.
-         * 
-         * @param observer the observer to notify on receipt of incoming
-         */
-        public void setMessageObserver(MessageObserver observer) {
-            // shouldn't be called for a back channel conduit
         }
 
         /**
@@ -308,29 +241,6 @@
             message.setContent(OutputStream.class,
                                new WrappedOutputStream(message, response));
         }
-        
-        /**
-         * @return the reference associated with the target Destination
-         */    
-        public EndpointReferenceType getTarget() {
-            return target;
-        }
-        
-        /**
-         * Retreive the back-channel Destination.
-         * 
-         * @return the backchannel Destination (or null if the backchannel is
-         * built-in)
-         */
-        public Destination getBackChannel() {
-            return null;
-        }
-        
-        /**
-         * Close the conduit
-         */
-        public void close() {
-        }
     }
     
     private class WrappedOutputStream extends AbstractWrappedOutputStream {
@@ -415,5 +325,4 @@
     public EndpointInfo getEndpointInfo() {
         return endpointInfo;
     }
-
 }

Modified: incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java Tue Jan 30 04:21:03 2007
@@ -39,9 +39,6 @@
 import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.http.destination.HTTPDestinationConfigBean;
 import org.apache.cxf.transports.http.configuration.HTTPServerPolicy;
-import org.apache.cxf.ws.addressing.AttributedURIType;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
-
 
 /**
  * Common base for HTTP Destination implementations.
@@ -68,18 +65,14 @@
                                    ConduitInitiator ci,
                                    EndpointInfo ei)
         throws IOException {
-        super(new EndpointReferenceType(), ei);
+        super(getTargetReference(getAddressValue(ei)), ei);  
         bus = b;
         conduitInitiator = ci;
         
         initConfig();
  
-        nurl = new URL(getAddressValue());
+        nurl = new URL(getAddressValue(ei));
         name = nurl.getPath();
-
-        AttributedURIType address = new AttributedURIType();
-        address.setValue(getAddressValue());
-        reference.setAddress(address);
     }
 
     /**
@@ -145,8 +138,8 @@
     protected abstract void copyRequestHeaders(Message message,
                                                Map<String, List<String>> headers);
 
-    protected final String getAddressValue() {       
-        return StringUtils.addDefaultPortIfMissing(endpointInfo.getAddress());
+    protected static String getAddressValue(EndpointInfo ei) {       
+        return StringUtils.addDefaultPortIfMissing(ei.getAddress());
     }        
 
     private void initConfig() {

Modified: incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java Tue Jan 30 04:21:03 2007
@@ -41,17 +41,14 @@
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.io.AbstractWrappedOutputStream;
-import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.destination.HTTPDestinationConfigBean;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.apache.cxf.wsdl11.ServiceWSDLBuilder;
 import org.mortbay.http.HttpRequest;
 import org.mortbay.http.HttpResponse;
@@ -62,8 +59,6 @@
     public static final String HTTP_REQUEST = JettyHTTPDestination.class.getName() + ".REQUEST";
     public static final String HTTP_RESPONSE = JettyHTTPDestination.class.getName() + ".RESPONSE";
     
-    protected static final String ANONYMOUS_ADDRESS = "http://www.w3.org/2005/08/addressing/anonymous";
-
     private static final Logger LOG = LogUtils.getL7dLogger(JettyHTTPDestination.class);
 
     protected ServerEngine engine;
@@ -123,10 +118,10 @@
     /**
      * Activate receipt of incoming messages.
      */
-    protected void activateIncoming() {
+    protected void activate() {
         LOG.log(Level.INFO, "Activating receipt of incoming messages");
         try {
-            URL url = new URL(getAddressValue());
+            URL url = new URL(getAddressValue(endpointInfo));
             if (contextMatchOnExact()) {
                 engine.addServant(url, new AbstractHttpHandler() {
                     public void handle(String pathInContext, String pathParams, HttpRequest req,
@@ -154,7 +149,7 @@
     /**
      * Deactivate receipt of incoming messages.
      */
-    protected void deactivateIncoming() {
+    protected void deactivate() {
         LOG.log(Level.INFO, "Deactivating receipt of incoming messages");
         engine.removeServant(nurl);   
     }
@@ -191,53 +186,6 @@
     }
 
     /**
-     * Retreive a back-channel Conduit, which must be policy-compatible
-     * with the current Message and associated Destination. For example
-     * compatible Quality of Protection must be asserted on the back-channel.
-     * This would generally only be an issue if the back-channel is decoupled.
-     * 
-     * @param inMessage the current inbound message (null to indicate a 
-     * disassociated back-channel)
-     * @param partialResponse in the decoupled case, this is expected to be the
-     * outbound Message to be sent over the in-built back-channel. 
-     * @param address the backchannel address (null to indicate anonymous)
-     * @return a suitable Conduit
-     */
-    public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType address)
-        throws IOException {
-        HttpResponse response = (HttpResponse)inMessage.get(HTTP_RESPONSE);
-        Conduit backChannel = null;
-        Exchange ex = inMessage.getExchange();
-        EndpointReferenceType target = address != null ? address : ex.get(EndpointReferenceType.class);
-        if (target == null) {
-            backChannel = new BackChannelConduit(response);
-        } else {
-            if (partialResponse != null) {
-                // setup the outbound message to for 202 Accepted
-                partialResponse.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_ACCEPTED);
-                backChannel = new BackChannelConduit(response);
-                ex.put(EndpointReferenceType.class, target);
-            } else {
-                backChannel = conduitInitiator.getConduit(endpointInfo, target);
-                // ensure decoupled back channel input stream is closed
-                backChannel.setMessageObserver(new MessageObserver() {
-                    public void onMessage(Message m) {
-                        if (m.getContentFormats().contains(InputStream.class)) {
-                            InputStream is = m.getContent(InputStream.class);
-                            try {
-                                is.close();
-                            } catch (Exception e) {
-                                // ignore
-                            }
-                        }
-                    }
-                });
-            }
-        }
-        return backChannel;
-    }
-
-    /**
      * Copy the request headers into the message.
      * 
      * @param message the current message
@@ -329,8 +277,8 @@
             inMessage.put(Message.PATH_INFO, req.getPath());
             inMessage.put(Message.QUERY_STRING, req.getQuery());
             inMessage.put(Message.CONTENT_TYPE, req.getContentType());
-            if (!StringUtils.isEmpty(getAddressValue())) {
-                inMessage.put(Message.BASE_PATH, new URL(getAddressValue()).getPath());
+            if (!StringUtils.isEmpty(getAddressValue(endpointInfo))) {
+                inMessage.put(Message.BASE_PATH, new URL(getAddressValue(endpointInfo)).getPath());
             }
             inMessage.put(Message.FIXED_PARAMETER_ORDER, config.isFixedParameterOrder());
             inMessage.put(Message.ASYNC_POST_RESPONSE_DISPATCH, Boolean.TRUE); 
@@ -397,27 +345,13 @@
     /**
      * Backchannel conduit.
      */
-    protected class BackChannelConduit implements Conduit {
+    protected class BackChannelConduit 
+        extends AbstractDestination.AbstractBackChannelConduit {
 
         protected HttpResponse response;
-        protected EndpointReferenceType target;
 
         BackChannelConduit(HttpResponse resp) {
             response = resp;
-            target = EndpointReferenceUtils.getEndpointReference(ANONYMOUS_ADDRESS);
-        }
-
-        public void close(Message msg) throws IOException {
-            msg.getContent(OutputStream.class).close();
-        }
-
-        /**
-         * Register a message observer for incoming messages.
-         * 
-         * @param observer the observer to notify on receipt of incoming
-         */
-        public void setMessageObserver(MessageObserver observer) {
-            // shouldn't be called for a back channel conduit
         }
 
         /**
@@ -430,29 +364,6 @@
         public void send(Message message) throws IOException {
             message.put(HTTP_RESPONSE, response);
             message.setContent(OutputStream.class, new WrappedOutputStream(message, response));
-        }
-
-        /**
-         * @return the reference associated with the target Destination
-         */
-        public EndpointReferenceType getTarget() {
-            return target;
-        }
-
-        /**
-         * Retreive the back-channel Destination.
-         * 
-         * @return the backchannel Destination (or null if the backchannel is
-         * built-in)
-         */
-        public Destination getBackChannel() {
-            return null;
-        }
-
-        /**
-         * Close the conduit
-         */
-        public void close() {
         }
     }
 

Modified: incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java Tue Jan 30 04:21:03 2007
@@ -176,7 +176,7 @@
         assertNull("unexpected backchannel-backchannel",
                    backChannel.getBackChannel());
         assertEquals("unexpected target",
-                     JettyHTTPDestination.ANONYMOUS_ADDRESS,
+                     EndpointReferenceUtils.ANONYMOUS_ADDRESS,
                      backChannel.getTarget().getAddress().getValue());
     }
     

Modified: incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java Tue Jan 30 04:21:03 2007
@@ -40,9 +40,6 @@
 import org.apache.cxf.transport.ConduitInitiator;
 import org.apache.cxf.transport.http.destination.HTTPDestinationConfigBean;
 import org.apache.cxf.transports.http.configuration.HTTPServerPolicy;
-import org.apache.cxf.ws.addressing.AttributedURIType;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
-
 
 /**
  * Common base for HTTP Destination implementations.
@@ -69,18 +66,14 @@
                                    ConduitInitiator ci,
                                    EndpointInfo ei)
         throws IOException {
-        super(new EndpointReferenceType(), ei);
+        super(getTargetReference(getAddressValue(ei)), ei);  
         bus = b;
         conduitInitiator = ci;
         
         initConfig();
          
-        nurl = new URL(getAddressValue());
+        nurl = new URL(getAddressValue(ei));
         name = nurl.getPath();
-        
-        AttributedURIType address = new AttributedURIType();
-        address.setValue(getAddressValue());
-        reference.setAddress(address);
     }
 
     /**
@@ -146,8 +139,8 @@
     protected abstract void copyRequestHeaders(Message message,
                                                Map<String, List<String>> headers);
 
-    protected final String getAddressValue() {       
-        return StringUtils.addDefaultPortIfMissing(endpointInfo.getAddress());
+    protected static String getAddressValue(EndpointInfo ei) {       
+        return StringUtils.addDefaultPortIfMissing(ei.getAddress());
     }        
 
     private void initConfig() {

Modified: incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/JettyHTTPDestination.java Tue Jan 30 04:21:03 2007
@@ -46,13 +46,11 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
-import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.destination.HTTPDestinationConfigBean;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.apache.cxf.wsdl11.ServiceWSDLBuilder;
 import org.mortbay.jetty.HttpConnection;
 import org.mortbay.jetty.Request;
@@ -64,8 +62,6 @@
     public static final String HTTP_REQUEST = JettyHTTPDestination.class.getName() + ".REQUEST";
     public static final String HTTP_RESPONSE = JettyHTTPDestination.class.getName() + ".RESPONSE";
 
-    protected static final String ANONYMOUS_ADDRESS = "http://www.w3.org/2005/08/addressing/anonymous";
-
     private static final Logger LOG = LogUtils.getL7dLogger(JettyHTTPDestination.class);
     
     protected ServerEngine engine;
@@ -124,10 +120,10 @@
     /**
      * Activate receipt of incoming messages.
      */
-    protected void activateIncoming() {
+    protected void activate() {
         LOG.log(Level.INFO, "Activating receipt of incoming messages");
         try {
-            URL url = new URL(getAddressValue());
+            URL url = new URL(getAddressValue(endpointInfo));
             //The handler is bind with the context, 
             //we need to set the things on on context
             if (contextMatchOnExact()) {
@@ -158,7 +154,7 @@
     /**
      * Deactivate receipt of incoming messages.
      */
-    protected void deactivateIncoming() {
+    protected void deactivate() {
         LOG.log(Level.INFO, "Deactivating receipt of incoming messages");
         engine.removeServant(nurl);        
     }
@@ -293,8 +289,8 @@
             inMessage.put(Message.QUERY_STRING, req.getQueryString());
             inMessage.put(Message.CONTENT_TYPE, req.getContentType());
             inMessage.setContent(InputStream.class, req.getInputStream());
-            if (!StringUtils.isEmpty(getAddressValue())) {
-                inMessage.put(Message.BASE_PATH, new URL(getAddressValue()).getPath());
+            if (!StringUtils.isEmpty(getAddressValue(endpointInfo))) {
+                inMessage.put(Message.BASE_PATH, new URL(getAddressValue(endpointInfo)).getPath());
             }
             inMessage.put(Message.FIXED_PARAMETER_ORDER, config.isFixedParameterOrder());
             inMessage.put(Message.ASYNC_POST_RESPONSE_DISPATCH, Boolean.TRUE); 
@@ -356,27 +352,13 @@
     /**
      * Backchannel conduit.
      */
-    protected class BackChannelConduit implements Conduit {
+    protected class BackChannelConduit
+        extends AbstractDestination.AbstractBackChannelConduit {
 
         protected HttpServletResponse response;
-        protected EndpointReferenceType target;
 
         BackChannelConduit(HttpServletResponse resp) {
             response = resp;
-            target = EndpointReferenceUtils.getEndpointReference(ANONYMOUS_ADDRESS);
-        }
-
-        public void close(Message msg) throws IOException {
-            msg.getContent(OutputStream.class).close();
-        }
-
-        /**
-         * Register a message observer for incoming messages.
-         * 
-         * @param observer the observer to notify on receipt of incoming
-         */
-        public void setMessageObserver(MessageObserver observer) {
-            // shouldn't be called for a back channel conduit
         }
 
         /**
@@ -389,29 +371,6 @@
         public void send(Message message) throws IOException {
             message.put(HTTP_RESPONSE, response);
             message.setContent(OutputStream.class, new WrappedOutputStream(message, response));
-        }
-
-        /**
-         * @return the reference associated with the target Destination
-         */
-        public EndpointReferenceType getTarget() {
-            return target;
-        }
-
-        /**
-         * Retreive the back-channel Destination.
-         * 
-         * @return the backchannel Destination (or null if the backchannel is
-         * built-in)
-         */
-        public Destination getBackChannel() {
-            return null;
-        }
-
-        /**
-         * Close the conduit
-         */
-        public void close() {
         }
     }
 

Modified: incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/JettyHTTPDestinationTest.java Tue Jan 30 04:21:03 2007
@@ -168,7 +168,7 @@
         assertNull("unexpected backchannel-backchannel",
                    backChannel.getBackChannel());
         assertEquals("unexpected target",
-                     JettyHTTPDestination.ANONYMOUS_ADDRESS,
+                     EndpointReferenceUtils.ANONYMOUS_ADDRESS,
                      backChannel.getTarget().getAddress().getValue());
     }
     

Modified: incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Tue Jan 30 04:21:03 2007
@@ -45,6 +45,7 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
@@ -54,14 +55,15 @@
 import org.apache.cxf.transports.jms.jms_conf.JMSClientConfig;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-public class JMSConduit extends JMSTransportBase implements Conduit, Configurable {
+public class JMSConduit extends AbstractConduit {
+
+    protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base";
+
     private static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
-      
-    protected JMSConduitConfigBean jmsConduitConfigBean;   
     
-    private MessageObserver incomingObserver;
-    private EndpointReferenceType target;
-   
+    protected final JMSTransportBase base;
+    protected JMSConduitConfigBean config;
+       
     public JMSConduit(Bus b, EndpointInfo endpointInfo) {
         this(b, endpointInfo, null);
     }
@@ -69,32 +71,30 @@
     public JMSConduit(Bus b,
                       EndpointInfo endpointInfo,
                       EndpointReferenceType target) {           
-        super(b, endpointInfo, false);        
+        super(target);        
 
+        base = new JMSTransportBase(b, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
+        
         initConfig();
     } 
     
-    public String getBeanName() {
-        return endpointInfo.getName().toString() + ".jms-conduit-base";
-    }
-
     // prepare the message for send out , not actually send out the message
     public void send(Message message) throws IOException {        
-        LOG.log(Level.FINE, "JMSConduit send message");
+        getLogger().log(Level.FINE, "JMSConduit send message");
 
         try {
-            if (null == sessionFactory) {
-                JMSProviderHub.connect(this, null);
+            if (null == base.sessionFactory) {
+                JMSProviderHub.connect(base, null);
             }
         } catch (JMSException jmsex) {
-            LOG.log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);            
+            getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);            
             throw new IOException(jmsex.toString());
         } catch (NamingException ne) {
-            LOG.log(Level.WARNING, "JMS connect failed with NamingException : ", ne);
+            getLogger().log(Level.WARNING, "JMS connect failed with NamingException : ", ne);
             throw new IOException(ne.toString());
         }
 
-        if (sessionFactory == null) {
+        if (base.sessionFactory == null) {
             throw new java.lang.IllegalStateException("JMSClientTransport not connected");
         }
 
@@ -106,7 +106,7 @@
                 isOneWay = ex.isOneWay();
             }    
             //get the pooledSession with response expected 
-            PooledSession pooledSession = sessionFactory.get(!isOneWay);            
+            PooledSession pooledSession = base.sessionFactory.get(!isOneWay);            
             // put the PooledSession into the outMessage
             message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession);
             
@@ -119,35 +119,20 @@
       
     }
 
-    public void close(Message message) throws IOException {
-        // TODO Auto-generated method stub
-        message.getContent(OutputStream.class).close();
-        //using the outputStream to setup the corralated response
-    }
-
-    public EndpointReferenceType getTarget() {
-        return target;
-    }
-
-    public Destination getBackChannel() {
-        return null;
-       //TODO now didn't support this asychronized request
-    }
-
     public void close() {       
-        LOG.log(Level.FINE, "JMSConduit closed ");
+        getLogger().log(Level.FINE, "JMSConduit closed ");
 
         // ensure resources held by session factory are released
         //
-        if (sessionFactory != null) {
-            sessionFactory.shutdown();
+        if (base.sessionFactory != null) {
+            base.sessionFactory.shutdown();
         }
     }
-
-    public void setMessageObserver(MessageObserver observer) {
-        incomingObserver = observer;        
-        LOG.info("registering incoming observer: " + incomingObserver);        
+    
+    protected Logger getLogger() {
+        return LOG;
     }
+    
 
     /**
      * Receive mechanics.
@@ -160,7 +145,7 @@
         
         Object result = null;
         
-        long timeout = jmsConduitConfigBean.getClientConfig().getClientReceiveTimeout();
+        long timeout = config.getClientConfig().getClientReceiveTimeout();
 
         Long receiveTimeout = (Long)outMessage.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT);
 
@@ -169,18 +154,18 @@
         }
         
         javax.jms.Message jmsMessage = pooledSession.consumer().receive(timeout);
-        LOG.log(Level.FINE, "client received reply: " , jmsMessage);
+        getLogger().log(Level.FINE, "client received reply: " , jmsMessage);
 
         if (jmsMessage != null) {
             
-            populateIncomingContext(jmsMessage, outMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+            base.populateIncomingContext(jmsMessage, outMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
             String messageType = jmsMessage instanceof TextMessage 
                         ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE;
-            result = unmarshal(jmsMessage, messageType);
+            result = base.unmarshal(jmsMessage, messageType);
             return result;
         } else {
             String error = "JMSClientTransport.receive() timed out. No message available.";
-            LOG.log(Level.SEVERE, error);
+            getLogger().log(Level.SEVERE, error);
             //TODO: Review what exception should we throw.
             throw new JMSException(error);
             
@@ -188,33 +173,34 @@
     }
 
     private void initConfig() {
-        
-        final class JMSConduitConfiguration extends JMSConduitConfigBean implements Configurable {
 
+        final class JMSConduitConfiguration extends JMSConduitConfigBean implements Configurable {
             public String getBeanName() {
-                return endpointInfo.getName().toString() + ".jms-conduit";
+                return base.endpointInfo.getName().toString() + ".jms-conduit";
             }
         }
         
-        // It'd be really nice if we did this in a way that wsa more friendly to the API
+        // It'd be really nice if we did this in a way that was more friendly to the API
         JMSConduitConfigBean bean = new JMSConduitConfiguration();
         
-        bean.setClient(endpointInfo.getTraversedExtensor(new JMSClientBehaviorPolicyType(), 
-                                                         JMSClientBehaviorPolicyType.class));
-        bean.setClientConfig(endpointInfo.getTraversedExtensor(new JMSClientConfig(), JMSClientConfig.class));
+        bean.setClient(base.endpointInfo.getTraversedExtensor(new JMSClientBehaviorPolicyType(),
+                                                              JMSClientBehaviorPolicyType.class));
+        bean.setClientConfig(base.endpointInfo.getTraversedExtensor(new JMSClientConfig(),
+                                                                    JMSClientConfig.class));
 
-        Configurer configurer = bus.getExtension(Configurer.class);
+        Configurer configurer = base.bus.getExtension(Configurer.class);
         if (null != configurer) {
             configurer.configureBean(bean);
+            configurer.configureBean(base);
         }
         
-        jmsConduitConfigBean = bean;
+        config = bean;
 
     }
 
     private boolean isTextPayload() {
         return JMSConstants.TEXT_MESSAGE_TYPE.equals(
-            jmsConduitConfigBean.getClient().getMessageType().value());
+            config.getClient().getMessageType().value());
     }
     
     private class JMSOutputStream extends AbstractCachedOutputStream {
@@ -239,9 +225,9 @@
                 if (!isOneWay) {
                     handleResponse();
                 }
-                sessionFactory.recycle(pooledSession);
+                base.sessionFactory.recycle(pooledSession);
             } catch (JMSException jmsex) {
-                LOG.log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);            
+                getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex);            
                 throw new IOException(jmsex.toString());
             }
         }
@@ -259,7 +245,7 @@
                 request = ((ByteArrayOutputStream)currentStream).toByteArray();
             }
             
-            LOG.log(Level.FINE, "Conduit Request is :[" + request + "]");
+            getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
             javax.jms.Destination replyTo = pooledSession.destination();
             
             //TODO setting up the responseExpected
@@ -268,25 +254,25 @@
             //We don't want to send temp queue in
             //replyTo header for oneway calls
             if (isOneWay
-                && (getAddressPolicy().getJndiReplyDestinationName() == null)) {
+                && (base.getAddressPolicy().getJndiReplyDestinationName() == null)) {
                 replyTo = null;
             }
 
-            jmsMessage = marshal(request, pooledSession.session(), replyTo,
-                jmsConduitConfigBean.getClient().getMessageType().value());
+            jmsMessage = base.marshal(request, pooledSession.session(), replyTo,
+                config.getClient().getMessageType().value());
             
             JMSMessageHeadersType headers =
                 (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
 
-            int deliveryMode = getJMSDeliveryMode(headers);
-            int priority = getJMSPriority(headers);
-            String correlationID = getCorrelationId(headers);
-            long ttl = getTimeToLive(headers);
+            int deliveryMode = base.getJMSDeliveryMode(headers);
+            int priority = base.getJMSPriority(headers);
+            String correlationID = base.getCorrelationId(headers);
+            long ttl = base.getTimeToLive(headers);
             if (ttl <= 0) {
-                ttl = jmsConduitConfigBean.getClientConfig().getMessageTimeToLive();
+                ttl = config.getClientConfig().getMessageTimeToLive();
             }
             
-            setMessageProperties(headers, jmsMessage);           
+            base.setMessageProperties(headers, jmsMessage);           
             
             if (!isOneWay) {
                 String id = pooledSession.getCorrelationID();
@@ -309,16 +295,16 @@
                 // We assume that it will only happen in case of the temp. reply queue.
             }
 
-            LOG.log(Level.FINE, "client sending request: ",  jmsMessage);
+            getLogger().log(Level.FINE, "client sending request: ",  jmsMessage);
             //getting  Destination Style
-            if (isDestinationStyleQueue()) {
+            if (base.isDestinationStyleQueue()) {
                 QueueSender sender = (QueueSender)pooledSession.producer();
                 sender.setTimeToLive(ttl);
-                sender.send((Queue)targetDestination, jmsMessage, deliveryMode, priority, ttl);
+                sender.send((Queue)base.targetDestination, jmsMessage, deliveryMode, priority, ttl);
             } else {
                 TopicPublisher publisher = (TopicPublisher)pooledSession.producer();
                 publisher.setTimeToLive(ttl);
-                publisher.publish((Topic)targetDestination, jmsMessage, deliveryMode, priority, ttl);
+                publisher.publish((Topic)base.targetDestination, jmsMessage, deliveryMode, priority, ttl);
             }
         }
 
@@ -336,7 +322,7 @@
             try {
                 response = receive(pooledSession, outMessage);
             } catch (JMSException jmsex) {
-                LOG.log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);            
+                getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex);            
                 throw new IOException(jmsex.toString());
             }  
 
@@ -344,7 +330,7 @@
             inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, 
                           outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS));
 
-            LOG.log(Level.FINE, "The Response Message is : [" + response + "]");
+            getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
             
             // setup the inMessage response stream
             byte[] bytes = null;
@@ -355,7 +341,7 @@
                 bytes = (byte[])response;
             }
             inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
-            LOG.log(Level.FINE, "incoming observer is " + incomingObserver);
+            getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
             incomingObserver.onMessage(inMessage);
         }
     }

Modified: incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?view=diff&rev=501388&r1=501387&r2=501388
==============================================================================
--- incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ incubator/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Jan 30 04:21:03 2007
@@ -48,129 +48,82 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.AbstractDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
-import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.jms.destination.JMSDestinationConfigBean;
 import org.apache.cxf.transports.jms.JMSServerBehaviorPolicyType;
 import org.apache.cxf.transports.jms.context.JMSMessageHeadersType;
 import org.apache.cxf.transports.jms.jms_conf.JMSServerConfig;
 import org.apache.cxf.workqueue.WorkQueueManager;
-import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.wsdl.EndpointReferenceUtils;
 
 
 
-public class JMSDestination extends JMSTransportBase implements Destination, Configurable {
-    static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);    
-    final EndpointReferenceType reference;
+public class JMSDestination extends AbstractDestination {
+        
+    protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base";
+
+    private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
+
     final ConduitInitiator conduitInitiator;
-    JMSDestinationConfigBean jmsDestinationConfigBean;
+    final JMSTransportBase base;
+    JMSDestinationConfigBean config;
     PooledSession listenerSession;
     JMSListenerThread listenerThread;
-    MessageObserver incomingObserver;
     
     public JMSDestination(Bus b,
                           ConduitInitiator ci,
                           EndpointInfo info) throws IOException {
-        super(b, info, true);        
-        initConfig();
+        super(getTargetReference(info.getAddress()), info);    
+        
+        base = new JMSTransportBase(b, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
+
         conduitInitiator = ci;
-        reference = new EndpointReferenceType();
-        AttributedURIType address = new AttributedURIType();
-        address.setValue(endpointInfo.getAddress());
-        reference.setAddress(address);        
-    }
-   
-    public String getBeanName() {
-        return endpointInfo.getName().toString() + ".jms-destination-base";
-    }
-    
-    public EndpointReferenceType getAddress() {       
-        return reference;
-    }
 
-    public Conduit getBackChannel(Message inMessage, 
-                                  Message partialResponse, 
-                                  EndpointReferenceType address) throws IOException {
-                
-        Conduit backChannel = null;
-        if (address == null) {
-            backChannel = new BackChannelConduit(address, inMessage, this);
-        } else {
-            if (partialResponse != null) {
-                // just send back the partialResponse 
-                backChannel = new BackChannelConduit(address, inMessage , this);
-            } else {                
-                backChannel = conduitInitiator.getConduit(endpointInfo, address);
-                // ensure decoupled back channel input stream is closed
-                backChannel.setMessageObserver(new MessageObserver() {
-                    public void onMessage(Message m) {
-                        //need to set up the headers 
-                        if (m.getContentFormats().contains(InputStream.class)) {
-                            InputStream is = m.getContent(InputStream.class);
-                            try {
-                                is.close();
-                            } catch (Exception e) {
-                                // ignore
-                            }
-                        }
-                    }
-                });
-            }
-        }
-        return backChannel;
-        
+        initConfig();
     }
 
-  
-    public void setMessageObserver(MessageObserver observer) {
-        // to handle the incomming message        
-        if (null != observer) {
-            try {
-                activate();
-            } catch (IOException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        } else {
-            LOG.log(Level.FINE, "JMSDestination shutdown()");
-            try {
-                deactivate();
-            } catch (IOException e) {
-                //Ignore for now.
-            }
-        }
-        incomingObserver = observer;
+    protected Logger getLogger() {
+        return LOG;
     }
     
-    private void activate() throws IOException {
-        LOG.log(Level.INFO, "JMSServerTransport activate().... ");        
+    /**
+     * @param inMessage the incoming message
+     * @return the inbuilt backchannel
+     */
+    protected Conduit getInbuiltBackChannel(Message inMessage) {
+        return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
+                                      inMessage);
+    }
+     
+    public void activate()  {
+        getLogger().log(Level.INFO, "JMSServerTransport activate().... ");        
 
         try {
-            LOG.log(Level.FINE, "establishing JMS connection");
-            JMSProviderHub.connect(this, jmsDestinationConfigBean);
+            getLogger().log(Level.FINE, "establishing JMS connection");
+            JMSProviderHub.connect(base, config);
             //Get a non-pooled session. 
-            listenerSession = sessionFactory.get(targetDestination);
-            listenerThread = new JMSListenerThread(listenerSession, this);
+            listenerSession = base.sessionFactory.get(base.targetDestination);
+            listenerThread = new JMSListenerThread(listenerSession);
             listenerThread.start();
         } catch (JMSException ex) {
-            LOG.log(Level.FINE, "JMS connect failed with JMSException : ", ex);
-            throw new IOException(ex.getMessage());
+            getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", ex);
         } catch (NamingException nex) {
-            LOG.log(Level.FINE, "JMS connect failed with NamingException : ", nex);
-            throw new IOException(nex.getMessage());
+            getLogger().log(Level.FINE, "JMS connect failed with NamingException : ", nex);
         }
     }
     
-    public void deactivate() throws IOException {
+    public void deactivate()  {
         try {
             listenerSession.consumer().close();
             if (listenerThread != null) {
                 listenerThread.join();
             }
-            sessionFactory.shutdown();
+            base.sessionFactory.shutdown();
         } catch (InterruptedException e) {
             //Do nothing here
         } catch (JMSException ex) {
@@ -179,12 +132,8 @@
     }
 
     public void shutdown() {
-        LOG.log(Level.FINE, "JMSDestination shutdown()");
-        try {
-            this.deactivate();
-        } catch (IOException ex) {
-            // Ignore for now.
-        }         
+        getLogger().log(Level.FINE, "JMSDestination shutdown()");
+        this.deactivate();
     }
 
     public Queue getReplyToDestination(Message inMessage) 
@@ -194,11 +143,11 @@
             (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
         // If WS-Addressing had set the replyTo header.
         if  (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) {
-            replyTo = sessionFactory.getQueueFromInitialContext(
+            replyTo = base.sessionFactory.getQueueFromInitialContext(
                               (String)  inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO));
         } else {
             replyTo = (null != message.getJMSReplyTo()) 
-                ? (Queue)message.getJMSReplyTo() : (Queue)replyDestination;
+                ? (Queue)message.getJMSReplyTo() : (Queue)base.replyDestination;
         }    
         return replyTo;
     }
@@ -210,7 +159,7 @@
         
         if (correlationID == null
             || "".equals(correlationID)
-            && jmsDestinationConfigBean.getServer().isUseMessageIDAsCorrelationID()) {
+            && config.getServer().isUseMessageIDAsCorrelationID()) {
             correlationID = request.getJMSMessageID();
         }
     
@@ -221,17 +170,17 @@
     
     protected void incoming(javax.jms.Message message) throws IOException {
         try {
-            LOG.log(Level.FINE, "server received request: ", message);           
+            getLogger().log(Level.FINE, "server received request: ", message);           
 
             String msgType = message instanceof TextMessage 
                     ? JMSConstants.TEXT_MESSAGE_TYPE : JMSConstants.BINARY_MESSAGE_TYPE;
-            Object request = unmarshal(message, msgType);
-            LOG.log(Level.FINE, "The Request Message is [ " + request + "]");
+            Object request = base.unmarshal(message, msgType);
+            getLogger().log(Level.FINE, "The Request Message is [ " + request + "]");
             byte[] bytes = null;
 
             if (JMSConstants.TEXT_MESSAGE_TYPE.equals(msgType)) {
                 String requestString = (String)request;
-                LOG.log(Level.FINE, "server received request: ", requestString);
+                getLogger().log(Level.FINE, "server received request: ", requestString);
                 bytes = requestString.getBytes();
             } else {
                 bytes = (byte[])request;
@@ -241,7 +190,7 @@
             MessageImpl inMessage = new MessageImpl();
             inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
             JMSMessageHeadersType headers = 
-                populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_HEADERS);
+                base.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_HEADERS);
             inMessage.put(JMSConstants.JMS_SERVER_HEADERS, headers);
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
                         
@@ -271,22 +220,19 @@
                                                          JMSServerBehaviorPolicyType.class));
         bean.setServerConfig(endpointInfo.getTraversedExtensor(new JMSServerConfig(), JMSServerConfig.class));
         
-        Configurer configurer = bus.getExtension(Configurer.class);
+        Configurer configurer = base.bus.getExtension(Configurer.class);
         if (null != configurer) {
             configurer.configureBean(bean);
         }
         
-        jmsDestinationConfigBean = bean;
+        config = bean;
     }
-
-    class JMSListenerThread extends Thread {
-        final JMSDestination jmsDestination;
+   
+    protected class JMSListenerThread extends Thread {
         private final PooledSession listenSession;
 
-        public JMSListenerThread(PooledSession session,
-                                 JMSDestination destination) {
+        public JMSListenerThread(PooledSession session) {
             listenSession = session;
-            jmsDestination = destination;
         }
 
         public void run() {
@@ -294,7 +240,7 @@
                 while (true) {
                     javax.jms.Message message = listenSession.consumer().receive();                   
                     if (message == null) {
-                        LOG.log(Level.WARNING,
+                        getLogger().log(Level.WARNING,
                                 "Null message received from message consumer.",
                                 " Exiting ListenerThread::run().");
                         return;
@@ -304,14 +250,15 @@
                         //Executor executor = jmsDestination.callback.getExecutor();
                         Executor executor = null;
                         if (executor == null) {
-                            WorkQueueManager wqm = jmsDestination.bus.getExtension(WorkQueueManager.class);
+                            WorkQueueManager wqm =
+                                base.bus.getExtension(WorkQueueManager.class);
                             if (null != wqm) {
                                 executor = wqm.getAutomaticWorkQueue();
                             }    
                         }
                         if (executor != null) {
                             try {
-                                executor.execute(new JMSExecutor(jmsDestination, message));
+                                executor.execute(new JMSExecutor(message));
                                 message = null;
                             } catch (RejectedExecutionException ree) {
                                 //FIXME - no room left on workqueue, what to do
@@ -319,11 +266,11 @@
                                 //although we could just dispatch on this thread.
                             }                            
                         } else {
-                            LOG.log(Level.INFO, "handle the incoming message in listener thread");
+                            getLogger().log(Level.INFO, "handle the incoming message in listener thread");
                             try {
-                                jmsDestination.incoming(message);
+                                incoming(message);
                             } catch (IOException ex) {
-                                LOG.log(Level.WARNING, "Failed to process incoming message : ", ex);
+                                getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
                             }                            
                         }                        
                         message = null;
@@ -331,30 +278,28 @@
                 }
             } catch (JMSException jmsex) {
                 jmsex.printStackTrace();
-                LOG.log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
+                getLogger().log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
             } catch (Throwable jmsex) {
                 jmsex.printStackTrace();
-                LOG.log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
+                getLogger().log(Level.SEVERE, "Exiting ListenerThread::run(): ", jmsex.getMessage());
             }
         }
     }
     
-    static class JMSExecutor implements Runnable {
+    protected class JMSExecutor implements Runnable {
         javax.jms.Message message;
-        JMSDestination jmsDestination;
         
-        JMSExecutor(JMSDestination destionation, javax.jms.Message m) {
+        JMSExecutor(javax.jms.Message m) {
             message = m;
-            jmsDestination = destionation;
         }
 
         public void run() {
-            LOG.log(Level.INFO, "run the incoming message in the threadpool");
+            getLogger().log(Level.INFO, "run the incoming message in the threadpool");
             try {
-                jmsDestination.incoming(message);
+                incoming(message);
             } catch (IOException ex) {
                 //TODO: Decide what to do if we receive the exception.
-                LOG.log(Level.WARNING,
+                getLogger().log(Level.WARNING,
                         "Failed to process incoming message : ", ex);
             }
         }
@@ -362,22 +307,15 @@
     }
     
     // this should deal with the cxf message 
-    protected class BackChannelConduit implements Conduit {
+    protected class BackChannelConduit extends AbstractConduit {
         
         protected Message inMessage;
-        protected EndpointReferenceType target;
-        protected JMSDestination jmsDestination;
                 
-        BackChannelConduit(EndpointReferenceType ref, Message message, JMSDestination dest) {
+        BackChannelConduit(EndpointReferenceType ref, Message message) {
+            super(ref);
             inMessage = message;
-            target = ref;
-            jmsDestination = dest;
         }
         
-        public void close(Message msg) throws IOException {
-            msg.getContent(OutputStream.class).close();        
-        }
-
         /**
          * Register a message observer for incoming messages.
          * 
@@ -398,46 +336,25 @@
             message.put(JMSConstants.JMS_REQUEST_MESSAGE, 
                         inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE));
             message.setContent(OutputStream.class,
-                               new JMSOutputStream(inMessage, jmsDestination));
+                               new JMSOutputStream(inMessage));
         }
         
-        /**
-         * @return the reference associated with the target Destination
-         */    
-        public EndpointReferenceType getTarget() {
-            return target;
-        }
-        
-        /**
-         * Retreive the back-channel Destination.
-         * 
-         * @return the backchannel Destination (or null if the backchannel is
-         * built-in)
-         */
-        public Destination getBackChannel() {
-            return null;
-        }
-        
-        /**
-         * Close the conduit
-         */
-        public void close() {
+        protected Logger getLogger() {
+            return LOG;
         }
     }
     
     private class JMSOutputStream extends AbstractCachedOutputStream {
                 
         private Message inMessage;
-        private JMSDestination jmsDestination;
         private javax.jms.Message reply;
         private Queue replyTo;
         private QueueSender sender;
         
         // setup the ByteArrayStream
-        public JMSOutputStream(Message m, JMSDestination d) {
+        public JMSOutputStream(Message m) {
             super();
             inMessage = m;
-            jmsDestination = d;
         }
         
         //to prepare the message and get the send out message
@@ -450,28 +367,30 @@
             
             PooledSession replySession = null;          
             
-            if (jmsDestination.isDestinationStyleQueue()) {
+            if (base.isDestinationStyleQueue()) {
                 try {
                     //setup the reply message                
                     replyTo = getReplyToDestination(inMessage);
-                    replySession = sessionFactory.get(false);
+                    replySession = base.sessionFactory.get(false);
                     sender = (QueueSender)replySession.producer();
                     
                     boolean textPayload = request instanceof TextMessage 
                         ? true : false;
                     if (textPayload) {
                         
-                        reply = marshal(currentStream.toString(), 
+                        reply = base.marshal(currentStream.toString(), 
                                             replySession.session(), 
                                             null, 
                                             JMSConstants.TEXT_MESSAGE_TYPE);
-                        LOG.log(Level.FINE, "The response message is [" + currentStream.toString() + "]");
+                        getLogger().log(Level.FINE,
+                                        "The response message is ["
+                                        + currentStream.toString() + "]");
                     } else {
-                        reply = marshal(((ByteArrayOutputStream)currentStream).toByteArray(),
+                        reply = base.marshal(((ByteArrayOutputStream)currentStream).toByteArray(),
                                            replySession.session(),
                                            null, 
                                           JMSConstants.BINARY_MESSAGE_TYPE);
-                        LOG.log(Level.FINE, "The response message is [" 
+                        getLogger().log(Level.FINE, "The response message is [" 
                                            + new String(((ByteArrayOutputStream)currentStream).toByteArray()) 
                                            + "]");
                     }     
@@ -479,20 +398,20 @@
                     
                     setReplyCorrelationID(request, reply);
                     
-                    setMessageProperties(headers, reply);
+                    base.setMessageProperties(headers, reply);
 
                     sendResponse();
                     
                 } catch (JMSException ex) {
-                    LOG.log(Level.WARNING, "Failed in post dispatch ...", ex);                
+                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex);                
                     throw new IOException(ex.getMessage());                    
                 } catch (NamingException nex) {
-                    LOG.log(Level.WARNING, "Failed in post dispatch ...", nex);                
+                    getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex);                
                     throw new IOException(nex.getMessage());                    
                 } finally {
                     // house-keeping
                     if (replySession != null) {
-                        sessionFactory.recycle(replySession);
+                        base.sessionFactory.recycle(replySession);
                     }
                 }
             } else {
@@ -500,13 +419,13 @@
                 // domain from CXF client - however a mis-behaving pure JMS
                 // client could conceivably make such an invocation, in which
                 // case we silently discard the reply
-                LOG.log(Level.WARNING,
+                getLogger().log(Level.WARNING,
                         "discarding reply for non-oneway invocation ",
                         "with 'topic' destinationStyle");
                 
             }        
             
-            LOG.log(Level.FINE, "just server sending reply: ", reply);
+            getLogger().log(Level.FINE, "just server sending reply: ", reply);
             // Check the reply time limit Stream close will call for this
             
            
@@ -518,12 +437,12 @@
             javax.jms.Message request = 
                 (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);   
             
-            int deliveryMode = getJMSDeliveryMode(headers);
-            int priority = getJMSPriority(headers);
-            long ttl = getTimeToLive(headers);
+            int deliveryMode = base.getJMSDeliveryMode(headers);
+            int priority = base.getJMSPriority(headers);
+            long ttl = base.getTimeToLive(headers);
             
             if (ttl <= 0) {
-                ttl = jmsDestinationConfigBean.getServerConfig().getMessageTimeToLive();
+                ttl = config.getServerConfig().getMessageTimeToLive();
             }
             
             long timeToLive = 0;
@@ -535,11 +454,11 @@
             
             if (timeToLive >= 0) {
                 ttl = ttl > 0 ? ttl : timeToLive;
-                LOG.log(Level.FINE, "send out the message!");
+                getLogger().log(Level.FINE, "send out the message!");
                 sender.send(replyTo, reply, deliveryMode, priority, ttl);
             } else {
                 // the request message has already expired
-                LOG.log(Level.INFO, "Message time to live is already expired skipping response.");
+                getLogger().log(Level.INFO, "Message time to live is already expired skipping response.");
             }         
         }
            



Mime
View raw message