camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r629000 - in /activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf: ./ feature/ interceptors/ invoker/
Date Tue, 19 Feb 2008 06:23:04 GMT
Author: ningjiang
Date: Mon Feb 18 22:22:57 2008
New Revision: 629000

URL: http://svn.apache.org/viewvc?rev=629000&view=rev
Log:
CAMEL-339 using the feature to setup the client side interceptors

Modified:
    activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
    activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
    activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java
    activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java
    activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
    activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java

Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java?rev=629000&r1=628999&r2=629000&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
(original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
Mon Feb 18 22:22:57 2008
@@ -99,7 +99,6 @@
                 outMessage = endpoint.getBinding().createMessage();
             }
             exchange.setOutMessage(outMessage);
-            outMessage.setExchange(exchange);
         }
 
     }
@@ -149,8 +148,8 @@
             exchange.get(Service.class).get(MethodDispatcher.class.getName());
         Method m = md.getMethod(bop);
 
-
-        if (bop.getOperationInfo().isOneWay()) {
+        // The SEI could be the provider class which will not have the bop information.
+        if (bop != null && bop.getOperationInfo().isOneWay()) {
             cxfExchange.setPattern(ExchangePattern.InOnly);
         } else {
             cxfExchange.setPattern(ExchangePattern.InOut);

Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=629000&r1=628999&r2=629000&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
(original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java
Mon Feb 18 22:22:57 2008
@@ -24,6 +24,8 @@
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.cxf.feature.MessageDataFormatFeature;
+import org.apache.camel.component.cxf.feature.PayLoadDataFormatFeature;
 import org.apache.camel.component.cxf.invoker.CxfClient;
 import org.apache.camel.component.cxf.invoker.CxfClientFactoryBean;
 import org.apache.camel.component.cxf.invoker.InvokingContext;
@@ -37,6 +39,7 @@
 import org.apache.cxf.common.classloader.ClassLoaderUtils;
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.feature.AbstractFeature;
 import org.apache.cxf.frontend.ClientFactoryBean;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -84,6 +87,19 @@
         boolean jsr181Enabled = CxfEndpointUtils.hasWebServiceAnnotation(serviceClass);
         cfb.setJSR181Enabled(jsr181Enabled);
 
+        dataFormat = CxfEndpointUtils.getDataFormat(endpoint);
+        List<AbstractFeature> features = new ArrayList<AbstractFeature>();
+        if (dataFormat.equals(DataFormat.MESSAGE)) {
+            features.add(new MessageDataFormatFeature());
+            //features.add(new LoggingFeature());
+        }
+        if (dataFormat.equals(DataFormat.PAYLOAD)) {
+            features.add(new PayLoadDataFormatFeature());
+            //features.add(new LoggingFeature());
+        }
+        cfb.setFeatures(features);
+
+
         return createClientFormClientFactoryBean(cfb);
     }
 
@@ -209,7 +225,6 @@
                 CxfClient cxfClient = (CxfClient) client;
                 // invoke the message
                 //TODO need setup the call context here
-                //TODO need to handle the one way message
                 Object result = cxfClient.dispatch(params, null, ex);
                 // need to get the binding object to create the message
                 BindingOperationInfo boi = ex.get(BindingOperationInfo.class);

Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java?rev=629000&r1=628999&r2=629000&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java
(original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java
Mon Feb 18 22:22:57 2008
@@ -52,6 +52,7 @@
                 if (!exchange.isOneWay()) {
                     Endpoint ep = exchange.get(Endpoint.class);
                     Message outMessage = runableEx.getOutMessage();
+                    copyJaxwsProperties(message, outMessage);
                     if (outMessage == null) {
                         outMessage = ep.getBinding().createMessage();
                         exchange.setOutMessage(outMessage);
@@ -79,6 +80,14 @@
      */
     private Executor getExecutor(final Endpoint endpoint) {
         return endpoint.getService().getExecutor();
+    }
+
+    private void copyJaxwsProperties(Message inMsg, Message outMsg) {
+        outMsg.put(Message.WSDL_OPERATION, inMsg.get(Message.WSDL_OPERATION));
+        outMsg.put(Message.WSDL_SERVICE, inMsg.get(Message.WSDL_SERVICE));
+        outMsg.put(Message.WSDL_INTERFACE, inMsg.get(Message.WSDL_INTERFACE));
+        outMsg.put(Message.WSDL_PORT, inMsg.get(Message.WSDL_PORT));
+        outMsg.put(Message.WSDL_DESCRIPTION, inMsg.get(Message.WSDL_DESCRIPTION));
     }
 
 }

Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java?rev=629000&r1=628999&r2=629000&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java
(original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java
Mon Feb 18 22:22:57 2008
@@ -57,7 +57,7 @@
         if (isRequestor(inMessage)) {
             return;
         }
-
+        /*
         Exchange exchange = inMessage.getExchange();
         Message outMessage = null;
         try {
@@ -94,7 +94,7 @@
 
             //Initiate the OutBound Chain.
             chain.doIntercept(outMessage);
-        }
+        }*/
     }
 
     /**

Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java?rev=629000&r1=628999&r2=629000&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
(original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java
Mon Feb 18 22:22:57 2008
@@ -56,47 +56,47 @@
     private static final Logger LOG = Logger.getLogger(CxfClient.class.getName());
 
     private Endpoint endpoint;
-    
+
     public CxfClient(Bus b, Endpoint e) {
         super(b, e);
-        endpoint = e; 
+        endpoint = e;
     }
-    
-    public Object dispatch(Object params, 
+
+    public Object dispatch(Object params,
                            Map<String, Object> context,
                            Exchange exchange) throws Exception {
-        
+
         Object retval = null;
         InvokingContext invokingContext = exchange.get(InvokingContext.class);
         assert invokingContext != null;
 
         // get InBound binding operation info from the exchange object
         BindingOperationInfo inBoundOp = exchange.get(BindingOperationInfo.class);
-        
+
         BindingOperationInfo outBoundOp = null;
 
         if (inBoundOp != null) {
             //Get the BindingOperationInfo for the outbound binding.
             BindingInfo bi = getEndpoint().getEndpointInfo().getBinding();
             outBoundOp = bi.getOperation(inBoundOp.getOperationInfo().getName());
-            if (outBoundOp != null 
+            if (outBoundOp != null
                 && inBoundOp.isUnwrapped()) {
                 outBoundOp = outBoundOp.getUnwrappedOperation();
             }
         }
-        
-       
+
+
         retval = invokeWithMessageStream(outBoundOp, params, context, invokingContext);
-        
+
         return retval;
-        
-        
+
+
     }
 
- 
+
     @SuppressWarnings("unchecked")
-    public Object invokeWithMessageStream(BindingOperationInfo bi, 
-                                          Object param, 
+    public Object invokeWithMessageStream(BindingOperationInfo bi,
+                                          Object param,
                                           Map<String, Object> context,
                                           InvokingContext invokingContext) throws Exception
{
 
@@ -120,25 +120,28 @@
         if (bi != null) {
             //Set The InputMessage
             exchange.put(BindingOperationInfo.class, bi);
-            exchange.put(BindingMessageInfo.class, bi.getInput());            
+            exchange.put(BindingMessageInfo.class, bi.getInput());
             exchange.setOneWay(bi.getOperationInfo().isOneWay());
         }
 
         Message message = prepareMessage(exchange, requestContext, param, invokingContext);
-        
-        PhaseInterceptorChain chain = setupOutChain(requestContext, message, invokingContext);
 
+        PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
+
+        message.setInterceptorChain(chain);
+        modifyChain(chain, requestContext);
+        chain.setFaultObserver(outFaultObserver);
         // setup conduit selector
         prepareConduitSelector(message);
 
         modifyChain(chain, null);
         // execute chain
-        
+
         chain.doIntercept(message);
-                
-        //it will close all the stream in the message, so we do not call it  
+
+        //it will close all the stream in the message, so we do not call it
         //getConduitSelector().complete(exchange);
-                
+
         // Check to see if there is a Fault from the outgoing chain
         Exception ex = message.getContent(Exception.class);
 
@@ -150,25 +153,25 @@
         }
 
         if (!exchange.isOneWay()) {
-            
+
             synchronized (exchange) {
                 waitResponse(exchange);
             }
             ex = getException(exchange);
-            
+
             if (ex != null) {
                 if (LOG.isLoggable(Level.FINE)) {
                     LOG.fine("Exception in incoming chain: " + ex.toString());
                 }
                 throw ex;
             }
-            retval = invokingContext.getResponseObject(exchange, responseContext);  
-            
+            retval = invokingContext.getResponseObject(exchange, responseContext);
+
         }
 
         return retval;
     }
-    
+
     private void waitResponse(Exchange exchange) {
         int remaining = synchronousTimeout;
         while (!Boolean.TRUE.equals(exchange.get(FINISHED)) && remaining > 0)
{
@@ -187,59 +190,7 @@
         }
     }
 
-    public void onMessage(Message message) {
-        Exchange exchange = message.getExchange();
-        
-        
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine("call the cxf client on message , exchange is " + exchange);
-        }    
-        
-        try {
-        	
-        	if (exchange.get(InvokingContext.class) == null) {
-                super.onMessage(message);
-            } else {
-
-                message = getEndpoint().getBinding().createMessage(message);
-                message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
-                message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
-                            
-                exchange.put(Binding.class, getEndpoint().getBinding());
-                BindingOperationInfo bi = exchange.get(BindingOperationInfo.class);     
  
-                if (bi != null) {
-                    //Set The OutputMessage
-                    exchange.put(BindingMessageInfo.class, bi.getOutput());
-                }
-                InvokingContext invokingContext = exchange.get(InvokingContext.class);
-                assert invokingContext != null;
-
-                // setup interceptor chain
-                PhaseInterceptorChain chain = invokingContext.getResponseInInterceptorChain(exchange);
-                message.setInterceptorChain(chain);
-                modifyChain(chain, null);
-                // execute chain
-                chain.doIntercept(message);
-
-                // set inMessage in the exchange
-                exchange.setInMessage(message);
-            }
-        	
-        } finally { 
-        	synchronized (message.getExchange()) {
-        		if (!isPartialResponse(message)) {
-        			message.getExchange().put(FINISHED, Boolean.TRUE);
-        			message.getExchange().setInMessage(message);
-        			message.getExchange().notifyAll();
-        		}
-        	}
-        }
-    }
 
-    private boolean isPartialResponse(Message in) {
-        return Boolean.TRUE.equals(in.get(Message.PARTIAL_RESPONSE_MESSAGE));
-    }
-    
     private Message prepareMessage(Exchange exchange, Map<String, Object> requestContext,
             Object param, InvokingContext InvokingContext) {
 
@@ -264,26 +215,10 @@
         return message;
     }
 
-    private PhaseInterceptorChain setupOutChain(Map<String, Object> requestContext,
-            Message message, InvokingContext invokingContext) {
-
-        if (LOG.isLoggable(Level.FINEST)) {
-            LOG.finest("Build an out interceptor chain to send request to server");
-        }
-        Exchange exchange = message.getExchange();
-        PhaseInterceptorChain chain = invokingContext.getRequestOutInterceptorChain(exchange);
-        message.setInterceptorChain(chain);
-        modifyChain(chain, requestContext);
-        chain.setFaultObserver(outFaultObserver);
-        return chain;
-    }
-    
-   
-    
     public Endpoint getEndpoint() {
         return endpoint;
     }
-    
+
     public Bus getBus() {
         return bus;
     }

Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java?rev=629000&r1=628999&r2=629000&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java
(original)
+++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java
Mon Feb 18 22:22:57 2008
@@ -25,18 +25,9 @@
 import org.apache.cxf.phase.PhaseInterceptorChain;
 
 public interface InvokingContext {
-    /**
-     * This method is called when an request from a (routing) client is observed
-     * at the router's transport (inbound to the router from a client).  It will 
-     * return an "in" interceptor chain that will allow the appropriate routing 
-     * interceptor to receive and handle the message.
-     * @param exchange
-     * @return in interceptor chain
-     */
-    PhaseInterceptorChain getRequestInInterceptorChain(Exchange exchange);
 
     /**
-     * This method is called when the router is preparing an outbound message 
+     * This method is called when the router is preparing an outbound message
      * (orignated from the router's client) to be sent to the target CXF server.
      * It sets the content in the given (out) message object.
      * @param content
@@ -44,34 +35,6 @@
     void setRequestOutMessageContent(Message message, Object content);
 
     /**
-     * This method is called when a response from a CXF server is observed at the
-     * router's transport (inbound to the router from a server).  It will return an
-     * "in" interceptor chain that will allow the response to be returned to the 
-     * involved routing interceptor (with the appropriate interceptors in between).
-     * @param exchange
-     * @return in interceptor chain
-     */
-    PhaseInterceptorChain getResponseInInterceptorChain(Exchange exchange);
-
-    /**
-     * This method is called when the router is ready to forward a request from a client
-     * to the target CXF server.  It returns an "out" intercetptor chain that will deliver

-     * the request message to the CXF server.
-     * @param exchange
-     * @return out interceptor chain
-     */
-    PhaseInterceptorChain getRequestOutInterceptorChain(Exchange exchange);
-
-    /**
-     * This method is called when the router is ready to forward a response from a CXF
-     * to the client who has made the request. It returns an "out" interceptor chain that

-     * will deliver the response message to the client.
-     * @param exchange
-     * @return out interceptor chain
-     */
-    PhaseInterceptorChain getResponseOutInterceptorChain(Exchange exchange);
-
-    /**
      * This method is call when the CxfClient receives a response from a CXF server and needs
      * to extract the response object from the message.
      * @param exchange
@@ -79,14 +42,6 @@
      * @return response object
      */
     Object getResponseObject(Exchange exchange, Map<String, Object> responseContext);
-    
-    /**
-     * This method is called to set the fault observers on the endpoint that are specified
-     * to the phases meaningful to the routing context.
-     * @param endpointImpl
-     * @param bus
-     */
-    void setEndpointFaultObservers(EndpointImpl endpointImpl, Bus bus);
 
     /**
      * This method is called when the routing interceptor has received a response message
@@ -96,11 +51,11 @@
      * @param resultPayload
      */
     void setResponseContent(Message outMessage, Object resultPayload);
-    
+
     /**
      * This method is called when the routing interceptor has intercepted a message from
      * the client and needs to extract the request content from the message.  It retreives
-     * and receives the request content from the incoming message. 
+     * and receives the request content from the incoming message.
      * @param inMessage
      * @return the request from client
      */



Mime
View raw message