Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E046E200BE4 for ; Wed, 21 Dec 2016 18:01:03 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id DED62160B18; Wed, 21 Dec 2016 17:01:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BB150160B3A for ; Wed, 21 Dec 2016 18:01:01 +0100 (CET) Received: (qmail 52709 invoked by uid 500); 21 Dec 2016 17:01:00 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 52592 invoked by uid 99); 21 Dec 2016 17:01:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Dec 2016 17:01:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 73CB1DFBAD; Wed, 21 Dec 2016 17:01:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 21 Dec 2016 17:01:02 -0000 Message-Id: <00c49ded7fdf4ab5a0e5b732f92ac574@git.apache.org> In-Reply-To: <3c697c5d7b5249ea8646488831f6882c@git.apache.org> References: <3c697c5d7b5249ea8646488831f6882c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] camel git commit: Add async support for camel-cxf JAX-RS producer archived-at: Wed, 21 Dec 2016 17:01:04 -0000 Add async support for camel-cxf JAX-RS producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/208e21c5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/208e21c5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/208e21c5 Branch: refs/heads/master Commit: 208e21c5a65ad2b561e8856f0a22949d7e7c4b80 Parents: 0b20682 Author: jpoth Authored: Tue Nov 22 19:07:26 2016 +0100 Committer: Claus Ibsen Committed: Wed Dec 21 18:00:48 2016 +0100 ---------------------------------------------------------------------- .../camel/component/cxf/jaxrs/CxfRsBinding.java | 12 + .../component/cxf/jaxrs/CxfRsEndpoint.java | 8 +- .../component/cxf/jaxrs/CxfRsProducer.java | 352 ++++++++++++- .../cxf/jaxrs/DefaultCxfRsBinding.java | 16 + .../jaxrs/CxfRsAsyncProducerSessionTest.java | 112 +++++ .../cxf/jaxrs/CxfRsAsyncProducerTest.java | 490 +++++++++++++++++++ .../cxf/jaxrs/CxfRsAsyncRelayTest.java | 147 ++++++ .../CxfRsBindingConfigurationSelectionTest.java | 6 + .../component/cxf/jaxrs/CxfRsProducerTest.java | 20 +- .../cxf/jaxrs/CxfRsSslAsyncProducerTest.java | 99 ++++ .../cxf/jaxrs/CxfRsSpringAsyncProducer.xml | 71 +++ .../jaxrs/CxfRsSpringAsyncProducerSession.xml | 90 ++++ .../cxf/jaxrs/CxfRsSpringAsyncRelay.xml | 47 ++ .../component/cxf/jaxrs/CxfRsSpringProducer.xml | 4 +- .../CxfRsSpringProducerAddressOverride.xml | 4 +- .../cxf/jaxrs/CxfRsSpringProducerSession.xml | 24 +- .../component/cxf/jaxrs/CxfRsSpringRelay.xml | 2 +- .../component/cxf/jaxrs/CxfRsSpringRouter.xml | 2 +- .../cxf/jaxrs/CxfRsSpringSslAsyncProducer.xml | 94 ++++ .../cxf/jaxrs/CxfRsSpringSslProducer.xml | 8 +- 20 files changed, 1558 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java index 50ca905..17b68ca 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsBinding.java @@ -19,6 +19,8 @@ package org.apache.camel.component.cxf.jaxrs; import java.lang.reflect.Method; import java.util.Map; +import javax.ws.rs.client.AsyncInvoker; +import javax.ws.rs.client.Entity; import javax.ws.rs.core.MultivaluedMap; import org.apache.cxf.message.Exchange; @@ -104,4 +106,14 @@ public interface CxfRsBinding { org.apache.camel.Exchange camelExchange) throws Exception; + /** + * Bind the Camel message to a request {@link Entity} that gets passed to {@link AsyncInvoker#method(java.lang.String, javax.ws.rs.client.Entity, javax.ws.rs.client.InvocationCallback)}. + * + * @param camelMessage the source message + * @param camelExchange the Camel exchange + * @param body the message body + * @throws Exception can be thrown if error in the binding process + * @return the {@link Entity} to use + */ + Entity bindCamelMessageToRequestEntity(Object body, org.apache.camel.Message camelMessage, org.apache.camel.Exchange camelExchange) throws Exception ; } http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java index 3435d20..822a07c 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsEndpoint.java @@ -34,6 +34,7 @@ import org.apache.camel.Service; import org.apache.camel.component.cxf.NullFaultListener; import org.apache.camel.http.common.cookie.CookieHandler; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.spi.UriEndpoint; @@ -195,7 +196,12 @@ public class CxfRsEndpoint extends DefaultEndpoint implements HeaderFilterStrate if (bindingStyle == BindingStyle.SimpleConsumer) { throw new IllegalArgumentException("The SimpleConsumer Binding Style cannot be used in a camel-cxfrs producer"); } - return new CxfRsProducer(this); + final CxfRsProducer cxfRsProducer = new CxfRsProducer(this); + if (isSynchronous()) { + return new SynchronousDelegateProducer(cxfRsProducer); + } else { + return cxfRsProducer; + } } public boolean isSingleton() { http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java index 8ff54c3..50aece0 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java @@ -30,9 +30,16 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.ResponseProcessingException; +import javax.ws.rs.core.GenericType; import javax.ws.rs.core.NewCookie; import javax.ws.rs.core.Response; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -57,7 +64,7 @@ import org.slf4j.LoggerFactory; * JAXRS client, it will turn the normal Object invocation to a RESTful request * according to resource annotation. Any response will be bound to Camel exchange. */ -public class CxfRsProducer extends DefaultProducer { +public class CxfRsProducer extends DefaultProducer implements AsyncProcessor { private static final Logger LOG = LoggerFactory.getLogger(CxfRsProducer.class); @@ -96,7 +103,122 @@ public class CxfRsProducer extends DefaultProducer { invokeProxyClient(exchange); } } - + + public boolean process(Exchange exchange, AsyncCallback callback) { + try { + Message inMessage = exchange.getIn(); + Boolean httpClientAPI = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class); + // set the value with endpoint's option + if (httpClientAPI == null) { + httpClientAPI = ((CxfRsEndpoint) getEndpoint()).isHttpClientAPI(); + } + if (httpClientAPI.booleanValue()) { + invokeAsyncHttpClient(exchange, callback); + } else { + invokeAsyncProxyClient(exchange, callback); + } + return false; + } catch (Exception exception) { + LOG.error("Error invoking request", exception); + exchange.setException(exception); + callback.done(true); + return true; + } + } + + protected void invokeAsyncHttpClient(Exchange exchange, final AsyncCallback callback) throws Exception { + Message inMessage = exchange.getIn(); + JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfEndpointUtils + .getEffectiveAddress(exchange, ((CxfRsEndpoint) getEndpoint()).getAddress())); + Bus bus = ((CxfRsEndpoint) getEndpoint()).getBus(); + // We need to apply the bus setting from the CxfRsEndpoint which is not use the default bus + if (bus != null) { + cfb.setBus(bus); + } + WebClient client = cfb.createWebClient(); + ((CxfRsEndpoint) getEndpoint()).getChainedCxfRsEndpointConfigurer().configureClient(client); + String httpMethod = inMessage.getHeader(Exchange.HTTP_METHOD, String.class); + Class responseClass = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Class.class); + Type genericType = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_GENERIC_TYPE, Type.class); + Object[] pathValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); + String path = inMessage.getHeader(Exchange.HTTP_PATH, String.class); + + if (LOG.isTraceEnabled()) { + LOG.trace("HTTP method = {}", httpMethod); + LOG.trace("path = {}", path); + LOG.trace("responseClass = {}", responseClass); + } + + // set the path + if (path != null) { + if (ObjectHelper.isNotEmpty(pathValues) && pathValues.length > 0) { + client.path(path, pathValues); + } else { + client.path(path); + } + } + + CxfRsEndpoint cxfRsEndpoint = (CxfRsEndpoint) getEndpoint(); + CxfRsBinding binding = cxfRsEndpoint.getBinding(); + Object body = getBody(exchange, inMessage, httpMethod, cxfRsEndpoint, binding); + setupClientMatrix(client, exchange); + setupClientQueryAndHeaders(client, exchange); + + //Build message entity + Entity entity = binding.bindCamelMessageToRequestEntity(body, inMessage, exchange); + + // handle cookies + CookieHandler cookieHandler = ((CxfRsEndpoint)getEndpoint()).getCookieHandler(); + loadCookies(exchange, client, cookieHandler); + + // invoke the client + client.async().method(httpMethod, entity, new CxfInvocationCallback(client, exchange, cxfRsEndpoint, responseClass, callback, genericType)); + } + + protected void invokeAsyncProxyClient(Exchange exchange, final AsyncCallback callback) throws Exception { + Message inMessage = exchange.getIn(); + Object[] varValues = inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class); + String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME, String.class); + Client target; + + JAXRSClientFactoryBean cfb = clientFactoryBeanCache.get(CxfEndpointUtils + .getEffectiveAddress(exchange, ((CxfRsEndpoint)getEndpoint()).getAddress())); + Bus bus = ((CxfRsEndpoint)getEndpoint()).getBus(); + // We need to apply the bus setting from the CxfRsEndpoint which is not use the default bus + if (bus != null) { + cfb.setBus(bus); + } + if (varValues == null) { + target = cfb.create(); + } else { + target = cfb.createWithValues(varValues); + } + + setupClientHeaders(target, exchange); + + // find out the method which we want to invoke + JAXRSServiceFactoryBean sfb = cfb.getServiceFactory(); + sfb.getResourceClasses(); + // check the null body first + Object[] parameters = null; + if (inMessage.getBody() != null) { + parameters = inMessage.getBody(Object[].class); + } + // get the method + Method method = findRightMethod(sfb.getResourceClasses(), methodName, getParameterTypes(parameters)); + + CxfRsEndpoint cxfRsEndpoint = (CxfRsEndpoint) getEndpoint(); + final CxfProxyInvocationCallback invocationCallback = new CxfProxyInvocationCallback(target, exchange, cxfRsEndpoint, callback); + WebClient.getConfig(target).getRequestContext().put(InvocationCallback.class.getName(), invocationCallback); + + // handle cookies + CookieHandler cookieHandler = ((CxfRsEndpoint)getEndpoint()).getCookieHandler(); + loadCookies(exchange, target, cookieHandler); + + method.invoke(target, parameters); + } + + @SuppressWarnings("unchecked") protected void setupClientQueryAndHeaders(WebClient client, Exchange exchange) throws Exception { Message inMessage = exchange.getIn(); @@ -192,19 +314,7 @@ public class CxfRsProducer extends DefaultProducer { CxfRsBinding binding = cxfRsEndpoint.getBinding(); - // set the body - Object body = null; - if (!"GET".equals(httpMethod)) { - // need to check the request object if the http Method is not GET - if ("DELETE".equals(httpMethod) && cxfRsEndpoint.isIgnoreDeleteMethodMessageBody()) { - // just ignore the message body if the ignoreDeleteMethodMessageBody is true - } else { - body = binding.bindCamelMessageBodyToRequestBody(inMessage, exchange); - if (LOG.isTraceEnabled()) { - LOG.trace("Request body = " + body); - } - } - } + Object body = getBody(exchange, inMessage, httpMethod, cxfRsEndpoint, binding); setupClientMatrix(client, exchange); @@ -507,7 +617,217 @@ public class CxfRsProducer extends DefaultProducer { return answer; } - + + private Object getBody(Exchange exchange, Message inMessage, String httpMethod, CxfRsEndpoint cxfRsEndpoint, CxfRsBinding binding) throws Exception { + Object body = null; + if (!"GET".equals(httpMethod)) { + // need to check the request object if the http Method is not GET + if ("DELETE".equals(httpMethod) && cxfRsEndpoint.isIgnoreDeleteMethodMessageBody()) { + // just ignore the message body if the ignoreDeleteMethodMessageBody is true + } else { + body = binding.bindCamelMessageBodyToRequestBody(inMessage, exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Request body = " + body); + } + } + } + return body; + } + private final class CxfInvocationCallback implements InvocationCallback { + + private final Exchange exchange; + private final CxfRsEndpoint cxfRsEndpoint; + private final Class responseClass; + private final AsyncCallback callback; + private final Type genericType; + private final Client client; + + private CxfInvocationCallback(Client client, Exchange exchange, CxfRsEndpoint cxfRsEndpoint, Class responseClass, AsyncCallback callback, Type genericType) { + this.exchange = exchange; + this.cxfRsEndpoint = cxfRsEndpoint; + this.responseClass = responseClass; + this.callback = callback; + this.genericType = genericType; + this.client = client; + } + + + @Override + public void completed(Response response) { + try { + if (shouldHandleError(response)) { + handleError(response); + return; + } + // handle cookies + saveCookies(exchange, client, cxfRsEndpoint.getCookieHandler()); + if (!exchange.getPattern().isOutCapable()) { + return; + } + + LOG.trace("Response body = {}", response); + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + final CxfRsBinding binding = cxfRsEndpoint.getBinding(); + exchange.getOut().getHeaders().putAll(binding.bindResponseHeadersToCamelHeaders(response, exchange)); + + if (genericType != null && !genericType.equals(Void.TYPE)) { + GenericType genericType = new GenericType(this.genericType); + exchange.getOut().setBody(binding.bindResponseToCamelBody(response.readEntity(genericType), exchange)); + } else if (responseClass != null && !responseClass.equals(Void.TYPE)) { + exchange.getOut().setBody(binding.bindResponseToCamelBody(response.readEntity(responseClass), exchange)); + } else { + exchange.getOut().setBody(binding.bindResponseToCamelBody(response, exchange)); + } + exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, response.getStatus()); + } catch (Exception exception) { + LOG.error("Error while processing response", exception); + fail(exception); + } finally { + callback.done(false); + } + } + + @Override + public void failed(Throwable throwable) { + LOG.error("Failed request ", throwable); + try { + // handle cookies + saveCookies(exchange, client, cxfRsEndpoint.getCookieHandler()); + fail(throwable); + } catch (Exception error) { + LOG.error("Error while processing failed request", error); + } finally { + callback.done(false); + } + } + + private void fail(Throwable throwable) { + if (throwable.getClass().isInstance(WebApplicationException.class)) { + final WebApplicationException cast = WebApplicationException.class.cast(throwable); + final Response response = cast.getResponse(); + if (shouldHandleError(response)) { + handleError(response); + } + } else if (throwable.getClass().isInstance(ResponseProcessingException.class)) { + final ResponseProcessingException cast = ResponseProcessingException.class.cast(throwable); + final Response response = cast.getResponse(); + if (shouldHandleError(response)) { + handleError(response); + } + } else { + exchange.setException(throwable); + } + } + + private boolean shouldHandleError(Response response) { + //Throw exception on a response > 207 + //http://en.wikipedia.org/wiki/List_of_HTTP_status_codes + if (response != null && throwException) { + Integer respCode = response.getStatus(); + if (respCode > 207) { + return true; + } + } + return false; + } + + private void handleError(Response response) { + exchange.setException(populateCxfRsProducerException(exchange, response, response.getStatus())); + } + } + + private final class CxfProxyInvocationCallback implements InvocationCallback { + + private final Exchange exchange; + private final CxfRsEndpoint cxfRsEndpoint; + private final AsyncCallback callback; + private final Client client; + + private CxfProxyInvocationCallback(Client client, Exchange exchange, CxfRsEndpoint cxfRsEndpoint, AsyncCallback callback) { + this.exchange = exchange; + this.cxfRsEndpoint = cxfRsEndpoint; + this.callback = callback; + this.client = client; + } + + @Override + public void completed(Object body) { + try { + Response response = client.getResponse(); + // handle cookies + saveCookies(exchange, client, cxfRsEndpoint.getCookieHandler()); + //handle error + if (shouldHandleError(response)) { + handleError(response); + return; + } + if (!exchange.getPattern().isOutCapable()) { + return; + } + + LOG.trace("Response body = {}", response); + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + final CxfRsBinding binding = cxfRsEndpoint.getBinding(); + exchange.getOut().getHeaders().putAll(binding.bindResponseHeadersToCamelHeaders(response, exchange)); + exchange.getOut().setBody(binding.bindResponseToCamelBody(body, exchange)); + exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, response.getStatus()); + } catch (Exception exception) { + LOG.error("Error while processing response", exception); + fail(exception); + } finally { + callback.done(false); + } + } + + @Override + public void failed(Throwable throwable) { + LOG.error("Failed request ", throwable); + try { + // handle cookies + saveCookies(exchange, client, cxfRsEndpoint.getCookieHandler()); + fail(throwable); + } catch (Exception error) { + LOG.error("Error while processing failed request", error); + } finally { + callback.done(false); + } + } + + private void fail(Throwable throwable) { + if (throwable.getClass().isInstance(WebApplicationException.class)) { + final WebApplicationException cast = WebApplicationException.class.cast(throwable); + final Response response = cast.getResponse(); + if (shouldHandleError(response)) { + handleError(response); + } + } else if (throwable.getClass().isInstance(ResponseProcessingException.class)) { + final ResponseProcessingException cast = ResponseProcessingException.class.cast(throwable); + final Response response = cast.getResponse(); + if (shouldHandleError(response)) { + handleError(response); + } + } else { + exchange.setException(throwable); + } + } + + private void handleError(Response response) { + exchange.setException(populateCxfRsProducerException(exchange, response, response.getStatus())); + } + + private boolean shouldHandleError(Response response) { + //Throw exception on a response > 207 + //http://en.wikipedia.org/wiki/List_of_HTTP_status_codes + if (response != null && throwException) { + Integer respCode = response.getStatus(); + if (respCode > 207) { + return true; + } + } + return false; + } + } + /** * Cache contains {@link org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean} */ http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java index 673ec6a..1658a94 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/DefaultCxfRsBinding.java @@ -21,11 +21,15 @@ import java.lang.reflect.Method; import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import javax.security.auth.Subject; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Variant; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -183,6 +187,18 @@ public class DefaultCxfRsBinding implements CxfRsBinding, HeaderFilterStrategyAw return answer; } + public Entity bindCamelMessageToRequestEntity(Object body, Message camelMessage, Exchange camelExchange) throws Exception { + if (body == null) { + return null; + } + String contentType = camelMessage.getHeader(Exchange.CONTENT_TYPE, String.class); + if (contentType == null) { + contentType = MediaType.WILDCARD; + } + String contentEncoding = camelMessage.getHeader(Exchange.CONTENT_ENCODING, String.class); + return Entity.entity(body, new Variant(MediaType.valueOf(contentType), Locale.US, contentEncoding)); + } + /** * By default, we just return the response object. */ http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java new file mode 100644 index 0000000..4546aa0 --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerSessionTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.cxf.CXFTestSupport; +import org.apache.camel.component.cxf.common.message.CxfConstants; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class CxfRsAsyncProducerSessionTest extends CamelSpringTestSupport { + private static int port1 = CXFTestSupport.getPort1(); + private static int port2 = CXFTestSupport.getPort("CxfRsProducerSessionTest.jetty"); + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + public int getPort1() { + return port1; + } + + public int getPort2() { + return port2; + } + + @Override + protected AbstractXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml"); + } + + @Test + public void testNoSessionProxy() { + String response = sendMessage("direct://proxy", "World", Boolean.FALSE).getOut().getBody(String.class); + assertEquals("New New World", response); + response = sendMessage("direct://proxy", "World", Boolean.FALSE).getOut().getBody(String.class); + assertEquals("New New World", response); + } + + @Test + public void testExchangeSessionProxy() { + String response = sendMessage("direct://proxyexchange", "World", Boolean.FALSE).getOut().getBody(String.class); + assertEquals("Old New World", response); + response = sendMessage("direct://proxyexchange", "World", Boolean.FALSE).getOut().getBody(String.class); + assertEquals("Old New World", response); + } + + @Test + public void testInstanceSession() { + String response = sendMessage("direct://proxyinstance", "World", Boolean.FALSE).getOut().getBody(String.class); + assertEquals("Old New World", response); + response = sendMessage("direct://proxyinstance", "World", Boolean.FALSE).getOut().getBody(String.class); + assertEquals("Old Old World", response); + // we do the instance tests for proxy and http in one test because order + // matters here + response = sendMessage("direct://httpinstance", "World", Boolean.TRUE).getOut().getBody(String.class); + assertEquals("Old Old World", response); + } + + @Test + public void testNoSessionHttp() { + String response = sendMessage("direct://http", "World", Boolean.TRUE).getOut().getBody(String.class); + assertEquals("New New World", response); + response = sendMessage("direct://http", "World", Boolean.TRUE).getOut().getBody(String.class); + assertEquals("New New World", response); + } + + @Test + public void testExchangeSessionHttp() { + String response = sendMessage("direct://httpexchange", "World", Boolean.TRUE).getOut().getBody(String.class); + assertEquals("Old New World", response); + response = sendMessage("direct://httpexchange", "World", Boolean.TRUE).getOut().getBody(String.class); + assertEquals("Old New World", response); + } + + private Exchange sendMessage(String endpoint, String body, Boolean httpApi) { + Exchange exchange = template.send(endpoint, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + Message inMessage = exchange.getIn(); + inMessage.setHeader(CxfConstants.OPERATION_NAME, "echo"); + inMessage.setHeader(Exchange.HTTP_METHOD, "POST"); + inMessage.setHeader(Exchange.HTTP_PATH, "/echoservice/echo"); + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, httpApi); + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, String.class); + inMessage.setHeader(Exchange.ACCEPT_CONTENT_TYPE, "application/json"); + inMessage.setBody(body); + } + }); + return exchange; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java new file mode 100644 index 0000000..b7ee1c4 --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncProducerTest.java @@ -0,0 +1,490 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +import java.io.InputStream; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.Response; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.component.cxf.CXFTestSupport; +import org.apache.camel.component.cxf.CxfOperationException; +import org.apache.camel.component.cxf.common.message.CxfConstants; +import org.apache.camel.component.cxf.jaxrs.testbean.Customer; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.apache.camel.util.CastUtils; +import org.apache.cxf.Bus; +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.endpoint.Server; +import org.apache.cxf.feature.Feature; +import org.apache.cxf.interceptor.InterceptorProvider; +import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType; +import org.junit.Test; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; + +public class CxfRsAsyncProducerTest extends CamelSpringTestSupport { + private static int port1 = CXFTestSupport.getPort1(); + private static int port2 = CXFTestSupport.getPort("CxfRsAsyncProducerTest.jetty"); + + public static class JettyProcessor implements Processor { + public void process(Exchange exchange) throws Exception { + // check the query + Message inMessage = exchange.getIn(); + exchange.getOut().setBody(inMessage.getHeader(Exchange.HTTP_QUERY, String.class)); + } + } + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + public int getPort1() { + return port1; + } + public int getPort2() { + return port2; + } + + @Override + protected AbstractXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml"); + } + + @Test + public void testGetCustomerWithClientProxyAPI() { + // START SNIPPET: ProxyExample + Exchange exchange = template.send("direct://proxy", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + Message inMessage = exchange.getIn(); + // set the operation name + inMessage.setHeader(CxfConstants.OPERATION_NAME, "getCustomer"); + // using the proxy client API + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE); + // set a customer header + inMessage.setHeader("key", "value"); + // setup the accept content type + inMessage.setHeader(Exchange.ACCEPT_CONTENT_TYPE, "application/json"); + // set the parameters , if you just have one parameter + // camel will put this object into an Object[] itself + inMessage.setBody("123"); + } + }); + + // get the response message + Customer response = (Customer) exchange.getOut().getBody(); + + assertNotNull("The response should not be null ", response); + assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); + assertEquals("Get a wrong customer name", response.getName(), "John"); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key")); + // END SNIPPET: ProxyExample + } + + @Test + public void testGetCustomersWithClientProxyAPI() { + Exchange exchange = template.send("direct://proxy", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the operation name + inMessage.setHeader(CxfConstants.OPERATION_NAME, "getCustomers"); + // using the proxy client API + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE); + // camel will put this object into an Object[] itself + inMessage.setBody(null); + }); + + // get the response message + List response = CastUtils.cast((List) exchange.getOut().getBody()); + + assertNotNull("The response should not be null ", response); + assertTrue("Dan is missing!", response.contains(new Customer(113, "Dan"))); + assertTrue("John is missing!", response.contains(new Customer(123, "John"))); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testGetCustomersWithHttpCentralClientAPI() { + Exchange exchange = template.send("direct://proxy", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/"); + // using the proxy client API + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.TRUE); + // set the headers + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, List.class); + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_GENERIC_TYPE, new ParameterizedCollectionType(Customer.class)); + // camel will put this object into an Object[] itself + inMessage.setBody(null); + }); + + // get the response message + List response = CastUtils.cast((List) exchange.getOut().getBody()); + + assertNotNull("The response should not be null ", response); + assertTrue("Dan is missing!", response.contains(new Customer(113, "Dan"))); + assertTrue("John is missing!", response.contains(new Customer(123, "John"))); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testGetCustomerWithHttpCentralClientAPI() { + Exchange exchange = template.send("direct://http", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // using the http central client API + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.TRUE); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123"); + // Specify the response class , cxfrs will use InputStream as the response object type + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class); + // set a customer header + inMessage.setHeader("key", "value"); + // since we use the Get method, so we don't need to set the message body + inMessage.setBody(null); + }); + + + // get the response message + Customer response = (Customer) exchange.getOut().getBody(); + + assertNotNull("The response should not be null ", response); + assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); + assertEquals("Get a wrong customer name", response.getName(), "John"); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key")); + } + + @Test + public void testSuppressGetCustomerExceptionWithCxfRsEndpoint() { + Exchange exchange + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&throwExceptionOnFailure=false", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message message = newExchange.getIn(); + // set the Http method + message.setHeader(Exchange.HTTP_METHOD, "PUT"); + // set the relative path + message.setHeader(Exchange.HTTP_PATH, "/customerservice/customers"); + // we just setup the customer with a wrong id + Customer customer = new Customer(); + customer.setId(222); + customer.setName("user"); + message.setBody(customer); + }); + + // we should get the exception here + assertNull("Don't expect the exception here", exchange.getException()); + Message result = exchange.getOut(); + assertEquals("Get a wrong http status code.", result.getHeader(Exchange.HTTP_RESPONSE_CODE), 406); + + + } + + @Test + public void testGetCustomerExceptionWithCxfRsEndpoint() { + Exchange exchange + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message message = newExchange.getIn(); + // set the Http method + message.setHeader(Exchange.HTTP_METHOD, "PUT"); + // set the relative path + message.setHeader(Exchange.HTTP_PATH, "/customerservice/customers"); + // we just setup the customer with a wrong id + Customer customer = new Customer(); + customer.setId(222); + customer.setName("user"); + message.setBody(customer); + }); + + // we should get the exception here + assertNotNull("Expect the exception here", exchange.getException()); + CxfOperationException exception = (CxfOperationException)exchange.getException(); + + assertEquals("Get a wrong response body", "Cannot find the customer!", exception.getResponseBody()); + + } + + @Test + public void testGetCustomerWithCxfRsEndpoint() { + Exchange exchange + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123"); + // Specify the response class , cxfrs will use InputStream as the response object type + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class); + // since we use the Get method, so we don't need to set the message body + inMessage.setBody(null); + }); + + // get the response message + Customer response = (Customer) exchange.getOut().getBody(); + assertNotNull("The response should not be null ", response); + assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); + assertEquals("Get a wrong customer name", response.getName(), "John"); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testGetCustomerWithVariableReplacementAndCxfRsEndpoint() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/{customerId}"); + // Set variables for replacement + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, new String[] {"123"}); + // Specify the response class , cxfrs will use InputStream as the response object type + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class); + // since we use the Get method, so we don't need to set the message body + inMessage.setBody(null); + }); + + // get the response message + Customer response = (Customer) exchange.getOut().getBody(); + assertNotNull("The response should not be null ", response); + assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); + assertEquals("Get a wrong customer name", response.getName(), "John"); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testAddCustomerUniqueResponseCodeWithHttpClientAPI() { + Exchange exchange + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "POST"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customersUniqueResponseCode"); + // create a new customer object + Customer customer = new Customer(); + customer.setId(9999); + customer.setName("HttpClient"); + inMessage.setBody(customer); + }); + + // get the response message + Response response = (Response) exchange.getOut().getBody(); + assertNotNull("The response should not be null ", response); + assertNotNull("The response entity should not be null", response.getEntity()); + // check the response code + assertEquals("Get a wrong response code", 201, response.getStatus()); + // check the response code from message header + assertEquals("Get a wrong response code", 201, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testAddCustomerUniqueResponseCodeWithProxyAPI() { + Exchange exchange = template.send("direct://proxy", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + Message inMessage = exchange.getIn(); + // set the operation name + inMessage.setHeader(CxfConstants.OPERATION_NAME, "addCustomerUniqueResponseCode"); + // using the proxy client API + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE); + // set the parameters , if you just have one parameter + // camel will put this object into an Object[] itself + Customer customer = new Customer(); + customer.setId(8888); + customer.setName("ProxyAPI"); + inMessage.setBody(customer); + } + }); + + // get the response message + Response response = (Response) exchange.getOut().getBody(); + assertNotNull("The response should not be null ", response); + assertNotNull("The response entity should not be null", response.getEntity()); + // check the response code + assertEquals("Get a wrong response code", 201, response.getStatus()); + // check the response code from message header + assertEquals("Get a wrong response code", 201, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testAddCustomerUniqueResponseCode() { + Exchange exchange + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + Message inMessage = exchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "POST"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customersUniqueResponseCode"); + // put the response's entity into out message body + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class); + // create a new customer object + Customer customer = new Customer(); + customer.setId(8888); + customer.setName("Willem"); + inMessage.setBody(customer); + } + }); + + // get the response message + Customer response = (Customer) exchange.getOut().getBody(); + assertNotNull("The response should not be null ", response); + assertTrue("Get a wrong customer id ", response.getId() != 8888); + assertEquals("Get a wrong customer name", response.getName(), "Willem"); + assertEquals("Get a wrong response code", 201, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + } + + @Test + public void testProducerWithQueryParameters() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, InputStream.class); + inMessage.setBody(null); + }); + + // get the response message + String response = exchange.getOut().getBody(String.class); + assertNotNull("The response should not be null ", response); + assertEquals("The response value is wrong", "q1=12&q2=13", response); + } + + @Test + public void testProducerWithQueryParametersHeader() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message inMessage = newExchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, InputStream.class); + // override the parameter setting from URI + // START SNIPPET: QueryMapExample + Map queryMap = new LinkedHashMap<>(); + queryMap.put("q1", "new"); + queryMap.put("q2", "world"); + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_QUERY_MAP, queryMap); + // END SNIPPET: QueryMapExample + inMessage.setBody(null); + }); + + // get the response message + String response = exchange.getOut().getBody(String.class); + assertNotNull("The response should not be null ", response); + assertEquals("The response value is wrong", "q1=new&q2=world", response); + } + + + + @Test + public void testRestServerDirectlyGetCustomer() { + // we cannot convert directly to Customer as we need camel-jaxb + String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123", + null, Exchange.HTTP_METHOD, "GET", String.class); + + assertNotNull("The response should not be null ", response); + } + + @Test + public void testRestServerDirectlyAddCustomer() { + Customer input = new Customer(); + input.setName("Donald Duck"); + + // we cannot convert directly to Customer as we need camel-jaxb + String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers", + input, Exchange.HTTP_METHOD, "POST", String.class); + + assertNotNull(response); + assertTrue(response.endsWith("Donald Duck")); + } + + static class TestFeature implements Feature { + boolean initialized; + @Override + public void initialize(InterceptorProvider interceptorProvider, Bus bus) { + initialized = true; + } + @Override + public void initialize(Client client, Bus bus) { + //Do nothing + } + @Override + public void initialize(Server server, Bus bus) { + //Do nothing + } + @Override + public void initialize(Bus bus) { + //Do nothing + } + } + + @Test + public void testProducerWithFeature() { + TestFeature feature = context().getRegistry().lookupByNameAndType("testFeature", TestFeature.class); + + template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?features=#myFeatures", + null, Exchange.HTTP_METHOD, "GET", String.class); + + assertTrue("The feature should be initialized", feature.initialized); + } + + @Test + public void testProducer422Response() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", newExchange -> { + newExchange.setPattern(ExchangePattern.InOut); + Message message = newExchange.getIn(); + // Try to create a new Customer with an invalid name + message.setHeader(Exchange.HTTP_METHOD, "POST"); + message.setHeader(Exchange.HTTP_PATH, "/customerservice/customers"); + Customer customer = new Customer(); + customer.setId(8888); + customer.setName(""); // will trigger a 422 response (a common REST server validation response code) + message.setBody(customer); + }); + + assertNotNull("Expect the exception here", exchange.getException()); + assertThat("Exception should be a CxfOperationException", exchange.getException(), instanceOf(CxfOperationException.class)); + + CxfOperationException cxfOperationException = CxfOperationException.class.cast(exchange.getException()); + + assertThat("CXF operation exception has correct response code", cxfOperationException.getStatusCode(), is(422)); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java new file mode 100644 index 0000000..11618c9 --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsAsyncRelayTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringWriter; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jws.WebMethod; +import javax.jws.WebService; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cxf.CXFTestSupport; +import org.apache.camel.spring.Main; +import org.apache.camel.test.junit4.TestSupport; +import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.jaxrs.client.JAXRSClientFactory; +import org.apache.cxf.jaxrs.ext.multipart.Multipart; +import org.junit.Test; + +public class CxfRsAsyncRelayTest extends TestSupport { + private static int port6 = CXFTestSupport.getPort6(); + /** + * A sample service "interface" (technically, it is a class since we will + * use proxy-client. That interface exposes three methods over-loading each + * other : we are testing the appropriate one will be chosen at runtime. + * + */ + @WebService + @Path("/rootpath") + @Consumes("multipart/form-data") + @Produces("application/xml") + public static class UploadService { + @WebMethod + @POST + @Path("/path1") + @Consumes("multipart/form-data") + public void upload(@Multipart(value = "content", type = "application/octet-stream") java.lang.Number content, + @Multipart(value = "name", type = "text/plain") String name) { + } + + @WebMethod + @GET + @Path("/path2") + @Consumes("text/plain") + private void upload() { + } + + @WebMethod + @POST + @Path("/path3") + @Consumes("multipart/form-data") + public void upload(@Multipart(value = "content", type = "application/octet-stream") InputStream content, + @Multipart(value = "name", type = "text/plain") String name) { + } + + } + + private static final String SAMPLE_CONTENT_PATH = "/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncRelay.xml"; + private static final String SAMPLE_NAME = "CxfRsSpringAsyncRelay.xml"; + private static final CountDownLatch LATCH = new CountDownLatch(1); + private static String content; + private static String name; + + /** + * That test builds a route chaining two cxfrs endpoints. It shows a request + * sent to the first one will be correctly transferred and consumed by the + * other one. + */ + @Test + public void testJaxrsAsyncRelayRoute() throws Exception { + final Main main = new Main(); + try { + main.setApplicationContextUri("org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncRelay.xml"); + main.start(); + Thread t = new Thread(new Runnable() { + /** + * Sends a request to the first endpoint in the route + */ + public void run() { + try { + JAXRSClientFactory.create("http://localhost:" + port6 + "/CxfRsAsyncRelayTest/rest", UploadService.class) + .upload(CamelRouteBuilder.class.getResourceAsStream(SAMPLE_CONTENT_PATH), + SAMPLE_NAME); + } catch (Exception e) { + log.warn("Error uploading to http://localhost:" + port6 + "/CxfRsAsyncRelayTest/rest", e); + } + } + }); + t.start(); + LATCH.await(10, TimeUnit.SECONDS); + assertEquals(SAMPLE_NAME, name); + StringWriter writer = new StringWriter(); + IOUtils.copyAndCloseInput(new InputStreamReader(CamelRouteBuilder.class + .getResourceAsStream(SAMPLE_CONTENT_PATH)), writer); + assertEquals(writer.toString(), content); + } finally { + main.stop(); + } + } + + /** + * Route builder to be used with + * org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncRelay.xml + * + */ + public static class CamelRouteBuilder extends RouteBuilder { + @Override + public void configure() throws InterruptedException { + from("upload1").removeHeader(Exchange.CONTENT_TYPE).to("upload2Client"); + from("upload2").process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // once the message arrives in the second endpoint, stores + // the message components and warns results can be compared + content = exchange.getIn().getHeader("content", String.class); + name = exchange.getIn().getHeader("name", String.class); + LATCH.countDown(); + } + }); + + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java index 48054e3..1ce871b 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsBindingConfigurationSelectionTest.java @@ -19,6 +19,7 @@ package org.apache.camel.component.cxf.jaxrs; import java.lang.reflect.Method; import java.util.Map; +import javax.ws.rs.client.Entity; import javax.ws.rs.core.MultivaluedMap; import org.apache.camel.Exchange; @@ -109,6 +110,11 @@ public class CxfRsBindingConfigurationSelectionTest extends CamelTestSupport { } @Override + public Entity bindCamelMessageToRequestEntity(Object body, Message camelMessage, Exchange camelExchange) { + return null; + } + + @Override public Object bindCamelMessageBodyToRequestBody(Message camelMessage, Exchange camelExchange) throws Exception { return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java index 039859c..4c10298 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerTest.java @@ -173,7 +173,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testSuppressGetCustomerExceptionWithCxfRsEndpoint() { Exchange exchange - = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&throwExceptionOnFailure=false", new Processor() { + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&throwExceptionOnFailure=false&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message message = exchange.getIn(); @@ -200,7 +200,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testGetCustomerExceptionWithCxfRsEndpoint() { Exchange exchange - = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", new Processor() { + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message message = exchange.getIn(); @@ -280,7 +280,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testAddCustomerUniqueResponseCodeWithHttpClientAPI() { Exchange exchange - = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", new Processor() { + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn(); @@ -340,7 +340,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testAddCustomerUniqueResponseCode() { Exchange exchange - = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true", new Processor() { + = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "?httpClientAPI=true&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn(); @@ -368,7 +368,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testProducerWithQueryParameters() { - Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", new Processor() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn(); @@ -388,7 +388,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testProducerWithQueryParametersHeader() { - Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13", new Processor() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort2() + "/" + getClass().getSimpleName() + "/testQuery?httpClientAPI=true&q1=12&q2=13&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message inMessage = exchange.getIn(); @@ -418,7 +418,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testRestServerDirectlyGetCustomer() { // we cannot convert directly to Customer as we need camel-jaxb - String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123", + String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?synchronous=true", null, Exchange.HTTP_METHOD, "GET", String.class); assertNotNull("The response should not be null ", response); @@ -430,7 +430,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { input.setName("Donald Duck"); // we cannot convert directly to Customer as we need camel-jaxb - String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers", + String response = template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers?synchronous=true", input, Exchange.HTTP_METHOD, "POST", String.class); assertNotNull(response); @@ -461,7 +461,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { public void testProducerWithFeature() { TestFeature feature = context().getRegistry().lookupByNameAndType("testFeature", TestFeature.class); - template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?features=#myFeatures", + template.requestBodyAndHeader("cxfrs:http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/customerservice/customers/123?features=#myFeatures&synchronous=true", null, Exchange.HTTP_METHOD, "GET", String.class); assertTrue("The feature should be initialized", feature.initialized); @@ -469,7 +469,7 @@ public class CxfRsProducerTest extends CamelSpringTestSupport { @Test public void testProducer422Response() { - Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true", new Processor() { + Exchange exchange = template.send("cxfrs://http://localhost:" + getPort1() + "/" + getClass().getSimpleName() + "/?httpClientAPI=true&synchronous=true", new Processor() { public void process(Exchange exchange) throws Exception { exchange.setPattern(ExchangePattern.InOut); Message message = exchange.getIn(); http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java new file mode 100644 index 0000000..042e0df --- /dev/null +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsSslAsyncProducerTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cxf.jaxrs; + + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.cxf.CXFTestSupport; +import org.apache.camel.component.cxf.common.message.CxfConstants; +import org.apache.camel.component.cxf.jaxrs.testbean.Customer; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import static org.hamcrest.core.Is.is; + +public class CxfRsSslAsyncProducerTest extends CamelSpringTestSupport { + private static int port1 = CXFTestSupport.getSslPort(); + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + public int getPort1() { + return port1; + } + + @Override + protected AbstractXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/cxf/jaxrs/CxfRsSpringSslAsyncProducer.xml"); + } + + @Test + public void testCorrectTrustStore() { + Exchange exchange = template.send("direct://trust", new MyProcessor()); + + // get the response message + Customer response = (Customer) exchange.getOut().getBody(); + + assertNotNull("The response should not be null ", response); + assertEquals("Get a wrong customer id ", String.valueOf(response.getId()), "123"); + assertEquals("Get a wrong customer name", response.getName(), "John"); + assertEquals("Get a wrong response code", 200, exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE)); + assertEquals("Get a wrong header value", "value", exchange.getOut().getHeader("key")); + } + + @Test + public void testNoTrustStore() { + Exchange exchange = template.send("direct://noTrust", new MyProcessor()); + assertThat(exchange.isFailed(), is(true)); + Exception e = exchange.getException(); + assertThat(e.getCause().getClass().getCanonicalName(), is("javax.net.ssl.SSLHandshakeException")); + } + + @Test + public void testWrongTrustStore() { + Exchange exchange = template.send("direct://wrongTrust", new MyProcessor()); + assertThat(exchange.isFailed(), is(true)); + Exception e = exchange.getException(); + assertThat(e.getCause().getClass().getCanonicalName(), is("javax.net.ssl.SSLHandshakeException")); + } + + private class MyProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + Message inMessage = exchange.getIn(); + // set the Http method + inMessage.setHeader(Exchange.HTTP_METHOD, "GET"); + // set the relative path + inMessage.setHeader(Exchange.HTTP_PATH, "/customerservice/customers/123"); + // Specify the response class , cxfrs will use InputStream as the response object type + inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_RESPONSE_CLASS, Customer.class); + // set a customer header + inMessage.setHeader("key", "value"); + // since we use the Get method, so we don't need to set the message body + inMessage.setBody(null); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml new file mode 100644 index 0000000..83b97ae --- /dev/null +++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducer.xml @@ -0,0 +1,71 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/camel/blob/208e21c5/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml new file mode 100644 index 0000000..9f27ffe --- /dev/null +++ b/components/camel-cxf/src/test/resources/org/apache/camel/component/cxf/jaxrs/CxfRsSpringAsyncProducerSession.xml @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +