camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/4] camel git commit: CAMEL-11333: Create a new camel-thrift RPC component
Date Fri, 23 Jun 2017 13:42:18 GMT
CAMEL-11333: Create a new camel-thrift RPC component

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2a0a9e66
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2a0a9e66
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2a0a9e66

Branch: refs/heads/master
Commit: 2a0a9e6656bac7a82e6594a5a12f889834d6f2b6
Parents: d5e35c9
Author: Dmitry Volodin <dmvolod@gmail.com>
Authored: Thu Jun 22 18:30:55 2017 +0300
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Jun 23 15:41:15 2017 +0200

----------------------------------------------------------------------
 components/camel-thrift/ReadMe.md               |    1 +
 components/camel-thrift/pom.xml                 |   22 +-
 .../src/main/docs/thrift-component.adoc         |  145 +
 .../camel/component/thrift/ThriftComponent.java |   49 +
 .../component/thrift/ThriftConfiguration.java   |  128 +
 .../camel/component/thrift/ThriftConstants.java |   39 +
 .../camel/component/thrift/ThriftConsumer.java  |  143 +
 .../camel/component/thrift/ThriftEndpoint.java  |   72 +
 .../camel/component/thrift/ThriftProducer.java  |  146 +
 .../camel/component/thrift/ThriftUtils.java     |  261 +
 .../client/AsyncClientMethodCallback.java       |   49 +
 .../component/thrift/server/Invocation.java     |   34 +
 .../thrift/server/ThriftHsHaServer.java         |  130 +
 .../thrift/server/ThriftMethodHandler.java      |  108 +
 .../services/org/apache/camel/component/thrift  |   18 +
 .../thrift/ThriftConsumerAsyncTest.java         |  253 +
 .../thrift/ThriftConsumerConcurrentTest.java    |  174 +
 .../thrift/ThriftConsumerSyncTest.java          |  118 +
 .../thrift/ThriftProducerAsyncTest.java         |  288 +
 .../thrift/ThriftProducerBaseTest.java          |  204 +
 .../thrift/ThriftProducerSyncTest.java          |  173 +
 .../component/thrift/generated/Calculator.java  | 6221 ++++++++++++++++++
 .../thrift/generated/InvalidOperation.java      |  475 ++
 .../component/thrift/generated/Operation.java   |   55 +
 .../camel/component/thrift/generated/Work.java  |  702 ++
 .../dataformat/thrift/generated/Operation.java  |    2 +-
 .../camel/dataformat/thrift/generated/Work.java |  163 +-
 .../camel-thrift/src/test/thrift/readme.txt     |    3 +-
 .../src/test/thrift/tutorial-component.thrift   |  130 +
 parent/pom.xml                                  |    5 +-
 .../features/src/main/resources/features.xml    |    3 +-
 .../ThriftComponentAutoConfiguration.java       |  128 +
 .../ThriftComponentConfiguration.java           |   50 +
 .../main/resources/META-INF/spring.factories    |    4 +-
 .../camel/itest/karaf/CamelThriftTest.java      |    1 +
 .../camel/itest/springboot/CamelThriftTest.java |    1 +
 36 files changed, 10392 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/ReadMe.md
----------------------------------------------------------------------
diff --git a/components/camel-thrift/ReadMe.md b/components/camel-thrift/ReadMe.md
index 0524974..668e452 100644
--- a/components/camel-thrift/ReadMe.md
+++ b/components/camel-thrift/ReadMe.md
@@ -25,6 +25,7 @@ The sample test source is an example taken from the Thrift Java tutorial at: htt
     cd components/camel-thrift
     cd src/test/thrift
     thrift -r --gen java -out ../java/ ./tutorial-dataformat.thrift
+    thrift -r --gen java -out ../java/ ./tutorial-component.thrift
 
 The generate source code will override the existing.
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-thrift/pom.xml b/components/camel-thrift/pom.xml
index 9e4bc91..9a34d55 100644
--- a/components/camel-thrift/pom.xml
+++ b/components/camel-thrift/pom.xml
@@ -34,8 +34,14 @@
   <description>Camel Apache Thrift data format and RPC support</description>
 
   <properties>
-    <camel.osgi.export.pkg>org.apache.camel.dataformat.thrift.*</camel.osgi.export.pkg>
-    <camel.osgi.export.service>org.apache.camel.spi.DataFormatResolver;dataformat=thrift</camel.osgi.export.service>
+    <camel.osgi.export.pkg>
+      org.apache.camel.dataformat.thrift.*;${camel.osgi.version},
+      org.apache.camel.component.thrift.*;${camel.osgi.version}
+    </camel.osgi.export.pkg>
+    <camel.osgi.export.service>
+      org.apache.camel.spi.DataFormatResolver;dataformat=thrift,
+      org.apache.camel.spi.ComponentResolver;component=thrift
+    </camel.osgi.export.service>
   </properties>
 
   <dependencies>
@@ -51,6 +57,12 @@
     </dependency>
     
     <dependency>
+      <groupId>org.javassist</groupId>
+      <artifactId>javassist</artifactId>
+      <version>${javassist-version}</version>
+    </dependency>
+    
+    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
@@ -87,6 +99,12 @@
       <version>${gson-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.googlecode.junit-toolbox</groupId>
+      <artifactId>junit-toolbox</artifactId>
+      <version>${junit-toolbox-version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/docs/thrift-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/docs/thrift-component.adoc b/components/camel-thrift/src/main/docs/thrift-component.adoc
new file mode 100644
index 0000000..cad6a59
--- /dev/null
+++ b/components/camel-thrift/src/main/docs/thrift-component.adoc
@@ -0,0 +1,145 @@
+## Thrift Component
+
+*Available as of Camel version 2.20*
+
+The Thrift component allows you to call or expose Remote Procedure Call (RPC) services
+using https://thrift.apache.org/[Apache Thrift] binary communication protocol and serialization mechanism.
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-thrift</artifactId>
+    <version>x.x.x</version>
+    <!-- use the same version as your Camel core version -->
+</dependency>
+------------------------------------------------------------
+
+### URI format
+
+[source,java]
+-------------------------------------
+thrift://service[?options]
+-------------------------------------
+
+### Endpoint Options
+
+// component options: START
+The Thrift component has no options.
+// component options: END
+
+// endpoint options: START
+The Thrift endpoint is configured using URI syntax:
+
+    thrift:host:port/service
+
+with the following path and query parameters:
+
+#### Path Parameters (3 parameters):
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **host** | The Thrift server host name. This is localhost or 0.0.0.0 (if not defined) when being a consumer or remote server hostname when using producer. |  | String
+| **port** | *Required* The Thrift server port |  | int
+| **service** | *Required* Fully qualified service name from the protocol buffer descriptor file (package dot service definition name) |  | String
+|=======================================================================
+
+#### Query Parameters (7 parameters):
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
+| **maxPoolSize** (consumer) | The Thrift server consumer max thread pool size | 10 | int
+| **poolSize** (consumer) | The Thrift server consumer initial thread pool size | 1 | int
+| **exceptionHandler** (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
+| **method** (producer) | The Thrift invoked method name |  | String
+| **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+|=======================================================================
+// endpoint options: END
+
+### Thrift method parameters mapping
+
+Parameters in the called procedure must be passed as a list of objects inside the message body. The primitives are converted from the objects on the fly.
+In order to correctly find the corresponding method, all types must be transmitted regardless of the values. Please see an exmaple below, how to pass
+different parameters to the method with the Camel body
+[source,java]
+-------------------------------------------------------------------------------
+List requestBody = new ArrayList();
+
+requestBody.add((boolean)true);
+requestBody.add((byte)THRIFT_TEST_NUM1);
+requestBody.add((short)THRIFT_TEST_NUM1);
+requestBody.add((int)THRIFT_TEST_NUM1);
+requestBody.add((long)THRIFT_TEST_NUM1);
+requestBody.add((double)THRIFT_TEST_NUM1);
+requestBody.add("empty"); // String parameter
+requestBody.add(ByteBuffer.allocate(10)); // binary parameter
+requestBody.add(new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY)); // Struct parameter
+requestBody.add(new ArrayList<Integer>()); // list paramater 
+requestBody.add(new HashSet<String>()); // set parameter
+requestBody.add(new HashMap<String, Long>()); // map parameter 
+
+Object responseBody = template.requestBody("direct:thrift-alltypes", requestBody);
+-------------------------------------------------------------------------------
+
+Incoming parameters in the service consumer will also be passed to the message body as a list of objects.
+
+
+### Thrift consumer headers (will be installed after the consumer invocation)
+
+[width="100%",cols="25%,50,25%",options="header",]
+|=======================================================================
+|Header name |Description|Possible values
+
+|*CamelThriftMethodName*|Method name handled by the consumer service|
+
+|=======================================================================
+
+### Examples
+
+Below is a simple synchronous method invoke with host and port parameters
+
+[source,java]
+-------------------------------------------------------------------------------
+from("direct:thrift-calculate")
+.to("thrift://localhost:1101/org.apache.camel.component.thrift.generated.Calculator?method=calculate&synchronous=true");
+-------------------------------------------------------------------------------
+
+Below is a simple synchronous method invoke for the XML DSL configuration
+
+[source,xml]
+---------------------------------------------------------------------------------------
+<route>
+    <from uri="direct:thrift-add" />
+    <to uri="thrift://localhost:1101/org.apache.camel.component.thrift.generated.Calculator?method=add&synchronous=true"/>
+</route>
+---------------------------------------------------------------------------------------
+
+Thrift service consumer with asynchronous communication
+
+[source,java]
+-------------------------------------------------------------------------------
+from("thrift://localhost:1101/org.apache.camel.component.thrift.generated.Calculator")
+.to("direct:thrift-service");
+-------------------------------------------------------------------------------
+
+It's possible to automate Java code generation for .thrift files using *thrift-maven-plugin*, but before start the thrift compiler binary distribution for your operating system must be present on the running host.
+
+### For more information, see these resources
+
+https://github.com/apache/thrift/[Thrift project GitHub]
+https://thrift.apache.org/tutorial/java [Apache Thrift Java tutorial]
+
+### See Also
+
+* link:getting-started.html[Getting Started]
+* link:configuring-camel.html[Configuring Camel]
+* link:component.html[Component]
+* link:endpoint.html[Endpoint]
+

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftComponent.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftComponent.java
new file mode 100644
index 0000000..f394cff
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftComponent.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Represents the component that manages {@link ThriftEndpoint}.
+ */
+public class ThriftComponent extends DefaultComponent {
+
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ThriftConfiguration config = new ThriftConfiguration();
+
+        config = parseConfiguration(config, uri, parameters);
+        setProperties(config, parameters);
+
+        Endpoint endpoint = new ThriftEndpoint(uri, this, config);
+        return endpoint;
+    }
+
+    /**
+     * Parses the configuration
+     * 
+     * @return the parsed and valid configuration to use
+     */
+    protected ThriftConfiguration parseConfiguration(ThriftConfiguration configuration, String remaining, Map<String, Object> parameters) throws Exception {
+        configuration.parseURI(new URI(remaining), parameters, this);
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
new file mode 100644
index 0000000..2fcabf9
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConfiguration.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+@UriParams
+public class ThriftConfiguration {
+
+    @UriPath
+    private String host;
+
+    @UriPath
+    @Metadata(required = "true")
+    private int port;
+
+    @UriPath
+    @Metadata(required = "true")
+    private String service;
+
+    @UriParam(label = "producer")
+    private String method;
+
+    @UriParam(label = "consumer", defaultValue = "" + ThriftConstants.THRIFT_CONSUMER_POOL_SIZE)
+    private int poolSize = ThriftConstants.THRIFT_CONSUMER_POOL_SIZE;
+
+    @UriParam(label = "consumer", defaultValue = "" + ThriftConstants.THRIFT_CONSUMER_MAX_POOL_SIZE)
+    private int maxPoolSize = ThriftConstants.THRIFT_CONSUMER_MAX_POOL_SIZE;
+
+    /**
+     * Fully qualified service name from the protocol buffer descriptor file
+     * (package dot service definition name)
+     */
+    public String getService() {
+        return service;
+    }
+
+    public void setService(String service) {
+        this.service = service;
+    }
+
+    /**
+     * The Thrift invoked method name
+     */
+    public String getMethod() {
+        return method;
+    }
+
+    public void setMethod(String method) {
+        this.method = method;
+    }
+
+    /**
+     * The Thrift server host name. This is localhost or 0.0.0.0 (if not
+     * defined) when being a consumer or remote server hostname when using
+     * producer.
+     */
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    /**
+     * The Thrift server port
+     */
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * The Thrift server consumer initial thread pool size
+     */
+    public int getPoolSize() {
+        return poolSize;
+    }
+
+    public void setPoolSize(int poolSize) {
+        this.poolSize = poolSize;
+    }
+
+    /**
+     * The Thrift server consumer max thread pool size
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }
+    
+    public void parseURI(URI uri, Map<String, Object> parameters, ThriftComponent component) {
+        setHost(uri.getHost());
+        
+        if (uri.getPort() != -1) {
+            setPort(uri.getPort());
+        }
+        
+        setService(uri.getPath().substring(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
new file mode 100644
index 0000000..3656c75
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConstants.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+/**
+ * Thrift component constants
+ */
+public interface ThriftConstants {
+
+    String THRIFT_SYNC_CLIENT_CLASS_NAME = "Client";
+    String THRIFT_ASYNC_CLIENT_CLASS_NAME = "AsyncClient";
+    String THRIFT_ASYNC_CLIENT_FACTORY_NAME = "Factory";
+    String THRIFT_ASYNC_CLIENT_GETTER_NAME = "getAsyncClient";
+    String THRIFT_SERVER_SYNC_INTERFACE_NAME = "Iface";
+    String THRIFT_SERVER_SYNC_PROCESSOR_CLASS = "Processor";
+    String THRIFT_SERVER_ASYNC_INTERFACE_NAME = "AsyncIface";
+    String THRIFT_SERVER_ASYNC_PROCESSOR_CLASS = "AsyncProcessor";
+    
+    int THRIFT_CONSUMER_POOL_SIZE = 1;
+    int THRIFT_CONSUMER_MAX_POOL_SIZE = 10;
+    /*
+     * This headers will be set after Thrift consumer method is invoked
+     */
+    String THRIFT_METHOD_NAME_HEADER = "CamelThriftMethodName";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
new file mode 100644
index 0000000..efa916e
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftConsumer.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.Proxy;
+import javassist.util.proxy.ProxyFactory;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.thrift.server.ThriftHsHaServer;
+import org.apache.camel.component.thrift.server.ThriftHsHaServer.Args;
+import org.apache.camel.component.thrift.server.ThriftMethodHandler;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents Thrift server consumer implementation
+ */
+public class ThriftConsumer extends DefaultConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumer.class);
+
+    private TNonblockingServerSocket serverTransport;
+    private TServer server;
+    private final ThriftConfiguration configuration;
+    private final ThriftEndpoint endpoint;
+
+    public ThriftConsumer(ThriftEndpoint endpoint, Processor processor, ThriftConfiguration configuration) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+    }
+
+    public ThriftConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (server == null) {
+            LOG.debug("Starting the Thrift server");
+            initializeServer();
+            server.serve();
+            LOG.info("Thrift server started and listening on port: {}", serverTransport.getPort());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (server != null) {
+            LOG.debug("Terminating Thrift server");
+            server.stop();
+            serverTransport.close();
+            serverTransport = null;
+            server = null;
+        }
+        super.doStop();
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected void initializeServer() throws TTransportException {
+        Class serverImplementationClass;
+        Object serverImplementationInstance;
+        Object serverProcessor;
+        ProxyFactory serviceProxy = new ProxyFactory();
+        MethodHandler methodHandler = new ThriftMethodHandler(endpoint, this);
+
+        try {
+            Class serverInterface = ThriftUtils.getServerInterface(endpoint.getServicePackage(), endpoint.getServiceName(), endpoint.isSynchronous(), endpoint.getCamelContext());
+            serviceProxy.setInterfaces(new Class[] {serverInterface});
+            serverImplementationClass = serviceProxy.createClass();
+            serverImplementationInstance = (Object)serverImplementationClass.getConstructor().newInstance();
+            ((Proxy)serverImplementationInstance).setHandler(methodHandler);
+
+            serverProcessor = ThriftUtils.constructServerProcessor(endpoint.getServicePackage(), endpoint.getServiceName(), serverImplementationInstance, endpoint.isSynchronous(),
+                                                                   endpoint.getCamelContext());
+        } catch (IllegalArgumentException | InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | SecurityException e) {
+            throw new IllegalArgumentException("Unable to create server implementation proxy service for " + configuration.getService());
+        }
+
+        if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
+            LOG.debug("Building Thrift server on {}:{}", configuration.getHost(), configuration.getPort());
+            serverTransport = new TNonblockingServerSocket(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        } else if (ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
+            LOG.debug("Building Thrift server on <any address>:{}", configuration.getPort());
+            serverTransport = new TNonblockingServerSocket(configuration.getPort());
+        } else {
+            throw new IllegalArgumentException("No server start properties (host, port) specified");
+        }
+
+        Args args = new Args(serverTransport);
+        args.processor((TProcessor)serverProcessor);
+        args.executorService(getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(this, getEndpoint().getEndpointUri(), configuration.getPoolSize(),
+                                                                                                       configuration.getMaxPoolSize()));
+        args.startThreadPool(getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "start-" + getEndpoint().getEndpointUri()));
+        args.context(endpoint.getCamelContext());
+        server = new ThriftHsHaServer(args);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        return doSend(exchange, callback);
+    }
+
+    private boolean doSend(Exchange exchange, AsyncCallback callback) {
+        if (isRunAllowed()) {
+            getAsyncProcessor().process(exchange, doneSync -> {
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+                }
+                callback.done(doneSync);
+            });
+            return false;
+        } else {
+            LOG.warn("Consumer not ready to process exchanges. The exchange {} will be discarded", exchange);
+            callback.done(true);
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java
new file mode 100644
index 0000000..ffd79c3
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftEndpoint.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+
+/**
+ * The Thrift component allows to call and expose remote procedures (RPC) with
+ * Apache Thrift data format and serialization mechanism
+ */
+@UriEndpoint(firstVersion = "2.20.0", scheme = "thrift", title = "Thrift", syntax = "thrift:host:port/service", label = "rpc")
+public class ThriftEndpoint extends DefaultEndpoint {
+    @UriParam
+    private ThriftConfiguration configuration;
+    
+    private String serviceName;
+    private String servicePackage;
+
+    public ThriftEndpoint(String uri, ThriftComponent component, ThriftConfiguration config) throws Exception {
+        super(uri, component);
+        this.configuration = config;
+        
+        // Extract service and package names from the full service name
+        serviceName = ThriftUtils.extractServiceName(configuration.getService());
+        servicePackage = ThriftUtils.extractServicePackage(configuration.getService());
+    }
+
+    public Producer createProducer() throws Exception {
+        ThriftProducer producer = new ThriftProducer(this, configuration);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(producer);
+        } else {
+            return producer;
+        }
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new ThriftConsumer(this, processor, configuration);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+    
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public String getServicePackage() {
+        return servicePackage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
new file mode 100644
index 0000000..b36ef0c
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftProducer.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.io.IOException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.thrift.client.AsyncClientMethodCallback;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents asynchronous and synchronous Thrift producer implementations
+ */
+public class ThriftProducer extends DefaultProducer implements AsyncProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftProducer.class);
+
+    protected final ThriftConfiguration configuration;
+    protected final ThriftEndpoint endpoint;
+    private TProtocol protocol;
+    private TTransport syncTransport;
+    private TNonblockingTransport asyncTransport;
+    private Object thriftClient;
+
+    public ThriftProducer(ThriftEndpoint endpoint, ThriftConfiguration configuration) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Message message = exchange.getIn();
+
+        try {
+            ThriftUtils.invokeAsyncMethod(thriftClient, configuration.getMethod(), message.getBody(), new AsyncClientMethodCallback(exchange, callback));
+        } catch (Exception e) {
+            if (e.getCause() instanceof TException) {
+                exchange.setException(e.getCause());
+            } else {
+                exchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Message message = exchange.getIn();
+        try {
+            Object outBody = ThriftUtils.invokeSyncMethod(thriftClient, configuration.getMethod(), message.getBody());
+            exchange.getOut().setBody(outBody);
+        } catch (Exception e) {
+            if (e.getCause() instanceof TException) {
+                exchange.setException(e.getCause());
+            } else {
+                throw new Exception(e);
+            }
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (endpoint.isSynchronous()) {
+            if (syncTransport == null) {
+                initializeSyncTransport();
+                LOG.info("Getting synchronous client implementation");
+                thriftClient = ThriftUtils.constructClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(), protocol, endpoint.getCamelContext());
+            }
+        } else {
+            if (asyncTransport == null) {
+                initializeAsyncTransport();
+                LOG.info("Getting asynchronous client implementation");
+                thriftClient = ThriftUtils.constructAsyncClientInstance(endpoint.getServicePackage(), endpoint.getServiceName(), asyncTransport, endpoint.getCamelContext());
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (syncTransport != null) {
+            LOG.debug("Terminating synchronous transport the remote Thrift server");
+            syncTransport.close();
+            syncTransport = null;
+            protocol = null;
+        } else if (asyncTransport != null) {
+            LOG.debug("Terminating asynchronous transport the remote Thrift server");
+            asyncTransport.close();
+            asyncTransport = null;
+        }
+        super.doStop();
+    }
+    
+    protected void initializeSyncTransport() throws TTransportException {
+        if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
+            LOG.info("Creating transport to the remote Thrift server {}:{}", configuration.getHost(), configuration.getPort());
+            syncTransport = new TSocket(configuration.getHost(), configuration.getPort());
+        } else {
+            throw new IllegalArgumentException("No connection properties (host, port) specified");
+        }
+        syncTransport.open();
+        protocol = new TBinaryProtocol(new TFramedTransport(syncTransport));
+    }
+    
+    protected void initializeAsyncTransport() throws IOException, TTransportException {
+        if (!ObjectHelper.isEmpty(configuration.getHost()) && !ObjectHelper.isEmpty(configuration.getPort())) {
+            LOG.info("Creating transport to the remote Thrift server {}:{}", configuration.getHost(), configuration.getPort());
+            asyncTransport = new TNonblockingSocket(configuration.getHost(), configuration.getPort());
+        } else {
+            throw new IllegalArgumentException("No connection properties (host, port) specified");
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
new file mode 100644
index 0000000..8d66179
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/ThriftUtils.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.thrift.client.AsyncClientMethodCallback;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ReflectionHelper;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TNonblockingTransport;
+
+/**
+ * ThriftUtils helpers are working with dynamic methods via Camel and Java
+ * reflection utilities
+ */
+public final class ThriftUtils {
+
+    private ThriftUtils() {
+    }
+
+    public static String extractServiceName(String service) {
+        return service.substring(service.lastIndexOf(".") + 1);
+    }
+
+    public static String extractServicePackage(String service) {
+        return service.substring(0, service.lastIndexOf("."));
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static Object constructClientInstance(String packageName, String serviceName, TProtocol protocol, final CamelContext context) {
+        Object clientInstance = null;
+        Class[] constructorParamTypes = {TProtocol.class};
+        Object[] constructorParamValues = {protocol};
+
+        String clientClassName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_SYNC_CLIENT_CLASS_NAME;
+        try {
+            Class clientClass = context.getClassResolver().resolveMandatoryClass(clientClassName);
+            Constructor clientConstructor = clientClass.getConstructor(constructorParamTypes);
+            clientInstance = clientConstructor.newInstance(constructorParamValues);
+
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Thrift client class not found: " + clientClassName);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalArgumentException("Thrift client class constructor not found: " + clientClassName);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | SecurityException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return clientInstance;
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static Object constructAsyncClientInstance(String packageName, String serviceName, TNonblockingTransport transport, final CamelContext context) {
+        Object asynClientInstance = null;
+        Class[] getterParamTypes = {TNonblockingTransport.class};
+        Class[] constructorParamTypes = {TAsyncClientManager.class, TProtocolFactory.class};
+
+        String clientClassName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_ASYNC_CLIENT_CLASS_NAME + "$" + ThriftConstants.THRIFT_ASYNC_CLIENT_FACTORY_NAME;
+        try {
+            Class clientClass = context.getClassResolver().resolveMandatoryClass(clientClassName);
+            Constructor factoryConstructor = clientClass.getConstructor(constructorParamTypes);
+            Object factoryInstance = factoryConstructor.newInstance(new TAsyncClientManager(), new TBinaryProtocol.Factory());
+            Method asyncClientGetter = ReflectionHelper.findMethod(clientClass, ThriftConstants.THRIFT_ASYNC_CLIENT_GETTER_NAME, getterParamTypes);
+            if (asyncClientGetter == null) {
+                throw new IllegalArgumentException("Thrift async client getter not found: " + clientClassName + "." + ThriftConstants.THRIFT_ASYNC_CLIENT_GETTER_NAME);
+            }
+            asynClientInstance = ObjectHelper.invokeMethod(asyncClientGetter, factoryInstance, (TNonblockingTransport)transport);
+
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Thrift sync client class not found: " + clientClassName);
+        } catch (NoSuchMethodException e) {
+            throw new IllegalArgumentException("Thrift sync client factory class not found: " + clientClassName);
+        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | IOException | SecurityException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return asynClientInstance;
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    public static Object invokeSyncMethod(Object syncClient, String invokeMethod, Object request) {
+        Object[] params = convertObjects2Primitives(request, null);
+        Class[] paramsTypes = (Class[])params[0];
+        Object[] paramsValues = (Object[])params[1];
+
+        Method method = ReflectionHelper.findMethod(syncClient.getClass(), invokeMethod, paramsTypes);
+        if (method == null) {
+            throw new IllegalArgumentException("Thrift service client method not found: " + syncClient.getClass().getName() + "." + invokeMethod + printParamsTypes(paramsTypes));
+        }
+        Object result = ObjectHelper.invokeMethod(method, syncClient, paramsValues);
+        return result;
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    public static void invokeAsyncMethod(Object asyncClient, String invokeMethod, Object request, AsyncClientMethodCallback methodCallback) {
+        Object[] params = convertObjects2Primitives(request, methodCallback);
+        Class[] paramsTypes = (Class[])params[0];
+        Object[] paramsValues = (Object[])params[1];
+
+        Method method = ReflectionHelper.findMethod(asyncClient.getClass(), invokeMethod, paramsTypes);
+        if (method == null) {
+            throw new IllegalArgumentException("Thrift service client method not found: " + asyncClient.getClass().getName() + "." + invokeMethod + printParamsTypes(paramsTypes));
+        }
+        ObjectHelper.invokeMethod(method, asyncClient, paramsValues);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static Class getServerInterface(String packageName, String serviceName, boolean isSyncInterface, final CamelContext context) {
+        String serverInterfaceName = null;
+        Class serverInterface = null;
+
+        try {
+            if (isSyncInterface) {
+                serverInterfaceName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_SERVER_SYNC_INTERFACE_NAME;
+            } else {
+                serverInterfaceName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_SERVER_ASYNC_INTERFACE_NAME;
+            }
+            serverInterface = context.getClassResolver().resolveMandatoryClass(serverInterfaceName);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Unable to find server interface implementation for: " + serverInterfaceName);
+        }
+        return serverInterface;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static Object constructServerProcessor(String packageName, String serviceName, Object serverImplementation, boolean isSyncProcessor, final CamelContext context) {
+        String processorClassName = null;
+        Class serverInterface = null;
+        Object processorInstance = null;
+
+        try {
+            if (isSyncProcessor) {
+                processorClassName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_SERVER_SYNC_PROCESSOR_CLASS;
+                serverInterface = getServerInterface(packageName, serviceName, isSyncProcessor, context);
+            } else {
+                processorClassName = packageName + "." + serviceName + "$" + ThriftConstants.THRIFT_SERVER_ASYNC_PROCESSOR_CLASS;
+                serverInterface = getServerInterface(packageName, serviceName, isSyncProcessor, context);
+            }
+            Class processorClass = context.getClassResolver().resolveMandatoryClass(processorClassName);
+            Constructor procesorConstructor = processorClass.getConstructor(new Class[] {serverInterface});
+            processorInstance = procesorConstructor.newInstance(serverImplementation);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Unable to find server processor for: " + processorClassName);
+        } catch (NoSuchMethodException | SecurityException e) {
+            throw new IllegalArgumentException("Processor class instance not found for: " + processorClassName);
+        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return processorInstance;
+    }
+
+    /*
+     * This function find onComplete method inside interface implementation and
+     * get fist parameter (but not Object.class) as return type
+     */
+    @SuppressWarnings("rawtypes")
+    public static Class findMethodReturnType(Class clazz, String name) {
+        for (Method method : clazz.getMethods()) {
+            if (name.equals(method.getName()) && !method.getParameterTypes()[0].equals(Object.class)) {
+                return method.getParameterTypes()[0];
+            }
+        }
+        return null;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static String printParamsTypes(Class[] paramsTypes) {
+        StringJoiner joiner = new StringJoiner(",");
+        for (Class paramType : paramsTypes) {
+            joiner.add(paramType == null ? "null" : paramType.getName());
+        }
+        return "(" + joiner.toString() + ")";
+    }
+
+    /*
+     * This function transforms objects types stored as list or simple object
+     * inside Body to the primitives objects to find appropriate method
+     */
+    @SuppressWarnings({"rawtypes"})
+    private static Object[] convertObjects2Primitives(Object request, AsyncClientMethodCallback methodCallback) {
+        Class[] paramsTypes = null;
+        Object[] paramsValues = null;
+        int paramListSize = 1;
+        if (request instanceof List) {
+            List paramList = (List)request;
+            paramListSize = paramList.size() + (methodCallback == null ? 0 : 1);
+            paramsTypes = new Class[paramListSize];
+            paramsValues = new Object[paramListSize];
+            int idx = 0;
+
+            for (Object param : paramList) {
+                if (param instanceof Short) {
+                    paramsTypes[idx] = short.class;
+                } else if (param instanceof Long) {
+                    paramsTypes[idx] = long.class;
+                } else if (param instanceof Integer) {
+                    paramsTypes[idx] = int.class;
+                } else if (param instanceof Double) {
+                    paramsTypes[idx] = double.class;
+                } else if (param instanceof Byte) {
+                    paramsTypes[idx] = byte.class;
+                } else if (param instanceof Boolean) {
+                    paramsTypes[idx] = boolean.class;
+                } else if (param instanceof List) {
+                    paramsTypes[idx] = List.class;
+                } else if (param instanceof Set) {
+                    paramsTypes[idx] = Set.class;
+                } else if (param instanceof Map) {
+                    paramsTypes[idx] = Map.class;
+                } else if (param instanceof ByteBuffer) {
+                    paramsTypes[idx] = ByteBuffer.class;
+                } else {
+                    paramsTypes[idx] = param.getClass();
+                }
+                paramsValues[idx] = param;
+                idx++;
+            }
+        } else if (request != null) {
+            paramListSize = methodCallback == null ? 1 : 2;
+            paramsTypes = new Class[paramListSize];
+            paramsValues = new Object[paramListSize];
+            paramsTypes[0] = request.getClass();
+            paramsValues[0] = request;
+        } else {
+            paramListSize = methodCallback == null ? 0 : 1;
+            paramsTypes = new Class[paramListSize];
+            paramsValues = new Object[paramListSize];
+        }
+        if (methodCallback != null) {
+            paramsTypes[paramListSize - 1] = AsyncMethodCallback.class;
+            paramsValues[paramListSize - 1] = methodCallback;
+        }
+        return new Object[] {paramsTypes, paramsValues};
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/client/AsyncClientMethodCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/client/AsyncClientMethodCallback.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/client/AsyncClientMethodCallback.java
new file mode 100644
index 0000000..7480ea1
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/client/AsyncClientMethodCallback.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift.client;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.thrift.async.AsyncMethodCallback;
+
+@SuppressWarnings("rawtypes")
+public class AsyncClientMethodCallback implements AsyncMethodCallback {
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+    
+    public AsyncClientMethodCallback(Exchange exchange, AsyncCallback callback) {
+        this.exchange = exchange;
+        this.callback = callback;
+    }
+
+    @Override
+    public void onComplete(Object response) {
+        exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+        if (response == null) {
+            exchange.getOut().setBody(response);
+        } else {
+            exchange.getOut().setBody(response, response.getClass());
+        }
+        callback.done(false);
+    }
+
+    @Override
+    public void onError(Exception exception) {
+        exchange.setException(exception);
+        callback.done(false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/Invocation.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/Invocation.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/Invocation.java
new file mode 100644
index 0000000..d61547b
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/Invocation.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift.server;
+
+import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer;
+
+/**
+ * Copy of the org.apache.thrift.server.Invocation
+ */
+class Invocation implements Runnable {
+    private final FrameBuffer frameBuffer;
+
+    public Invocation(final FrameBuffer frameBuffer) {
+        this.frameBuffer = frameBuffer;
+    }
+
+    public void run() {
+        frameBuffer.invoke();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
new file mode 100644
index 0000000..09d22a48
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftHsHaServer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift.server;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.CamelContext;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Thrift HsHaServer implementation with executors controlled by the Camel Executor Service Manager
+ */
+public class ThriftHsHaServer extends TNonblockingServer {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftHsHaServer.class);
+
+    public static class Args extends AbstractNonblockingServerArgs<Args> {
+        private ExecutorService executorService;
+        private ExecutorService startThreadPool;
+        private CamelContext context;
+
+        public Args(TNonblockingServerTransport transport) {
+            super(transport);
+        }
+
+        public ExecutorService getExecutorService() {
+            return executorService;
+        }
+
+        public Args executorService(ExecutorService executorService) {
+            this.executorService = executorService;
+            return this;
+        }
+
+        public ExecutorService getStartThreadPool() {
+            return startThreadPool;
+        }
+
+        public Args startThreadPool(ExecutorService startThreadPool) {
+            this.startThreadPool = startThreadPool;
+            return this;
+        }
+
+        public CamelContext getContext() {
+            return context;
+        }
+
+        public Args context(CamelContext context) {
+            this.context = context;
+            return this;
+        }
+    }
+
+    private final ExecutorService invoker;
+    private final CamelContext context;
+    private final ExecutorService startExecutor;
+
+    public ThriftHsHaServer(Args args) {
+        super(args);
+
+        this.context = args.context;
+        this.invoker = args.executorService;
+        this.startExecutor = args.startThreadPool;
+    }
+
+    @Override
+    public void serve() throws IllegalArgumentException {
+        if (!startThreads()) {
+            throw new IllegalArgumentException("Failed to start selector thread!");
+        }
+
+        if (!startListening()) {
+            throw new IllegalArgumentException("Failed to start listening on server socket!");
+        }
+
+        startExecutor.execute(() -> {
+            setServing(true);
+
+            waitForShutdown();
+
+            setServing(false);
+            stopListening();
+        });
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        context.getExecutorServiceManager().shutdownGraceful(startExecutor);
+    }
+
+    @Override
+    protected void waitForShutdown() {
+        joinSelector();
+        context.getExecutorServiceManager().shutdownGraceful(invoker);
+    }
+
+    @Override
+    protected boolean requestInvoke(FrameBuffer frameBuffer) {
+        try {
+            Runnable invocation = getRunnable(frameBuffer);
+            invoker.execute(invocation);
+            return true;
+        } catch (RejectedExecutionException rx) {
+            LOG.warn("ExecutorService rejected execution!", rx);
+            return false;
+        }
+    }
+
+    protected Runnable getRunnable(FrameBuffer frameBuffer) {
+        return new Invocation(frameBuffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftMethodHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftMethodHandler.java b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftMethodHandler.java
new file mode 100644
index 0000000..38a5fc0
--- /dev/null
+++ b/components/camel-thrift/src/main/java/org/apache/camel/component/thrift/server/ThriftMethodHandler.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift.server;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import javassist.util.proxy.MethodHandler;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.thrift.ThriftConstants;
+import org.apache.camel.component.thrift.ThriftConsumer;
+import org.apache.camel.component.thrift.ThriftEndpoint;
+import org.apache.camel.component.thrift.ThriftUtils;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+
+/**
+ * Thrift server methods invocation handler
+ */
+public class ThriftMethodHandler implements MethodHandler {
+    private final ThriftEndpoint endpoint;
+    private final ThriftConsumer consumer;
+
+    public ThriftMethodHandler(ThriftEndpoint endpoint, ThriftConsumer consumer) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
+        if (proceed == null) {
+            // Detects async methods invocation as a last argument is instance of
+            // {org.apache.thrift.async.AsyncMethodCallback}
+            if (args.length > 0 && args[args.length - 1] instanceof AsyncMethodCallback) {
+                AsyncMethodCallback callback = (AsyncMethodCallback)args[args.length - 1];
+                Exchange exchange = endpoint.createExchange();
+                if (args.length >= 2) {
+                    exchange.getIn().setBody(Arrays.asList(Arrays.copyOfRange(args, 0, args.length - 1)));
+                } else {
+                    exchange.getIn().setBody(null);
+                }
+                exchange.getIn().setHeader(ThriftConstants.THRIFT_METHOD_NAME_HEADER, thisMethod.getName());
+
+                consumer.process(exchange, doneSync -> {
+                    Message message = null;
+                    Object response = null;
+                    Exception exception = exchange.getException();
+
+                    if (exception != null) {
+                        callback.onError(exception);
+                    }
+
+                    if (exchange.hasOut()) {
+                        message = exchange.getOut();
+                    } else {
+                        message = exchange.getIn();
+                    }
+
+                    if (message != null) {
+                        Class returnType = ThriftUtils.findMethodReturnType(args[args.length - 1].getClass(), "onComplete");
+                        if (returnType != null) {
+                            response = message.getBody(returnType);
+                        } else {
+                            callback.onError(new TException("Unable to detect method return type"));
+                        }
+                    } else {
+                        callback.onError(new TException("Unable process null message"));
+                    }
+
+                    callback.onComplete(response);
+                });
+            } else {
+                Exchange exchange = endpoint.createExchange();
+                exchange.getIn().setBody(Arrays.asList(args));
+                exchange.getIn().setHeader(ThriftConstants.THRIFT_METHOD_NAME_HEADER, thisMethod.getName());
+
+                consumer.getProcessor().process(exchange);
+
+                Object responseBody = exchange.getIn().getBody(thisMethod.getReturnType());
+                if (responseBody == null && !thisMethod.getReturnType().equals(Void.TYPE)) {
+                    throw new TApplicationException("Return type requires not empty body");
+                }
+                return responseBody;
+            }
+
+            return null;
+        } else {
+            return proceed.invoke(self, args);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/main/resources/META-INF/services/org/apache/camel/component/thrift
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/main/resources/META-INF/services/org/apache/camel/component/thrift b/components/camel-thrift/src/main/resources/META-INF/services/org/apache/camel/component/thrift
new file mode 100644
index 0000000..5af0455
--- /dev/null
+++ b/components/camel-thrift/src/main/resources/META-INF/services/org/apache/camel/component/thrift
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.thrift.ThriftComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerAsyncTest.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerAsyncTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerAsyncTest.java
new file mode 100644
index 0000000..ae40c3c
--- /dev/null
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerAsyncTest.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.thrift.generated.Calculator;
+import org.apache.camel.component.thrift.generated.Operation;
+import org.apache.camel.component.thrift.generated.Work;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftConsumerAsyncTest extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerAsyncTest.class);
+    private static final int THRIFT_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int THRIFT_TEST_NUM1 = 12;
+    private static final int THRIFT_TEST_NUM2 = 13;
+    private static Calculator.AsyncClient thriftClient;
+
+    private TNonblockingTransport transport;
+    private int calculateResult;
+    private int zipResult = -1;
+    private int pingResult = -1;
+    private int allTypesResult;
+    private Work echoResult;
+
+    @Before
+    public void startThriftClient() throws IOException, TTransportException {
+        if (transport == null) {
+            LOG.info("Connecting to the Thrift server on port: {}", THRIFT_TEST_PORT);
+            transport = new TNonblockingSocket("localhost", THRIFT_TEST_PORT);
+            thriftClient = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory())).getAsyncClient(transport);
+        }
+    }
+
+    @After
+    public void stopThriftClient() throws Exception {
+        if (transport != null) {
+            transport.close();
+            transport = null;
+            LOG.info("Connection to the Thrift server closed");
+        }
+    }
+
+    @Test
+    public void testCalculateMethodInvocation() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        Work work = new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY);
+
+        thriftClient.calculate(1, work, new AsyncMethodCallback<Integer>() {
+
+            @Override
+            public void onComplete(Integer response) {
+                calculateResult = response;
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Exception exception) {
+                LOG.info("Exception", exception);
+                latch.countDown();
+            }
+
+        });
+        latch.await(5, TimeUnit.SECONDS);
+
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "calculate");
+        mockEndpoint.assertIsSatisfied();
+
+        assertEquals(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2, calculateResult);
+    }
+
+    @Test
+    public void testVoidMethodInvocation() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        thriftClient.ping(new AsyncMethodCallback<Void>() {
+
+            @Override
+            public void onComplete(Void response) {
+                pingResult = 0;
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Exception exception) {
+                LOG.info("Exception", exception);
+                latch.countDown();
+            }
+
+        });
+        latch.await(5, TimeUnit.SECONDS);
+
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "ping");
+        mockEndpoint.assertIsSatisfied();
+
+        assertEquals(0, pingResult);
+    }
+
+    @Test
+    public void testOneWayMethodInvocation() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        thriftClient.zip(new AsyncMethodCallback<Void>() {
+
+            @Override
+            public void onComplete(Void response) {
+                zipResult = 0;
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Exception exception) {
+                LOG.info("Exception", exception);
+                latch.countDown();
+            }
+
+        });
+        latch.await(5, TimeUnit.SECONDS);
+
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "zip");
+        mockEndpoint.assertIsSatisfied();
+
+        assertEquals(0, zipResult);
+    }
+
+    @Test
+    public void testAllTypesMethodInvocation() throws Exception {
+        LOG.info("Thrift method with all possile types async test start");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        thriftClient.alltypes(true, (byte)THRIFT_TEST_NUM1, (short)THRIFT_TEST_NUM1, (int)THRIFT_TEST_NUM1, (long)THRIFT_TEST_NUM1, (double)THRIFT_TEST_NUM1, "empty",
+                              ByteBuffer.allocate(10), new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY), new ArrayList<Integer>(), new HashSet<String>(),
+                              new HashMap<String, Long>(), new AsyncMethodCallback<Integer>() {
+
+                                  @Override
+                                  public void onComplete(Integer response) {
+                                      allTypesResult = response;
+                                      latch.countDown();
+                                  }
+
+                                  @Override
+                                  public void onError(Exception exception) {
+                                      LOG.info("Exception", exception);
+                                      latch.countDown();
+                                  }
+
+                              });
+        latch.await(5, TimeUnit.SECONDS);
+
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "alltypes");
+        mockEndpoint.assertIsSatisfied();
+
+        assertEquals(THRIFT_TEST_NUM1, allTypesResult);
+    }
+    
+    @Test
+    public void testEchoMethodInvocation() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        Work work = new Work(THRIFT_TEST_NUM1, THRIFT_TEST_NUM2, Operation.MULTIPLY);
+
+        thriftClient.echo(work, new AsyncMethodCallback<Work>() {
+
+            @Override
+            public void onComplete(Work response) {
+                echoResult = response;
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Exception exception) {
+                LOG.info("Exception", exception);
+                latch.countDown();
+            }
+
+        });
+        latch.await(5, TimeUnit.SECONDS);
+
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:thrift-service");
+        mockEndpoint.expectedMessageCount(1);
+        mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(ThriftConstants.THRIFT_METHOD_NAME_HEADER, "echo");
+        mockEndpoint.assertIsSatisfied();
+
+        assertNotNull(echoResult);
+        assertTrue(echoResult instanceof Work);
+        assertEquals(THRIFT_TEST_NUM1, ((Work)echoResult).num1);
+        assertEquals(Operation.MULTIPLY, ((Work)echoResult).op);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("thrift://localhost:" + THRIFT_TEST_PORT + "/org.apache.camel.component.thrift.generated.Calculator")
+                    .to("mock:thrift-service").choice()
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("calculate")).setBody(simple(new Integer(THRIFT_TEST_NUM1 * THRIFT_TEST_NUM2).toString()))
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("ping"))
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("zip"))
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("alltypes")).setBody(simple(new Integer(THRIFT_TEST_NUM1).toString()))
+                        .when(header(ThriftConstants.THRIFT_METHOD_NAME_HEADER).isEqualTo("echo")).setBody(simple("${body[0]}")).bean(new CalculatorMessageBuilder(), "echo");
+            }
+        };
+    }
+    
+    public class CalculatorMessageBuilder {
+        public Work echo(Work work) {
+            return work.deepCopy();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2a0a9e66/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
new file mode 100644
index 0000000..45a24de
--- /dev/null
+++ b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.thrift;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.googlecode.junittoolbox.MultithreadingTester;
+import com.googlecode.junittoolbox.RunnableAssert;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.thrift.generated.Calculator;
+import org.apache.camel.component.thrift.generated.Operation;
+import org.apache.camel.component.thrift.generated.Work;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThriftConsumerConcurrentTest extends CamelTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftConsumerConcurrentTest.class);
+
+    private static final int THRIFT_SYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int THRIFT_ASYNC_REQUEST_TEST_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int THRIFT_TEST_NUM1 = 12;
+    private static final int CONCURRENT_THREAD_COUNT = 30;
+    private static final int ROUNDS_PER_THREAD_COUNT = 10;
+    
+    private static AtomicInteger idCounter = new AtomicInteger();
+
+    public static Integer createId() {
+        return idCounter.getAndIncrement();
+    }
+
+    public static Integer getId() {
+        return idCounter.get();
+    }
+
+    @Test
+    public void testSyncWithConcurrentThreads() throws Exception {
+        RunnableAssert ra = new RunnableAssert("testSyncWithConcurrentThreads") {
+
+            @Override
+            public void run() throws TTransportException {
+                TTransport transport = new TSocket("localhost", THRIFT_SYNC_REQUEST_TEST_PORT);
+                transport.open();
+                TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport));
+                Calculator.Client client = (new Calculator.Client.Factory()).getClient(protocol);
+
+                int instanceId = createId();
+
+                int calculateResponse = 0;
+                try {
+                    calculateResponse = client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY));
+                } catch (TException e) {
+                    LOG.info("Exception", e);
+                }
+                
+                assertNotNull("instanceId = " + instanceId, calculateResponse);
+                assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
+
+                transport.close();
+            }
+        };
+
+        new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run();
+    }
+    
+    @Test
+    public void testAsyncWithConcurrentThreads() throws Exception {
+        RunnableAssert ra = new RunnableAssert("testAsyncWithConcurrentThreads") {
+
+            @Override
+            public void run() throws TTransportException, IOException, InterruptedException {
+                final CountDownLatch latch = new CountDownLatch(1);
+                
+                TNonblockingTransport transport = new TNonblockingSocket("localhost", THRIFT_ASYNC_REQUEST_TEST_PORT);
+                Calculator.AsyncClient client = (new Calculator.AsyncClient.Factory(new TAsyncClientManager(), new TBinaryProtocol.Factory())).getAsyncClient(transport);
+
+                int instanceId = createId();
+                CalculateAsyncMethodCallback calculateCallback = new CalculateAsyncMethodCallback(latch);
+                try {
+                    client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY), calculateCallback);
+                } catch (TException e) {
+                    LOG.info("Exception", e);
+                }
+                latch.await(5, TimeUnit.SECONDS);
+                
+                int calculateResponse = calculateCallback.getCalculateResponse();
+                assertNotNull("instanceId = " + instanceId, calculateResponse);
+                assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
+
+                transport.close();
+            }
+        };
+
+        new MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT).run();
+    }
+    
+    public class CalculateAsyncMethodCallback implements AsyncMethodCallback<Integer> {
+        private final CountDownLatch latch;
+        private Integer calculateResponse;
+        
+        public CalculateAsyncMethodCallback(CountDownLatch latch) {
+            this.latch = latch;
+        }
+        
+        @Override
+        public void onComplete(Integer response) {
+            calculateResponse = response;
+            latch.countDown();
+        }
+
+        @Override
+        public void onError(Exception exception) {
+            LOG.info("Exception", exception);
+            latch.countDown();
+        }
+        
+        public Integer getCalculateResponse() {
+            return calculateResponse;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
+                from("thrift://localhost:" + THRIFT_SYNC_REQUEST_TEST_PORT + "/org.apache.camel.component.thrift.generated.Calculator?synchronous=true")
+                    .setBody(simple("${body[1]}")).bean(new CalculatorMessageBuilder(), "multiply");
+                
+                
+                from("thrift://localhost:" + THRIFT_ASYNC_REQUEST_TEST_PORT + "/org.apache.camel.component.thrift.generated.Calculator")
+                    .setBody(simple("${body[1]}")).bean(new CalculatorMessageBuilder(), "multiply");
+            }
+        };
+    }
+    
+    public class CalculatorMessageBuilder {
+        public Integer multiply(Work work) {
+            return work.num1 * work.num2;
+        }
+    }
+}


Mime
View raw message