Return-Path: Delivered-To: apmail-activemq-camel-commits-archive@locus.apache.org Received: (qmail 65951 invoked from network); 19 Feb 2008 06:23:28 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Feb 2008 06:23:28 -0000 Received: (qmail 80021 invoked by uid 500); 19 Feb 2008 06:23:22 -0000 Delivered-To: apmail-activemq-camel-commits-archive@activemq.apache.org Received: (qmail 80009 invoked by uid 500); 19 Feb 2008 06:23:22 -0000 Mailing-List: contact camel-commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: camel-dev@activemq.apache.org Delivered-To: mailing list camel-commits@activemq.apache.org Received: (qmail 80000 invoked by uid 99); 19 Feb 2008 06:23:22 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Feb 2008 22:23:22 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Feb 2008 06:22:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 076AA1A9832; Mon, 18 Feb 2008 22:23:06 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r629000 - in /activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf: ./ feature/ interceptors/ invoker/ Date: Tue, 19 Feb 2008 06:23:04 -0000 To: camel-commits@activemq.apache.org From: ningjiang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080219062307.076AA1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ningjiang Date: Mon Feb 18 22:22:57 2008 New Revision: 629000 URL: http://svn.apache.org/viewvc?rev=629000&view=rev Log: CAMEL-339 using the feature to setup the client side interceptors Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java?rev=629000&r1=628999&r2=629000&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java (original) +++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java Mon Feb 18 22:22:57 2008 @@ -99,7 +99,6 @@ outMessage = endpoint.getBinding().createMessage(); } exchange.setOutMessage(outMessage); - outMessage.setExchange(exchange); } } @@ -149,8 +148,8 @@ exchange.get(Service.class).get(MethodDispatcher.class.getName()); Method m = md.getMethod(bop); - - if (bop.getOperationInfo().isOneWay()) { + // The SEI could be the provider class which will not have the bop information. + if (bop != null && bop.getOperationInfo().isOneWay()) { cxfExchange.setPattern(ExchangePattern.InOnly); } else { cxfExchange.setPattern(ExchangePattern.InOut); Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?rev=629000&r1=628999&r2=629000&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java (original) +++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java Mon Feb 18 22:22:57 2008 @@ -24,6 +24,8 @@ import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.cxf.feature.MessageDataFormatFeature; +import org.apache.camel.component.cxf.feature.PayLoadDataFormatFeature; import org.apache.camel.component.cxf.invoker.CxfClient; import org.apache.camel.component.cxf.invoker.CxfClientFactoryBean; import org.apache.camel.component.cxf.invoker.InvokingContext; @@ -37,6 +39,7 @@ import org.apache.cxf.common.classloader.ClassLoaderUtils; import org.apache.cxf.endpoint.Client; import org.apache.cxf.endpoint.Endpoint; +import org.apache.cxf.feature.AbstractFeature; import org.apache.cxf.frontend.ClientFactoryBean; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; @@ -84,6 +87,19 @@ boolean jsr181Enabled = CxfEndpointUtils.hasWebServiceAnnotation(serviceClass); cfb.setJSR181Enabled(jsr181Enabled); + dataFormat = CxfEndpointUtils.getDataFormat(endpoint); + List features = new ArrayList(); + if (dataFormat.equals(DataFormat.MESSAGE)) { + features.add(new MessageDataFormatFeature()); + //features.add(new LoggingFeature()); + } + if (dataFormat.equals(DataFormat.PAYLOAD)) { + features.add(new PayLoadDataFormatFeature()); + //features.add(new LoggingFeature()); + } + cfb.setFeatures(features); + + return createClientFormClientFactoryBean(cfb); } @@ -209,7 +225,6 @@ CxfClient cxfClient = (CxfClient) client; // invoke the message //TODO need setup the call context here - //TODO need to handle the one way message Object result = cxfClient.dispatch(params, null, ex); // need to get the binding object to create the message BindingOperationInfo boi = ex.get(BindingOperationInfo.class); Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java?rev=629000&r1=628999&r2=629000&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java (original) +++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/MessageInvokerInterceptor.java Mon Feb 18 22:22:57 2008 @@ -52,6 +52,7 @@ if (!exchange.isOneWay()) { Endpoint ep = exchange.get(Endpoint.class); Message outMessage = runableEx.getOutMessage(); + copyJaxwsProperties(message, outMessage); if (outMessage == null) { outMessage = ep.getBinding().createMessage(); exchange.setOutMessage(outMessage); @@ -79,6 +80,14 @@ */ private Executor getExecutor(final Endpoint endpoint) { return endpoint.getService().getExecutor(); + } + + private void copyJaxwsProperties(Message inMsg, Message outMsg) { + outMsg.put(Message.WSDL_OPERATION, inMsg.get(Message.WSDL_OPERATION)); + outMsg.put(Message.WSDL_SERVICE, inMsg.get(Message.WSDL_SERVICE)); + outMsg.put(Message.WSDL_INTERFACE, inMsg.get(Message.WSDL_INTERFACE)); + outMsg.put(Message.WSDL_PORT, inMsg.get(Message.WSDL_PORT)); + outMsg.put(Message.WSDL_DESCRIPTION, inMsg.get(Message.WSDL_DESCRIPTION)); } } Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java?rev=629000&r1=628999&r2=629000&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java (original) +++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/AbstractInvokerInterceptor.java Mon Feb 18 22:22:57 2008 @@ -57,7 +57,7 @@ if (isRequestor(inMessage)) { return; } - + /* Exchange exchange = inMessage.getExchange(); Message outMessage = null; try { @@ -94,7 +94,7 @@ //Initiate the OutBound Chain. chain.doIntercept(outMessage); - } + }*/ } /** Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java?rev=629000&r1=628999&r2=629000&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java (original) +++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/CxfClient.java Mon Feb 18 22:22:57 2008 @@ -56,47 +56,47 @@ private static final Logger LOG = Logger.getLogger(CxfClient.class.getName()); private Endpoint endpoint; - + public CxfClient(Bus b, Endpoint e) { super(b, e); - endpoint = e; + endpoint = e; } - - public Object dispatch(Object params, + + public Object dispatch(Object params, Map context, Exchange exchange) throws Exception { - + Object retval = null; InvokingContext invokingContext = exchange.get(InvokingContext.class); assert invokingContext != null; // get InBound binding operation info from the exchange object BindingOperationInfo inBoundOp = exchange.get(BindingOperationInfo.class); - + BindingOperationInfo outBoundOp = null; if (inBoundOp != null) { //Get the BindingOperationInfo for the outbound binding. BindingInfo bi = getEndpoint().getEndpointInfo().getBinding(); outBoundOp = bi.getOperation(inBoundOp.getOperationInfo().getName()); - if (outBoundOp != null + if (outBoundOp != null && inBoundOp.isUnwrapped()) { outBoundOp = outBoundOp.getUnwrappedOperation(); } } - - + + retval = invokeWithMessageStream(outBoundOp, params, context, invokingContext); - + return retval; - - + + } - + @SuppressWarnings("unchecked") - public Object invokeWithMessageStream(BindingOperationInfo bi, - Object param, + public Object invokeWithMessageStream(BindingOperationInfo bi, + Object param, Map context, InvokingContext invokingContext) throws Exception { @@ -120,25 +120,28 @@ if (bi != null) { //Set The InputMessage exchange.put(BindingOperationInfo.class, bi); - exchange.put(BindingMessageInfo.class, bi.getInput()); + exchange.put(BindingMessageInfo.class, bi.getInput()); exchange.setOneWay(bi.getOperationInfo().isOneWay()); } Message message = prepareMessage(exchange, requestContext, param, invokingContext); - - PhaseInterceptorChain chain = setupOutChain(requestContext, message, invokingContext); + PhaseInterceptorChain chain = setupInterceptorChain(endpoint); + + message.setInterceptorChain(chain); + modifyChain(chain, requestContext); + chain.setFaultObserver(outFaultObserver); // setup conduit selector prepareConduitSelector(message); modifyChain(chain, null); // execute chain - + chain.doIntercept(message); - - //it will close all the stream in the message, so we do not call it + + //it will close all the stream in the message, so we do not call it //getConduitSelector().complete(exchange); - + // Check to see if there is a Fault from the outgoing chain Exception ex = message.getContent(Exception.class); @@ -150,25 +153,25 @@ } if (!exchange.isOneWay()) { - + synchronized (exchange) { waitResponse(exchange); } ex = getException(exchange); - + if (ex != null) { if (LOG.isLoggable(Level.FINE)) { LOG.fine("Exception in incoming chain: " + ex.toString()); } throw ex; } - retval = invokingContext.getResponseObject(exchange, responseContext); - + retval = invokingContext.getResponseObject(exchange, responseContext); + } return retval; } - + private void waitResponse(Exchange exchange) { int remaining = synchronousTimeout; while (!Boolean.TRUE.equals(exchange.get(FINISHED)) && remaining > 0) { @@ -187,59 +190,7 @@ } } - public void onMessage(Message message) { - Exchange exchange = message.getExchange(); - - - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("call the cxf client on message , exchange is " + exchange); - } - - try { - - if (exchange.get(InvokingContext.class) == null) { - super.onMessage(message); - } else { - - message = getEndpoint().getBinding().createMessage(message); - message.put(Message.REQUESTOR_ROLE, Boolean.TRUE); - message.put(Message.INBOUND_MESSAGE, Boolean.TRUE); - - exchange.put(Binding.class, getEndpoint().getBinding()); - BindingOperationInfo bi = exchange.get(BindingOperationInfo.class); - if (bi != null) { - //Set The OutputMessage - exchange.put(BindingMessageInfo.class, bi.getOutput()); - } - InvokingContext invokingContext = exchange.get(InvokingContext.class); - assert invokingContext != null; - - // setup interceptor chain - PhaseInterceptorChain chain = invokingContext.getResponseInInterceptorChain(exchange); - message.setInterceptorChain(chain); - modifyChain(chain, null); - // execute chain - chain.doIntercept(message); - - // set inMessage in the exchange - exchange.setInMessage(message); - } - - } finally { - synchronized (message.getExchange()) { - if (!isPartialResponse(message)) { - message.getExchange().put(FINISHED, Boolean.TRUE); - message.getExchange().setInMessage(message); - message.getExchange().notifyAll(); - } - } - } - } - private boolean isPartialResponse(Message in) { - return Boolean.TRUE.equals(in.get(Message.PARTIAL_RESPONSE_MESSAGE)); - } - private Message prepareMessage(Exchange exchange, Map requestContext, Object param, InvokingContext InvokingContext) { @@ -264,26 +215,10 @@ return message; } - private PhaseInterceptorChain setupOutChain(Map requestContext, - Message message, InvokingContext invokingContext) { - - if (LOG.isLoggable(Level.FINEST)) { - LOG.finest("Build an out interceptor chain to send request to server"); - } - Exchange exchange = message.getExchange(); - PhaseInterceptorChain chain = invokingContext.getRequestOutInterceptorChain(exchange); - message.setInterceptorChain(chain); - modifyChain(chain, requestContext); - chain.setFaultObserver(outFaultObserver); - return chain; - } - - - public Endpoint getEndpoint() { return endpoint; } - + public Bus getBus() { return bus; } Modified: activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java?rev=629000&r1=628999&r2=629000&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java (original) +++ activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/invoker/InvokingContext.java Mon Feb 18 22:22:57 2008 @@ -25,18 +25,9 @@ import org.apache.cxf.phase.PhaseInterceptorChain; public interface InvokingContext { - /** - * This method is called when an request from a (routing) client is observed - * at the router's transport (inbound to the router from a client). It will - * return an "in" interceptor chain that will allow the appropriate routing - * interceptor to receive and handle the message. - * @param exchange - * @return in interceptor chain - */ - PhaseInterceptorChain getRequestInInterceptorChain(Exchange exchange); /** - * This method is called when the router is preparing an outbound message + * This method is called when the router is preparing an outbound message * (orignated from the router's client) to be sent to the target CXF server. * It sets the content in the given (out) message object. * @param content @@ -44,34 +35,6 @@ void setRequestOutMessageContent(Message message, Object content); /** - * This method is called when a response from a CXF server is observed at the - * router's transport (inbound to the router from a server). It will return an - * "in" interceptor chain that will allow the response to be returned to the - * involved routing interceptor (with the appropriate interceptors in between). - * @param exchange - * @return in interceptor chain - */ - PhaseInterceptorChain getResponseInInterceptorChain(Exchange exchange); - - /** - * This method is called when the router is ready to forward a request from a client - * to the target CXF server. It returns an "out" intercetptor chain that will deliver - * the request message to the CXF server. - * @param exchange - * @return out interceptor chain - */ - PhaseInterceptorChain getRequestOutInterceptorChain(Exchange exchange); - - /** - * This method is called when the router is ready to forward a response from a CXF - * to the client who has made the request. It returns an "out" interceptor chain that - * will deliver the response message to the client. - * @param exchange - * @return out interceptor chain - */ - PhaseInterceptorChain getResponseOutInterceptorChain(Exchange exchange); - - /** * This method is call when the CxfClient receives a response from a CXF server and needs * to extract the response object from the message. * @param exchange @@ -79,14 +42,6 @@ * @return response object */ Object getResponseObject(Exchange exchange, Map responseContext); - - /** - * This method is called to set the fault observers on the endpoint that are specified - * to the phases meaningful to the routing context. - * @param endpointImpl - * @param bus - */ - void setEndpointFaultObservers(EndpointImpl endpointImpl, Bus bus); /** * This method is called when the routing interceptor has received a response message @@ -96,11 +51,11 @@ * @param resultPayload */ void setResponseContent(Message outMessage, Object resultPayload); - + /** * This method is called when the routing interceptor has intercepted a message from * the client and needs to extract the request content from the message. It retreives - * and receives the request content from the incoming message. + * and receives the request content from the incoming message. * @param inMessage * @return the request from client */