camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [camel] branch master updated: CAMEL-12931 - Upgrade jBPM component to use 7 series with consumer capability to react to produced events by jBPM
Date Tue, 13 Nov 2018 07:11:42 GMT
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ece674  CAMEL-12931 - Upgrade jBPM component to use 7 series with consumer capability to react to produced events by jBPM
1ece674 is described below

commit 1ece67412d19215d7fb8fe9675ccd75c65b1d45d
Author: Maciej Swiderski <swiderski.maciej@gmail.com>
AuthorDate: Wed Nov 7 14:33:49 2018 +0100

    CAMEL-12931 - Upgrade jBPM component to use 7 series with consumer capability to react to produced events by jBPM
---
 components/camel-jbpm/pom.xml                      |  71 +++--
 .../camel-jbpm/src/main/docs/jbpm-component.adoc   | 175 +++++++++--
 ...MComponent.java => JBPMCamelConsumerAware.java} |  24 +-
 .../apache/camel/component/jbpm/JBPMComponent.java |   6 +-
 .../camel/component/jbpm/JBPMConfiguration.java    |  73 ++++-
 .../apache/camel/component/jbpm/JBPMConstants.java |  11 +-
 .../apache/camel/component/jbpm/JBPMConsumer.java  | 208 +++++++++++++
 .../apache/camel/component/jbpm/JBPMEndpoint.java  |  44 ++-
 .../apache/camel/component/jbpm/JBPMProducer.java  | 291 ++++++++++---------
 .../component/jbpm/emitters/CamelEventEmitter.java |  75 +++++
 .../emitters/ServiceRegistryBoundEventEmitter.java |  62 ++++
 .../jbpm/listeners/CamelCaseEventListener.java     | 286 ++++++++++++++++++
 .../jbpm/listeners/CamelProcessEventListener.java  | 138 +++++++++
 .../jbpm/listeners/CamelTaskEventListener.java     | 321 +++++++++++++++++++++
 .../jbpm/server/CamelKieServerExtension.java       | 225 +++++++++++++++
 .../org.kie.server.services.api.KieServerExtension |   1 +
 .../jbpm/JBPMComponentIntegrationTest.java         |  65 ++++-
 parent/pom.xml                                     |   2 +-
 .../karaf/features/src/main/resources/features.xml |  50 ++--
 19 files changed, 1864 insertions(+), 264 deletions(-)

diff --git a/components/camel-jbpm/pom.xml b/components/camel-jbpm/pom.xml
index 6c197d4..2a5a161 100644
--- a/components/camel-jbpm/pom.xml
+++ b/components/camel-jbpm/pom.xml
@@ -43,32 +43,55 @@
             <artifactId>camel-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.kie.remote</groupId>
-            <artifactId>kie-remote-client</artifactId>
+            <groupId>org.kie.server</groupId>
+            <artifactId>kie-server-client</artifactId>
             <version>${jbpm-version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.hornetq</groupId>
-                    <artifactId>hornetq-core-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.hornetq</groupId>
-                    <artifactId>hornetq-jms-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.jboss.spec.javax.jms</groupId>
-                    <artifactId>jboss-jms-api_1.1_spec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.jboss.spec.javax.xml.ws</groupId>
-                    <artifactId>jboss-jaxws-api_2.2_spec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.jboss.logging</groupId>
-                    <artifactId>jboss-logging</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
+        
+        <!-- jBPM consumer -->
+        <dependency>
+            <groupId>org.kie</groupId>
+            <artifactId>kie-api</artifactId>
+            <version>${jbpm-version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.kie</groupId>
+            <artifactId>kie-internal</artifactId>
+            <version>${jbpm-version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.jbpm</groupId>
+            <artifactId>jbpm-services-api</artifactId>
+            <version>${jbpm-version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.jbpm</groupId>
+            <artifactId>jbpm-case-mgmt-api</artifactId>
+            <version>${jbpm-version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.kie.server</groupId>
+            <artifactId>kie-server-services-common</artifactId>
+            <version>${jbpm-version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.jbpm</groupId>
+            <artifactId>jbpm-persistence-api</artifactId>
+            <version>${jbpm-version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        
         <dependency>
             <groupId>org.jboss.logging</groupId>
             <artifactId>jboss-logging</artifactId>
diff --git a/components/camel-jbpm/src/main/docs/jbpm-component.adoc b/components/camel-jbpm/src/main/docs/jbpm-component.adoc
index 70e7796..46928d2 100644
--- a/components/camel-jbpm/src/main/docs/jbpm-component.adoc
+++ b/components/camel-jbpm/src/main/docs/jbpm-component.adoc
@@ -4,9 +4,9 @@
 *Available as of Camel version 2.6*
 
 The *jbpm* component provides integration with Business Process
-Management (BPM) Suit http://www.jbpm.org/[jBPM]. It uses
-kie-remote-client API to interact with jBPM instance over REST. The
-component supports only producer.
+Management http://www.jbpm.org/[jBPM]. It uses
+kie-server-client API to interact with jBPM instance over REST. The
+component supports both producer and consumer.
 
 Maven users will need to add the following dependency to their `pom.xml`
 for this component:
@@ -20,6 +20,111 @@ for this component:
 </dependency>
 ------------------------------------------------------------------------------------
 
+## Consumer
+
+jBPM Consumer allows to attach routes to 
+
+* ProcessEventListeners
+* TaskEventListners
+* CaseEventListeners
+
+### URI format
+
+[source,java]
+---------------------------------------------
+jbpm::events:type:[classifier][?options]
+---------------------------------------------
+
+==== Path Parameters (3 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *events* | Classifier for the consumer to know which type of data it should attach to |  | URL
+| *type* | Type of event listener - supports: process, task, case |  | String
+| *classifier* | Used to distinguish routes for same event type|  | String
+|===
+
+Each route would then receive events when they are being produced by jBPM engine.
+
+Routes can be defined either in global way - on application level or deployed 
+together with business assets projects also knows as KJARs.
+
+Consumers are configured via KieServerExtension that is a pluggable interface to enhance
+jBPM with additional capabilities. It reacts to different life cycle phases of the KIE Server
+and by that is able to configure individual endpoints properly.
+
+### KJAR routes
+
+Create file named `camel-routes.xml` in the root folder of your KJAR (src/main/resources) so it will be automatically 
+discovered and Camel Context for given KJAR will be created.
+
+### Global routes
+
+Create file name `global-camel-routes` in the root of the class path of KIE Server. It will be automatically found and registered
+on every KJAR deployed to KIE Server.
+
+
+Example camel-routes.xml file that can be placed in the KJAR
+
+[source, xml]
+----
+<routes xmlns="http://camel.apache.org/schema/spring">
+    
+    <route id="processes">
+        <from uri="jbpm:events:process:test"/>
+        <filter>
+          <simple>${in.header.EventType} == 'beforeProcessStarted'</simple>
+          <to uri="log:kjar.processes?level=INFO&amp;showBody=true&amp;showHeaders=true"/>
+        </filter>
+    </route>
+    
+    <route id="tasks">
+        <from uri="jbpm:events:task:test"/>
+        <filter>
+          <simple>${in.header.EventType} starts with 'before'</simple>
+          <to uri="log:kjar.tasks?level=INFO&amp;showBody=true&amp;showHeaders=true"/>
+        </filter>
+    </route>
+</routes>
+----
+
+
+### Use of jBPM Component in KIE Server
+
+To make use of camel-jbpm component in a KIE Server it is as simple as just adding two jars into KIE Server application
+
+* camel-core
+* camel-jbpm
+
+then start KIE Server and you will see once booted following information in logs
+
+[source, plain]
+----
+Camel KIE Server extension has been successfully registered as server extension
+....
+
+Route: tasks started and consuming from: jbpm://events:task:test?deploymentId=form-rendering_1.0.0
+Total 2 routes, of which 2 are started
+Apache Camel 2.23.0-SNAPSHOT (CamelContext: KIE Server Camel context for container evaluation_1.0.0) started in 0.378 seconds
+o.k.server.services.impl.KieServerImpl   : Container evaluation_1.0.0 (for release id evaluation:evaluation:1.0.0) successfully started
+----
+
+To make use of jBPM Consumer jBPM deployment descriptor must also define camel specific event listeners of following types
+
+* `new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()`
+* `new org.apache.camel.component.jbpm.listeners.CamelTaskEventListener()`
+* `new org.apache.camel.component.jbpm.listeners.CamelCaseEventListener()`
+
+These must be set in either server level of kjar deployment descriptor (use MVEL as resolver type) - see jbpm docs for more details about
+deployment descriptors.
+
+## Producer
+
+Producer is dedicated to interact with jBPM via kie-server-client that uses exposed REST api of 
+jBPM (KIE Server).
+
 ### URI format
 
 [source,java]
@@ -47,40 +152,46 @@ jbpm:connectionURL
 
 with the following path and query parameters:
 
-==== Path Parameters (1 parameters):
+==== Path Parameters (2 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
 | *connectionURL* | *Required* The URL to the jBPM server. |  | URL
+| *eventListenerType* | Sets the event listener type to attach to |  | String
 |===
 
 
-==== Query Parameters (25 parameters):
+==== Query Parameters (30 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *attachmentId* (producer) | attachId to use when retrieving attachments |  | Long
-| *contentId* (producer) | contentId to use when retrieving attachments |  | Long
-| *deploymentId* (producer) | *Required* The id of the deployment |  | String
-| *event* (producer) | the data associated with this event when signalEvent operation is performed |  | Object
-| *eventType* (producer) | the type of event to use when signalEvent operation is performed |  | String
-| *identifier* (producer) | identifier the global identifier |  | String
-| *language* (producer) | The language to use when filtering user tasks |  | String
-| *maxNumber* (producer) | the maximum number of rules that should be fired |  | Integer
+| *attachmentId* (common) | attachId to use when retrieving attachments |  | Long
+| *contentId* (common) | contentId to use when retrieving attachments |  | Long
+| *deploymentId* (common) | *Required* The id of the deployment |  | String
+| *emitterSendItems* (common) | Sets if event produced by emitter should be sent as single items or complete collection |  | Boolean
+| *event* (common) | the data associated with this event when signalEvent operation is performed |  | Object
+| *eventType* (common) | the type of event to use when signalEvent operation is performed |  | String
+| *identifier* (common) | identifier the global identifier |  | String
+| *maxNumber* (common) | the maximum number of rules that should be fired |  | Integer
+| *page* (common) | The page to use when retrieving user tasks |  | Integer
+| *pageSize* (common) | The page size to use when retrieving user tasks |  | Integer
+| *processId* (common) | the id of the process that should be acted upon |  | String
+| *processInstanceId* (common) | the id of the process instance |  | Long
+| *targetUserId* (common) | The targetUserId used when delegating a task |  | String
+| *task* (common) | The task instance to use with task operations |  | Task
+| *taskId* (common) | the id of the task |  | Long
+| *timeout* (common) | A timeout value |  | Integer
+| *userId* (common) | userId to use with task operations |  | String
+| *value* (common) | the value to assign to the global identifier |  | Object
+| *workItemId* (common) | the id of the work item |  | Long
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *operation* (producer) | The operation to perform | startProcess | String
-| *processId* (producer) | the id of the process that should be acted upon |  | String
-| *processInstanceId* (producer) | the id of the process instance |  | Long
-| *targetUserId* (producer) | The targetUserId used when delegating a task |  | String
-| *task* (producer) | The task instance to use with task operations |  | Task
-| *taskId* (producer) | the id of the task |  | Long
-| *timeout* (producer) | A timeout value |  | Integer
-| *userId* (producer) | userId to use with task operations |  | String
-| *value* (producer) | the value to assign to the global identifier |  | Object
-| *workItemId* (producer) | the id of the work item |  | Long
 | *entities* (advanced) | The potentialOwners when nominateTask operation is performed |  | List
 | *extraJaxbClasses* (advanced) | To load additional classes when working with XML |  | Class[]
 | *parameters* (advanced) | the variables that should be set for various operations |  | Map
@@ -153,23 +264,29 @@ org.infinispan.notifications.cachelistener.event.Event.Type
 
 |CamelJBPMContentId |0 |Long |contentId to use when retrieving attachments
 
-|CamelJBPMEntityList |null |List<OrganizationalEntity> |The potentialOwners when nominateTask operation is performed
+|CamelJBPMEntityList |null |List<String> |The potentialOwners when nominateTask operation is performed
 
-|CamelJBPMStatusList |null |List<Status> |The list of status to use when filtering tasks
+|CamelJBPMStatusList |null |List<String> |The list of status to use when filtering tasks
 |=======================================================================
 
 ### Example
 
 Below is an example route that starts a business process with id
-project1.integration-test and deploymentId
-org.kie.example:project1:1.0.0-SNAPSHOT
+evaluation. To run this example you need jBPM to run locally, easiest is to use single zip 
+distribution - downloaded from jbpm.org. Next, start it and import Evaluation sample project, build and deploy.
+Once done this test can be ran out of the box.
 
 [source,java]
 ----------------------------------------------------------------------------------------------
+Map<String, Object> params = new HashMap<>();
+params.put("employee", "wbadmin");
+params.put("reason", "Camel asks for it");
+
 from("direct:start")
-        .setHeader(JBPMConstants.PROCESS_ID, constant("project1.integration-test"))
-        .to("jbpm:http://localhost:8080/business-central?userName=bpmsAdmin&password=pa$word1"
- + "&deploymentId=org.kie.example:project1:1.0.0-SNAPSHOT");
+        .setHeader(JBPMConstants.PROCESS_ID, constant("evaluation"))
+        .setHeader((JBPMConstants.PARAMETERS, params))
+        .to("jbpm:http://localhost:8080/kie-server/services/rest/server?userName=wbadmin&password=wbadmin
+        &deploymentId=evaluation");
 ----------------------------------------------------------------------------------------------
 
 ### See Also
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMCamelConsumerAware.java
similarity index 61%
copy from components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
copy to components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMCamelConsumerAware.java
index f562234..85b4df2 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMCamelConsumerAware.java
@@ -14,20 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.jbpm;
-
-import java.net.URL;
-import java.util.Map;
 
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.DefaultComponent;
+package org.apache.camel.component.jbpm;
 
-public class JBPMComponent extends DefaultComponent {
+/**
+ * Indicates that class implementing this interface should receive (at some point)
+ * JBPMConsumer instance that is required to operate.
+ */
+public interface JBPMCamelConsumerAware {
 
-    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        JBPMConfiguration configuration = new JBPMConfiguration();
-        configuration.setConnectionURL(new URL(remaining));
-        setProperties(configuration, parameters);
-        return new JBPMEndpoint(uri, this, configuration);
-    }
-}
\ No newline at end of file
+    void addConsumer(JBPMConsumer consumer);
+    
+    void removeConsumer(JBPMConsumer consumer);
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
index f562234..ed03b51 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
@@ -26,7 +26,11 @@ public class JBPMComponent extends DefaultComponent {
 
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         JBPMConfiguration configuration = new JBPMConfiguration();
-        configuration.setConnectionURL(new URL(remaining));
+        if (remaining.startsWith("events")) {
+            configuration.setEventListenerType(remaining.split(":")[1]);
+        } else {        
+            configuration.setConnectionURL(new URL(remaining));
+        }
         setProperties(configuration, parameters);
         return new JBPMEndpoint(uri, this, configuration);
     }
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java
index c4a30c7..9633aaa 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jbpm;
 
 import java.net.URL;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -24,8 +25,6 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
-import org.kie.api.task.model.OrganizationalEntity;
-import org.kie.api.task.model.Status;
 import org.kie.api.task.model.Task;
 
 @UriParams
@@ -58,7 +57,9 @@ public class JBPMConfiguration {
     @UriParam
     private String userId;
     @UriParam
-    private String language;
+    private Integer page = 0;
+    @UriParam
+    private Integer pageSize = 10;
     @UriParam
     private String targetUserId;
     @UriParam
@@ -68,9 +69,9 @@ public class JBPMConfiguration {
     @UriParam
     private Task task;
     @UriParam(label = "advanced")
-    private List<OrganizationalEntity> entities;
+    private List<String> entities;
     @UriParam(label = "filter")
-    private List<Status> statuses;
+    private List<String> statuses;
     @UriParam(label = "security", secret = true)
     private String userName;
     @UriParam(label = "security", secret = true)
@@ -81,6 +82,11 @@ public class JBPMConfiguration {
     private Map<String, Object> parameters;
     @UriParam(label = "advanced")
     private Class[] extraJaxbClasses;
+    @UriParam
+    private Boolean emitterSendItems;
+    
+    @UriPath
+    private String eventListenerType;
 
     public String getOperation() {
         return operation;
@@ -225,15 +231,26 @@ public class JBPMConfiguration {
         this.task = task;
     }
 
-    public String getLanguage() {
-        return language;
+    public Integer getPage() {
+        return page;
+    }
+
+    /**
+     * The page to use when retrieving user tasks
+     */
+    public void setPage(Integer page) {
+        this.page = page;
+    }
+    
+    public Integer getPageSize() {
+        return pageSize;
     }
 
     /**
-     * The language to use when filtering user tasks
+     * The page size to use when retrieving user tasks
      */
-    public void setLanguage(String language) {
-        this.language = language;
+    public void setPageSize(Integer pageSize) {
+        this.pageSize = pageSize;
     }
 
     public String getTargetUserId() {
@@ -269,25 +286,25 @@ public class JBPMConfiguration {
         this.contentId = contentId;
     }
 
-    public List<OrganizationalEntity> getEntities() {
+    public List<String> getEntities() {
         return entities;
     }
 
     /**
      * The potentialOwners when nominateTask operation is performed
      */
-    public void setEntities(List<OrganizationalEntity> entities) {
+    public void setEntities(List<String> entities) {
         this.entities = entities;
     }
 
-    public List<Status> getStatuses() {
+    public List<String> getStatuses() {
         return statuses;
     }
 
     /**
      * The list of status to use when filtering tasks
      */
-    public void setStatuses(List<Status> statuses) {
+    public void setStatuses(List<String> statuses) {
         this.statuses = statuses;
     }
 
@@ -356,4 +373,32 @@ public class JBPMConfiguration {
     public void setExtraJaxbClasses(Class[] extraJaxbClasses) {
         this.extraJaxbClasses = extraJaxbClasses;
     }
+
+    
+    public String getEventListenerType() {
+        return eventListenerType;
+    }
+
+    /**
+     * Sets the event listener type to attach to
+     */
+    public void setEventListenerType(String eventListenerType) {
+        this.eventListenerType = eventListenerType;
+    }
+    
+    public Boolean getEmitterSendItems() {
+        return emitterSendItems;
+    }
+
+    /**
+     * Sets if event produced by emitter should be sent as single items or complete collection
+     */
+    public void setEmitterSendItems(Boolean emiterSendItems) {
+        this.emitterSendItems = emiterSendItems;
+    }
+
+    @Override
+    public String toString() {
+        return "JBPMConfiguration [connectionURL=" + connectionURL + ", operation=" + operation + ", deploymentId=" + deploymentId + ", processInstanceId=" + processInstanceId + ", value=" + value + ", processId=" + processId + ", eventType=" + eventType + ", event=" + event + ", maxNumber=" + maxNumber + ", identifier=" + identifier + ", workItemId=" + workItemId + ", taskId=" + taskId + ", userId=" + userId + ", page=" + page + ", pageSize=" + pageSize + ", targetUserId=" + targetUserI [...]
+    }
 }
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java
index 6b00d02..8341cfd 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java
@@ -30,10 +30,17 @@ public interface JBPMConstants {
     String TASK_ID = "CamelJBPMTaskId";
     String TASK = "CamelJBPMTask";
     String USER_ID = "CamelJBPMUserId";
-    String TARGET_USER_ID = "CamelJBPMTargetUserId";
-    String LANGUAGE = "CamelJBPMLanguage";
+    String TARGET_USER_ID = "CamelJBPMTargetUserId";    
     String ATTACHMENT_ID = "CamelJBPMAttachmentId";
     String CONTENT_ID = "CamelJBPMContentId";
     String ENTITY_LIST = "CamelJBPMEntityList";
     String STATUS_LIST = "CamelJBPMStatusList";
+    String RESULT_PAGE = "CamelJBPMResultPage";
+    String RESULT_PAGE_SIZE = "CamelJBPMResultPageSize";
+    
+    
+    String JBPM_PROCESS_EVENT_LISTENER = "process";
+    String JBPM_TASK_EVENT_LISTENER = "task";
+    String JBPM_CASE_EVENT_LISTENER = "case";
+    String JBPM_EVENT_EMITTER = "emitter";
 }
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
new file mode 100644
index 0000000..6c5c6f9
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jbpm.emitters.CamelEventEmitter;
+import org.apache.camel.component.jbpm.listeners.CamelCaseEventListener;
+import org.apache.camel.component.jbpm.listeners.CamelProcessEventListener;
+import org.apache.camel.component.jbpm.listeners.CamelTaskEventListener;
+import org.apache.camel.impl.DefaultConsumer;
+import org.jbpm.services.api.DeploymentEvent;
+import org.jbpm.services.api.DeploymentEventListener;
+import org.jbpm.services.api.DeploymentService;
+import org.jbpm.services.api.ListenerSupport;
+import org.jbpm.services.api.model.DeployedUnit;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.internal.runtime.manager.CacheManager;
+import org.kie.internal.runtime.manager.InternalRuntimeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JBPMConsumer extends DefaultConsumer implements DeploymentEventListener {
+    
+    private static final transient Logger LOGGER = LoggerFactory.getLogger(JBPMConsumer.class);
+   
+    private JBPMEndpoint endpoint;
+    private JBPMConfiguration configuration;
+    
+    public JBPMConsumer(Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        
+        this.endpoint = (JBPMEndpoint) endpoint;
+        this.configuration = ((JBPMEndpoint) getEndpoint()).getConfiguration();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        
+        DeploymentService deploymentService = (DeploymentService) ServiceRegistry.get().service(ServiceRegistry.DEPLOYMENT_SERVICE);        
+        
+        if (configuration.getDeploymentId() != null) {
+            InternalRuntimeManager manager = (InternalRuntimeManager) deploymentService.getRuntimeManager(configuration.getDeploymentId());
+            configure(manager, this);
+            
+            LOGGER.debug("JBPM Camel Consumer configured and started for deployment id {}", configuration.getDeploymentId());
+        } else {
+            
+            ((ListenerSupport) deploymentService).addListener(this);
+            
+            for (DeployedUnit deployed : deploymentService.getDeployedUnits()) {
+                InternalRuntimeManager manager = (InternalRuntimeManager) deployed.getRuntimeManager();
+                configure(manager, this); 
+            }
+            
+            LOGGER.debug("JBPM Camel Consumer configured and started on all available deployments");
+        }
+        
+        
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        DeploymentService deploymentService = (DeploymentService) ServiceRegistry.get().service(ServiceRegistry.DEPLOYMENT_SERVICE);        
+        if (configuration.getDeploymentId() != null) {
+            LOGGER.debug("JBPM Camel Consumer unconfigured and stopped for deployment id {}", configuration.getDeploymentId());
+        } else {
+            ((ListenerSupport) deploymentService).removeListener(this);
+            
+            LOGGER.debug("JBPM Camel Consumer unconfigured and stopped on all available deployments");
+        }
+        
+        if (JBPMConstants.JBPM_EVENT_EMITTER.equals(configuration.getEventListenerType())) {
+            ServiceRegistry.get().remove("CamelEventEmitter");
+        }
+        
+    }
+
+    public void sendMessage(String eventType, Object body) {
+        Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
+        exchange.getIn().setHeader("EventType", eventType);
+        
+        exchange.getIn().setBody(body);
+
+        if (!endpoint.isSynchronous()) {
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    // handle any thrown exception
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+                    }
+                }
+            });
+        } else {
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            // handle any thrown exception
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+            }
+        }
+    }
+    
+    @Override
+    public void onDeploy(DeploymentEvent event) {
+        InternalRuntimeManager manager = (InternalRuntimeManager) event.getDeployedUnit().getRuntimeManager();
+        configure(manager, this);       
+
+    }
+
+    @Override
+    public void onUnDeploy(DeploymentEvent event) {  
+        // no-op
+    }
+
+    @Override
+    public void onActivate(DeploymentEvent event) {
+        // no-op
+        
+    }
+
+    @Override
+    public void onDeactivate(DeploymentEvent event) {
+        // no-op
+        
+    }
+
+    
+    protected void configure(InternalRuntimeManager manager, JBPMConsumer consumer) {
+        String eventListenerType = configuration.getEventListenerType();
+        if (eventListenerType == null) {
+            return;
+        }
+        
+       
+        configureConsumer(eventListenerType, manager, consumer);
+        
+    }
+    
+    protected void configureConsumer(String eventListenerType, InternalRuntimeManager manager, JBPMConsumer consumer) {
+        LOGGER.debug("Configuring Camel JBPM Consumer for {} on runtime manager {}", eventListenerType, manager);
+       
+        CacheManager cacheManager = manager.getCacheManager();
+        JBPMCamelConsumerAware consumerAware = null;
+        if (JBPMConstants.JBPM_PROCESS_EVENT_LISTENER.equals(eventListenerType)) {
+            consumerAware = (JBPMCamelConsumerAware) cacheManager.get("new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()");
+            if (consumerAware == null) {
+                consumerAware = new CamelProcessEventListener();
+                cacheManager.add("new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()", consumerAware);
+            }
+            LOGGER.debug("Configuring JBPMConsumer on process event listener {}", consumerAware);
+        } else if (JBPMConstants.JBPM_TASK_EVENT_LISTENER.equals(eventListenerType)) {
+            consumerAware = (JBPMCamelConsumerAware) cacheManager.get("new org.apache.camel.component.jbpm.listeners.CamelTaskEventListener()");
+            if (consumerAware == null) {
+                consumerAware = new CamelTaskEventListener();
+                cacheManager.add("new org.apache.camel.component.jbpm.listeners.CamelTaskEventListener()", consumerAware);
+            }
+            LOGGER.debug("Configuring JBPMConsumer on task event listener {}", consumerAware);
+        } else if (JBPMConstants.JBPM_CASE_EVENT_LISTENER.equals(eventListenerType)) {
+            consumerAware = (JBPMCamelConsumerAware) cacheManager.get("new org.apache.camel.component.jbpm.listeners.CamelCaseEventListener()");
+            if (consumerAware == null) {
+                consumerAware = new CamelCaseEventListener();
+                cacheManager.add("new org.apache.camel.component.jbpm.listeners.CamelCaseEventListener()", consumerAware);
+            }
+            LOGGER.debug("Configuring JBPMConsumer on case event listener {}", consumerAware);
+        } else if (JBPMConstants.JBPM_EVENT_EMITTER.equals(eventListenerType)) {
+            LOGGER.debug("Configuring JBPMConsumer for event emitter");
+            ServiceRegistry.get().register("CamelEventEmitter", new CamelEventEmitter(this, configuration.getEmitterSendItems()));
+            
+            return;
+        }        
+  
+        LOGGER.debug("Adding consumer {} on {}", consumer, consumerAware);
+        consumerAware.addConsumer(consumer);    
+        
+    }
+
+    @Override
+    public String toString() {
+        return "JBPMConsumer [endpoint=" + endpoint + ", configuration=" + configuration + "]";
+    }
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java
index 1a5d5e5..eb0472f 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java
@@ -18,6 +18,9 @@ package org.apache.camel.component.jbpm;
 
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -25,16 +28,16 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.kie.api.runtime.manager.RuntimeEngine;
-import org.kie.remote.client.api.RemoteRestRuntimeEngineBuilder;
-import org.kie.services.client.api.RemoteRuntimeEngineFactory;
+import org.kie.server.client.KieServicesClient;
+import org.kie.server.client.KieServicesConfiguration;
+import org.kie.server.client.KieServicesFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * The jbpm component provides integration with jBPM (Business Process Management).
  */
-@UriEndpoint(firstVersion = "2.6.0", scheme = "jbpm", title = "JBPM", syntax = "jbpm:connectionURL", producerOnly = true, label = "process")
+@UriEndpoint(firstVersion = "2.6.0", scheme = "jbpm", title = "JBPM", syntax = "jbpm:connectionURL", label = "process")
 public class JBPMEndpoint extends DefaultEndpoint {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(JBPMEndpoint.class);
 
@@ -47,35 +50,24 @@ public class JBPMEndpoint extends DefaultEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        RemoteRestRuntimeEngineBuilder engineBuilder = RemoteRuntimeEngineFactory.newRestBuilder();
-        if (configuration.getUserName() != null) {
-            engineBuilder.addUserName(configuration.getUserName());
-        }
-        if (configuration.getPassword() != null) {
-            engineBuilder.addPassword(configuration.getPassword());
-        }
-        if (configuration.getDeploymentId() != null) {
-            engineBuilder.addDeploymentId(configuration.getDeploymentId());
-        }
-        if (configuration.getConnectionURL() != null) {
-            engineBuilder.addUrl(configuration.getConnectionURL());
-        }
-        if (configuration.getProcessInstanceId() != null) {
-            engineBuilder.addProcessInstanceId(configuration.getProcessInstanceId());
-        }
+        KieServicesConfiguration kieConfiguration = KieServicesFactory.newRestConfiguration(configuration.getConnectionURL().toExternalForm(), configuration.getUserName(), configuration.getPassword());
+  
         if (configuration.getTimeout() != null) {
-            engineBuilder.addTimeout(configuration.getTimeout());
+            kieConfiguration.setTimeout(configuration.getTimeout());
         }
         if (configuration.getExtraJaxbClasses() != null) {
-            engineBuilder.addExtraJaxbClasses(configuration.getExtraJaxbClasses());
+            List<Class<?>> classes = Arrays.asList(configuration.getExtraJaxbClasses());
+            kieConfiguration.addExtraClasses(new LinkedHashSet<>(classes));
         }
-        RuntimeEngine runtimeEngine = engineBuilder.build();
-
-        return new JBPMProducer(this, runtimeEngine);
+        
+        KieServicesClient kieServerClient = KieServicesFactory.newKieServicesClient(kieConfiguration);        
+        LOGGER.debug("JBPM Producer created with KieServerClient configured for {}", configuration.getConnectionURL());
+        return new JBPMProducer(this, kieServerClient);
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("Consumer not supported for " + getClass().getSimpleName() + " endpoint");
+        LOGGER.debug("JBPM Consumer created and configured for deployment {}", configuration.getDeploymentId());
+        return new JBPMConsumer(this, processor);
     }
 
     public boolean isSingleton() {
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java
index 8fc9fc4..c9f218b 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.jbpm;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -24,55 +26,54 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ExchangeHelper;
-import org.kie.api.runtime.KieSession;
-import org.kie.api.runtime.manager.RuntimeEngine;
-import org.kie.api.runtime.process.ProcessInstance;
-import org.kie.api.task.TaskService;
-import org.kie.api.task.model.Attachment;
-import org.kie.api.task.model.Content;
-import org.kie.api.task.model.OrganizationalEntity;
-import org.kie.api.task.model.Status;
+import org.kie.api.KieServices;
+import org.kie.api.command.BatchExecutionCommand;
+import org.kie.api.command.Command;
+import org.kie.api.command.KieCommands;
+import org.kie.api.runtime.ExecutionResults;
 import org.kie.api.task.model.Task;
-import org.kie.api.task.model.TaskSummary;
+import org.kie.server.api.model.ServiceResponse;
+import org.kie.server.api.model.instance.ProcessInstance;
+import org.kie.server.api.model.instance.TaskAttachment;
+import org.kie.server.api.model.instance.TaskInstance;
+import org.kie.server.api.model.instance.TaskSummary;
+import org.kie.server.client.KieServicesClient;
+import org.kie.server.client.ProcessServicesClient;
+import org.kie.server.client.QueryServicesClient;
+import org.kie.server.client.RuleServicesClient;
+import org.kie.server.client.UserTaskServicesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JBPMProducer extends DefaultProducer {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(JBPMProducer.class);
-    private KieSession kieSession;
-    private TaskService taskService;
+
+    private static KieCommands commandsFactory = KieServices.get().getCommands();
+    
     private JBPMConfiguration configuration;
-    private RuntimeEngine runtimeEngine;
+    private KieServicesClient kieServicesClient;
+    
 
-    public JBPMProducer(JBPMEndpoint endpoint, RuntimeEngine runtimeEngine) {
+    public JBPMProducer(JBPMEndpoint endpoint, KieServicesClient kieServicesClient) {
         super(endpoint);
         this.configuration = endpoint.getConfiguration();
-        this.runtimeEngine = runtimeEngine;
+        this.kieServicesClient = kieServicesClient;        
     }
 
     @Override
     protected void doStart() throws Exception {
         LOGGER.trace("starting producer");
-        kieSession = runtimeEngine.getKieSession();
-        taskService = runtimeEngine.getTaskService();
         super.doStart();
         LOGGER.trace("started producer");
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-        if (kieSession != null) {
-            kieSession = null;
-        }
-
-        if (taskService != null) {
-            taskService = null;
-        }
+        super.doStop();        
     }
 
     public void process(Exchange exchange) throws Exception {
-        getOperation(exchange).execute(kieSession, taskService, configuration, exchange);
+        getOperation(exchange).execute(kieServicesClient, configuration, exchange);
     }
 
     Operation getOperation(Exchange exchange) {
@@ -92,35 +93,40 @@ public class JBPMProducer extends DefaultProducer {
         //PROCESS OPERATIONS
         startProcess {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                ProcessInstance processInstance = kieSession.startProcess(getProcessId(configuration, exchange), getParameters(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+                Long processInstance = processClient.startProcess(configuration.getDeploymentId(), getProcessId(configuration, exchange), getParameters(configuration, exchange));
                 setResult(exchange, processInstance);
             }
         }, abortProcessInstance {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                kieSession.abortProcessInstance(safe(getProcessInstanceId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+                processClient.abortProcessInstance(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)));
             }
         }, signalEvent {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
                 Long processInstanceId = getProcessInstanceId(configuration, exchange);
                 if (processInstanceId != null) {
-                    kieSession.signalEvent(getEventType(configuration, exchange), getEvent(configuration, exchange), processInstanceId);
+                    processClient.signalProcessInstance(configuration.getDeploymentId(), processInstanceId, getEventType(configuration, exchange), getEvent(configuration, exchange));
                 } else {
-                    kieSession.signalEvent(getEventType(configuration, exchange), getEvent(configuration, exchange));
+                    processClient.signal(configuration.getDeploymentId(), getEventType(configuration, exchange), getEvent(configuration, exchange));
                 }
             }
         }, getProcessInstance {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                ProcessInstance processInstance = kieSession.getProcessInstance(safe(getProcessInstanceId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+                ProcessInstance processInstance = processClient.getProcessInstance(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)));
                 setResult(exchange, processInstance);
             }
         }, getProcessInstances {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Collection<ProcessInstance> processInstances = kieSession.getProcessInstances();
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                QueryServicesClient queryClient = kieServicesClient.getServicesClient(QueryServicesClient.class);
+                Collection<ProcessInstance> processInstances = queryClient.findProcessInstances(getPage(configuration, exchange), getPageSize(configuration, exchange));
                 setResult(exchange, processInstances);
             }
         },
@@ -128,198 +134,217 @@ public class JBPMProducer extends DefaultProducer {
         //RULE OPERATIONS
         fireAllRules {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                RuleServicesClient ruleClient = kieServicesClient.getServicesClient(RuleServicesClient.class);
+                List<Command<?>> commands = new ArrayList<Command<?>>();
+                BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands);
+                
                 Integer max = getMaxNumber(configuration, exchange);
-                int rulesFired;
                 if (max != null) {
-                    rulesFired = kieSession.fireAllRules(max);
+                    commands.add(commandsFactory.newFireAllRules(max));
                 } else {
-                    rulesFired = kieSession.fireAllRules();
+                    commands.add(commandsFactory.newFireAllRules());
                 }
-                setResult(exchange, rulesFired);
-            }
-        }, getFactCount {
-            @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                long factCount = kieSession.getFactCount();
-                setResult(exchange, factCount);
+                ServiceResponse<ExecutionResults> reply = ruleClient.executeCommandsWithResults(configuration.getDeploymentId(), executionCommand);                
+                setResult(exchange, reply.getResult());
             }
         }, getGlobal {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Object global = kieSession.getGlobal(getIdentifier(configuration, exchange));
-                setResult(exchange, global);
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                RuleServicesClient ruleClient = kieServicesClient.getServicesClient(RuleServicesClient.class);
+                List<Command<?>> commands = new ArrayList<Command<?>>();
+                BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands);
+                String identifier = getIdentifier(configuration, exchange);
+                commands.add(commandsFactory.newGetGlobal(identifier, identifier));
+
+                ServiceResponse<ExecutionResults> reply = ruleClient.executeCommandsWithResults(configuration.getDeploymentId(), executionCommand);  
+                setResult(exchange, reply.getResult().getValue(identifier));
             }
         }, setGlobal {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                kieSession.setGlobal(getIdentifier(configuration, exchange), getValue(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                RuleServicesClient ruleClient = kieServicesClient.getServicesClient(RuleServicesClient.class);
+                List<Command<?>> commands = new ArrayList<Command<?>>();
+                BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands);
+
+                commands.add(commandsFactory.newSetGlobal(getIdentifier(configuration, exchange), getValue(configuration, exchange)));
+
+                ruleClient.executeCommandsWithResults(configuration.getDeploymentId(), executionCommand);                
             }
         },
 
         //WORK ITEM OPERATIONS
         abortWorkItem {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                kieSession.getWorkItemManager().abortWorkItem(safe(getWorkItemId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+                processClient.abortWorkItem(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)), safe(getWorkItemId(configuration, exchange)));
             }
         }, completeWorkItem {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                kieSession.getWorkItemManager().completeWorkItem(safe(getWorkItemId(configuration, exchange)), getParameters(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+                processClient.completeWorkItem(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)), safe(getWorkItemId(configuration, exchange)), getParameters(configuration, exchange));
             }
         },
 
         //TASK OPERATIONS
         activateTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.activate(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
-            }
-        }, addTask {
-            @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                long taskId = taskService.addTask(getTask(configuration, exchange), getParameters(configuration, exchange));
-                setResult(exchange, taskId);
-            }
-        }, claimNextAvailableTask {
-            @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.claimNextAvailable(getUserId(configuration, exchange), getLanguage(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.activateTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, claimTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.claim(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.claimTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, completeTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.complete(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.completeAutoProgress(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
             }
         }, delegateTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.delegate(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getTargetUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.delegateTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getTargetUserId(configuration, exchange));
             }
         }, exitTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.exit(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.exitTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, failTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.fail(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.failTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
             }
         }, getAttachment {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Attachment attachment = taskService.getAttachmentById(safe(getAttachmentId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                TaskAttachment attachment = taskClient.getTaskAttachmentById(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), safe(getAttachmentId(configuration, exchange)));
                 setResult(exchange, attachment);
             }
-        }, getContent {
-            @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Content content = taskService.getContentById(safe(getContentId(configuration, exchange)));
-                setResult(exchange, content);
-            }
         }, getTasksAssignedAsBusinessAdministrator {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                List<TaskSummary> taskSummaries = taskService.getTasksAssignedAsBusinessAdministrator(getUserId(configuration, exchange), getLanguage(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                List<TaskSummary> taskSummaries = taskClient.findTasksAssignedAsBusinessAdministrator(getUserId(configuration, exchange), getPage(configuration, exchange), getPageSize(configuration, exchange));
                 setResult(exchange, taskSummaries);
             }
         }, getTasksAssignedAsPotentialOwnerByStatus {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.getTasksAssignedAsPotentialOwnerByStatus(getUserId(configuration, exchange), getStatuses(configuration, exchange), getLanguage(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                List<TaskSummary> taskSummaries = taskClient.findTasksAssignedAsPotentialOwner(getUserId(configuration, exchange), getStatuses(configuration, exchange), getPage(configuration, exchange), getPageSize(configuration, exchange));
+                setResult(exchange, taskSummaries);
             }
         }, getTaskByWorkItem {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Task task = taskService.getTaskByWorkItemId(safe(getWorkItemId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                TaskInstance task = taskClient.findTaskByWorkItemId(safe(getWorkItemId(configuration, exchange)));
                 setResult(exchange, task);
             }
         }, getTaskBy {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Task task = taskService.getTaskById(safe(getTaskId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                TaskInstance task = taskClient.findTaskById(safe(getTaskId(configuration, exchange)));
                 setResult(exchange, task);
             }
         }, getTaskContent {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                Map<String, Object> taskContent = taskService.getTaskContent(safe(getTaskId(configuration, exchange)));
-                setResult(exchange, taskContent);
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                Map<String, Object> content = taskClient.getTaskOutputContentByTaskId(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)));
+                setResult(exchange, content);
             }
         }, getTasksByProcessInstance {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                List<Long> processInstanceIds = taskService.getTasksByProcessInstanceId(safe(getProcessInstanceId(configuration, exchange)));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                List<TaskSummary> processInstanceIds = taskClient.findTasksByStatusByProcessInstanceId(safe(getProcessInstanceId(configuration, exchange)), Collections.emptyList(),
+                        getPage(configuration, exchange), getPageSize(configuration, exchange));
                 setResult(exchange, processInstanceIds);
             }
         }, getTasksByStatusByProcessInstance {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                List<TaskSummary> taskSummaryList = taskService.getTasksByStatusByProcessInstanceId(
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                List<TaskSummary> taskSummaryList = taskClient.findTasksByStatusByProcessInstanceId(
                         safe(getProcessInstanceId(configuration, exchange)), getStatuses(configuration, exchange),
-                        getLanguage(configuration, exchange));
+                        getPage(configuration, exchange), getPageSize(configuration, exchange));
                 setResult(exchange, taskSummaryList);
             }
         }, getTasksOwned {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                List<TaskSummary> summaryList = taskService.getTasksOwned(getUserId(configuration, exchange), getLanguage(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                List<TaskSummary> summaryList = taskClient.findTasksOwned(getUserId(configuration, exchange), getPage(configuration, exchange), getPageSize(configuration, exchange));
                 setResult(exchange, summaryList);
             }
         }, nominateTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.nominate(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getEntities(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.nominateTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getEntities(configuration, exchange));
             }
         }, releaseTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.release(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.releaseTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, resumeTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.resume(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.resumeTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, skipTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.skip(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.skipTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, startTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.start(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.startTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, stopTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.stop(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.stopTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         }, suspendTask {
             @Override
-            void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
-                taskService.suspend(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+            void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+                UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+                taskClient.suspendTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
             }
         };
 
-        List<Status> getStatuses(JBPMConfiguration configuration, Exchange exchange) {
-            List<Status> statusList = exchange.getIn().getHeader(JBPMConstants.STATUS_LIST, List.class);
+        List<String> getStatuses(JBPMConfiguration configuration, Exchange exchange) {
+            List<String> statusList = exchange.getIn().getHeader(JBPMConstants.STATUS_LIST, List.class);
             if (statusList == null) {
                 statusList = configuration.getStatuses();
             }
             return statusList;
         }
 
-        List<OrganizationalEntity> getEntities(JBPMConfiguration configuration, Exchange exchange) {
-            List<OrganizationalEntity> entityList = exchange.getIn().getHeader(JBPMConstants.ENTITY_LIST, List.class);
+        List<String> getEntities(JBPMConfiguration configuration, Exchange exchange) {
+            List<String> entityList = exchange.getIn().getHeader(JBPMConstants.ENTITY_LIST, List.class);
             if (entityList == null) {
                 entityList = configuration.getEntities();
             }
@@ -350,12 +375,20 @@ public class JBPMProducer extends DefaultProducer {
             return userId;
         }
 
-        String getLanguage(JBPMConfiguration configuration, Exchange exchange) {
-            String language = exchange.getIn().getHeader(JBPMConstants.LANGUAGE, String.class);
-            if (language == null) {
-                language = configuration.getLanguage();
+        Integer getPage(JBPMConfiguration configuration, Exchange exchange) {
+            Integer page = exchange.getIn().getHeader(JBPMConstants.RESULT_PAGE, Integer.class);
+            if (page == null) {
+                page = configuration.getPage();
+            }
+            return page;
+        }
+        
+        Integer getPageSize(JBPMConfiguration configuration, Exchange exchange) {
+            Integer pageSize = exchange.getIn().getHeader(JBPMConstants.RESULT_PAGE_SIZE, Integer.class);
+            if (pageSize == null) {
+                pageSize = configuration.getPageSize();
             }
-            return language;
+            return pageSize;
         }
 
         Task getTask(JBPMConfiguration configuration, Exchange exchange) {
@@ -466,7 +499,7 @@ public class JBPMProducer extends DefaultProducer {
             getResultMessage(exchange).setBody(result);
         }
 
-        abstract void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange);
+        abstract void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange);
     }
 }
 
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/CamelEventEmitter.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/CamelEventEmitter.java
new file mode 100644
index 0000000..0886466
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/CamelEventEmitter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.emitters;
+
+import java.util.Collection;
+
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.jbpm.persistence.api.integration.EventCollection;
+import org.jbpm.persistence.api.integration.EventEmitter;
+import org.jbpm.persistence.api.integration.InstanceView;
+import org.jbpm.persistence.api.integration.base.BaseEventCollection;
+
+public class CamelEventEmitter implements EventEmitter {
+    
+    private JBPMConsumer consumer;
+    private boolean sendItems;
+    
+    public CamelEventEmitter(JBPMConsumer consumer, boolean sendItems) {
+        this.consumer = consumer; 
+        this.sendItems = sendItems;
+    }
+
+    @Override
+    public void deliver(Collection<InstanceView<?>> data) {
+        // no-op
+        
+    }
+
+    @Override
+    public void apply(Collection<InstanceView<?>> data) {
+        if (consumer == null || data.isEmpty()) {
+            return;
+        }
+        
+        if (sendItems) {
+            
+            data.forEach(item -> consumer.sendMessage("Emitter", item));
+        } else {
+        
+            consumer.sendMessage("Emitter", data);
+        }
+    }
+
+    @Override
+    public void drop(Collection<InstanceView<?>> data) {
+        // no-op
+        
+    }
+
+    @Override
+    public EventCollection newCollection() {
+        return new BaseEventCollection();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/ServiceRegistryBoundEventEmitter.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/ServiceRegistryBoundEventEmitter.java
new file mode 100644
index 0000000..c531aa7
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/ServiceRegistryBoundEventEmitter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.emitters;
+
+import java.util.Collection;
+
+import org.jbpm.persistence.api.integration.EventCollection;
+import org.jbpm.persistence.api.integration.EventEmitter;
+import org.jbpm.persistence.api.integration.InstanceView;
+import org.jbpm.services.api.service.ServiceRegistry;
+
+public class ServiceRegistryBoundEventEmitter implements EventEmitter {
+    
+    private EventEmitter delegate;
+    
+    public ServiceRegistryBoundEventEmitter() {
+        this.delegate = (EventEmitter) ServiceRegistry.get().service("CamelEventEmitter");
+    }
+
+    @Override
+    public void deliver(Collection<InstanceView<?>> data) {
+        delegate.deliver(data);
+        
+    }
+
+    @Override
+    public void apply(Collection<InstanceView<?>> data) {
+        delegate.apply(data);
+    }
+
+    @Override
+    public void drop(Collection<InstanceView<?>> data) {
+        delegate.drop(data);
+        
+    }
+
+    @Override
+    public EventCollection newCollection() {
+        return delegate.newCollection();
+    }
+
+    @Override
+    public void close() {
+ 
+    }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelCaseEventListener.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelCaseEventListener.java
new file mode 100644
index 0000000..4906b5b
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelCaseEventListener.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.listeners;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.component.jbpm.JBPMCamelConsumerAware;
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.jbpm.casemgmt.api.event.CaseCancelEvent;
+import org.jbpm.casemgmt.api.event.CaseCloseEvent;
+import org.jbpm.casemgmt.api.event.CaseCommentEvent;
+import org.jbpm.casemgmt.api.event.CaseDataEvent;
+import org.jbpm.casemgmt.api.event.CaseDestroyEvent;
+import org.jbpm.casemgmt.api.event.CaseDynamicSubprocessEvent;
+import org.jbpm.casemgmt.api.event.CaseDynamicTaskEvent;
+import org.jbpm.casemgmt.api.event.CaseEventListener;
+import org.jbpm.casemgmt.api.event.CaseReopenEvent;
+import org.jbpm.casemgmt.api.event.CaseRoleAssignmentEvent;
+import org.jbpm.casemgmt.api.event.CaseStartEvent;
+import org.kie.internal.runtime.Cacheable;
+
+
+public class CamelCaseEventListener implements CaseEventListener, Cacheable, JBPMCamelConsumerAware {    
+    
+    private Set<JBPMConsumer> consumers = new LinkedHashSet<>();
+    
+    @Override
+    public void beforeCaseStarted(CaseStartEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseStarted", event);
+    }
+
+    @Override
+    public void afterCaseStarted(CaseStartEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseStarted", event);
+    }
+
+    @Override
+    public void beforeCaseClosed(CaseCloseEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseClosed", event);
+    }
+
+    @Override
+    public void afterCaseClosed(CaseCloseEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseClosed", event);
+    }
+
+    @Override
+    public void beforeCaseCancelled(CaseCancelEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseCancelled", event);
+    }
+
+    @Override
+    public void afterCaseCancelled(CaseCancelEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseCancelled", event);
+    }
+
+    @Override
+    public void beforeCaseDestroyed(CaseDestroyEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseDestroyed", event);
+    }
+
+    @Override
+    public void afterCaseDestroyed(CaseDestroyEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseDestroyed", event);
+    }
+
+    @Override
+    public void beforeCaseReopen(CaseReopenEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseReopen", event);
+    }
+
+    @Override
+    public void afterCaseReopen(CaseReopenEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseReopen", event);
+    }
+
+    @Override
+    public void beforeCaseCommentAdded(CaseCommentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseCommentAdded", event);
+    }
+
+    @Override
+    public void afterCaseCommentAdded(CaseCommentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseCommentAdded", event);
+    }
+
+    @Override
+    public void beforeCaseCommentUpdated(CaseCommentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseCommentUpdated", event);
+    }
+
+    @Override
+    public void afterCaseCommentUpdated(CaseCommentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseCommentUpdated", event);
+    }
+
+    @Override
+    public void beforeCaseCommentRemoved(CaseCommentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseCommentRemoved", event);
+    }
+
+    @Override
+    public void afterCaseCommentRemoved(CaseCommentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseCommentRemoved", event);
+    }
+
+    @Override
+    public void beforeCaseRoleAssignmentAdded(CaseRoleAssignmentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseRoleAssignmentAdded", event);
+    }
+
+    @Override
+    public void afterCaseRoleAssignmentAdded(CaseRoleAssignmentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseRoleAssignmentAdded", event);
+    }
+
+    @Override
+    public void beforeCaseRoleAssignmentRemoved(CaseRoleAssignmentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseRoleAssignmentRemoved", event);
+    }
+
+    @Override
+    public void afterCaseRoleAssignmentRemoved(CaseRoleAssignmentEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseRoleAssignmentRemoved", event);
+    }
+
+    @Override
+    public void beforeCaseDataAdded(CaseDataEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseDataAdded", event);
+    }
+
+    @Override
+    public void afterCaseDataAdded(CaseDataEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseDataAdded", event);
+    }
+
+    @Override
+    public void beforeCaseDataRemoved(CaseDataEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeCaseDataRemoved", event);
+    }
+
+    @Override
+    public void afterCaseDataRemoved(CaseDataEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterCaseDataRemoved", event);
+    }
+
+    @Override
+    public void beforeDynamicTaskAdded(CaseDynamicTaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeDynamicTaskAdded", event);
+    }
+
+    @Override
+    public void afterDynamicTaskAdded(CaseDynamicTaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterDynamicTaskAdded", event);
+    }
+
+    @Override
+    public void beforeDynamicProcessAdded(CaseDynamicSubprocessEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeDynamicProcessAdded", event);
+    }
+
+    @Override
+    public void afterDynamicProcessAdded(CaseDynamicSubprocessEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterDynamicProcessAdded", event);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void addConsumer(JBPMConsumer consumer) {
+        this.consumers.add(consumer);
+    }
+
+    @Override
+    public void removeConsumer(JBPMConsumer consumer) {
+        this.consumers.remove(consumer);
+    }
+    
+    protected void sendMessage(String eventType, Object event) {
+        this.consumers.stream().filter(c -> c.getStatus().isStarted()).forEach(c -> c.sendMessage(eventType, event));
+    }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelProcessEventListener.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelProcessEventListener.java
new file mode 100644
index 0000000..2e6ff08
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelProcessEventListener.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.listeners;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.component.jbpm.JBPMCamelConsumerAware;
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.kie.api.event.process.ProcessCompletedEvent;
+import org.kie.api.event.process.ProcessEventListener;
+import org.kie.api.event.process.ProcessNodeLeftEvent;
+import org.kie.api.event.process.ProcessNodeTriggeredEvent;
+import org.kie.api.event.process.ProcessStartedEvent;
+import org.kie.api.event.process.ProcessVariableChangedEvent;
+import org.kie.internal.runtime.Cacheable;
+
+
+public class CamelProcessEventListener implements ProcessEventListener, Cacheable, JBPMCamelConsumerAware {
+
+    private Set<JBPMConsumer> consumers = new LinkedHashSet<>();
+
+    @Override
+    public void beforeProcessStarted(ProcessStartedEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+
+        sendMessage("beforeProcessStarted", event);
+    }
+
+    @Override
+    public void afterProcessStarted(ProcessStartedEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        
+        sendMessage("afterProcessStarted", event);
+    }
+
+    @Override
+    public void beforeProcessCompleted(ProcessCompletedEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeProcessCompleted", event);
+    }
+
+    @Override
+    public void afterProcessCompleted(ProcessCompletedEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterProcessCompleted", event);
+    }
+
+    @Override
+    public void beforeNodeTriggered(ProcessNodeTriggeredEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeNodeTriggered", event);
+    }
+
+    @Override
+    public void afterNodeTriggered(ProcessNodeTriggeredEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterNodeTriggered", event);
+    }
+
+    @Override
+    public void beforeNodeLeft(ProcessNodeLeftEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeNodeLeft", event);
+    }
+
+    @Override
+    public void afterNodeLeft(ProcessNodeLeftEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterNodeLeft", event);
+    }
+
+    @Override
+    public void beforeVariableChanged(ProcessVariableChangedEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeVariableChanged", event);
+    }
+
+    @Override
+    public void afterVariableChanged(ProcessVariableChangedEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterVariableChanged", event);
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void addConsumer(JBPMConsumer consumer) {
+        this.consumers.add(consumer);
+    }
+
+    @Override
+    public void removeConsumer(JBPMConsumer consumer) {
+        this.consumers.remove(consumer);
+    }
+    
+    protected void sendMessage(String eventType, Object event) {
+        this.consumers.stream().filter(c -> c.getStatus().isStarted()).forEach(c -> c.sendMessage(eventType, event));
+    }
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelTaskEventListener.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelTaskEventListener.java
new file mode 100644
index 0000000..67a93b7
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelTaskEventListener.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.listeners;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.component.jbpm.JBPMCamelConsumerAware;
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.kie.api.task.TaskEvent;
+import org.kie.api.task.TaskLifeCycleEventListener;
+import org.kie.internal.runtime.Cacheable;
+
+
+public class CamelTaskEventListener implements Cacheable, TaskLifeCycleEventListener, JBPMCamelConsumerAware {
+
+    private Set<JBPMConsumer> consumers = new LinkedHashSet<>();
+    
+    @Override
+    public void beforeTaskActivatedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskActivatedEvent", event);
+    }
+
+    @Override
+    public void beforeTaskClaimedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskClaimedEvent", event);
+    }
+
+    @Override
+    public void beforeTaskSkippedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskSkippedEvent", event);
+    }
+
+    @Override
+    public void beforeTaskStartedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskStartedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskStoppedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskStoppedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskCompletedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskCompletedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskFailedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskFailedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskAddedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskAddedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskExitedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskExitedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskReleasedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskReleasedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskResumedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskResumedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskSuspendedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskSuspendedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskForwardedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskForwardedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskDelegatedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskDelegatedEvent", event);
+
+    }
+
+    @Override
+    public void beforeTaskNominatedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("beforeTaskNominatedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskActivatedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskActivatedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskClaimedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskClaimedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskSkippedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskSkippedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskStartedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskStartedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskStoppedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskStoppedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskCompletedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskCompletedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskFailedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskFailedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskAddedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskAddedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskExitedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskExitedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskReleasedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskReleasedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskResumedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskResumedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskSuspendedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskSuspendedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskForwardedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskForwardedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskDelegatedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskDelegatedEvent", event);
+
+    }
+
+    @Override
+    public void afterTaskNominatedEvent(TaskEvent event) {
+        if (consumers.isEmpty()) {
+            return;
+        }
+        sendMessage("afterTaskNominatedEvent", event);
+
+    }
+
+    @Override
+    public void close() {        
+
+    }
+
+    @Override
+    public void addConsumer(JBPMConsumer consumer) {
+        this.consumers.add(consumer);        
+    }
+
+    @Override
+    public void removeConsumer(JBPMConsumer consumer) {
+        this.consumers.remove(consumer);
+    }
+    
+    protected void sendMessage(String eventType, Object event) {
+        this.consumers.stream().filter(c -> c.getStatus().isStarted()).forEach(c -> c.sendMessage(eventType, event));
+    }
+
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/server/CamelKieServerExtension.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/server/CamelKieServerExtension.java
new file mode 100644
index 0000000..3a8a0ea
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/server/CamelKieServerExtension.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.server;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.model.FromDefinition;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RoutesDefinition;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.server.services.api.KieContainerInstance;
+import org.kie.server.services.api.KieServerExtension;
+import org.kie.server.services.api.KieServerRegistry;
+import org.kie.server.services.api.SupportedTransports;
+import org.kie.server.services.impl.KieServerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CamelKieServerExtension implements KieServerExtension {
+    public static final String EXTENSION_NAME = "Camel";
+
+    private static final Logger logger = LoggerFactory.getLogger(CamelKieServerExtension.class);
+
+    private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.camel.server.ext.disabled", "false"));
+
+    protected DefaultCamelContext camel;
+    
+    protected boolean managedCamel;
+
+    protected Map<String, DefaultCamelContext> camelContexts = new HashMap<>();
+    
+    public CamelKieServerExtension() {
+        this.managedCamel = true;
+    }
+    
+    public CamelKieServerExtension(DefaultCamelContext camel) {
+        this.camel = camel;
+        this.managedCamel = false;
+    }
+    
+    @Override
+    public boolean isInitialized() {
+        return camel != null;
+    }
+
+    @Override
+    public boolean isActive() {
+        return disabled == false;
+    }
+
+    @Override
+    public void init(KieServerImpl kieServer, KieServerRegistry registry) {
+        if (this.managedCamel && this.camel == null) {
+            this.camel = new DefaultCamelContext();
+            this.camel.setName("KIE Server Camel context");
+            
+            try (InputStream is = this.getClass().getResourceAsStream("/global-camel-routes.xml")) {
+                if (is != null) {
+                                        
+                    RoutesDefinition routes = camel.loadRoutesDefinition(is);                                  
+                    camel.addRouteDefinitions(routes.getRoutes());            
+                }
+            } catch (Exception e) {
+                logger.error("Error while adding Camel context for KIE Server", e);
+            }
+        }
+        
+        ServiceRegistry.get().register("GlobalCamelService", this.camel);
+    }
+
+    @Override
+    public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
+        ServiceRegistry.get().remove("GlobalCamelService");
+        
+        if (this.managedCamel && this.camel != null) {
+            try {
+                this.camel.stop();
+            } catch (Exception e) {
+                logger.error("Failed at stopping KIE Server extension {}", EXTENSION_NAME);
+            }
+        }
+    }
+
+    @Override
+    public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+        
+        ClassLoader classloader = kieContainerInstance.getKieContainer().getClassLoader();
+        try (InputStream is = classloader.getResourceAsStream("camel-routes.xml")) {
+            if (is != null) {
+                
+                DefaultCamelContext context = new DefaultCamelContext();
+                context.setName("KIE Server Camel context for container " + kieContainerInstance.getContainerId());                            
+                
+                RoutesDefinition routes = context.loadRoutesDefinition(is);              
+                annotateKJarRoutes(routes, id);
+                context.addRouteDefinitions(routes.getRoutes());
+                context.start();
+                camelContexts.put(id, context);
+                
+                ServiceRegistry.get().register(id + "_CamelService", this.camel);
+               
+            }
+        } catch (Exception e) {
+            logger.error("Error while adding Camel context for {}", kieContainerInstance.getContainerId(), e);
+        }
+    }
+
+    @Override
+    public void updateContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+        disposeContainer(id, kieContainerInstance, parameters);
+        createContainer(id, kieContainerInstance, parameters);
+    }
+
+    @Override
+    public boolean isUpdateContainerAllowed(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+        return true;
+    }
+
+    @Override
+    public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+        DefaultCamelContext context = camelContexts.get(id);
+        
+        if (context != null) {
+            
+            ServiceRegistry.get().remove(id + "_CamelService");
+            try {
+                context.stop();
+            } catch (Exception e) {
+                logger.error("Error while removing Camel context for container {}", id, e);
+            }
+        }
+    }
+
+    @Override
+    public List<Object> getAppComponents(SupportedTransports type) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <T> T getAppComponents(Class<T> serviceType) {
+        return null;
+    }
+
+    @Override
+    public String getImplementedCapability() {
+        return "Integration";
+    }
+
+    @Override
+    public List<Object> getServices() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String getExtensionName() {
+        return EXTENSION_NAME;
+    }
+
+    @Override
+    public Integer getStartOrder() {
+        return 50;
+    }
+    
+    @Override
+    public void serverStarted() {
+        if (this.managedCamel && this.camel != null && !this.camel.isStarted()) {
+            try {
+                this.camel.start();
+            } catch (Exception e) {
+                logger.error("Failed at start Camel context", e);
+            }
+        }
+    }
+    
+    @Override
+    public String toString() {
+        return EXTENSION_NAME + " KIE Server extension";
+    }
+
+    protected void annotateKJarRoutes(RoutesDefinition routes, String deploymentId) {
+        for (RouteDefinition route : routes.getRoutes()) {
+            
+            for (FromDefinition from : route.getInputs()) {
+                
+                if (from.getUri().startsWith("jbpm:events") && !from.getUri().contains("deploymentId")) {
+                    StringBuilder uri = new StringBuilder(from.getUri());
+                    
+                    String[] split = from.getUri().split("\\?");
+                    if (split.length == 1) {
+                        // no query given
+                        uri.append("?");
+                    } else {
+                        // already query params exist
+                        uri.append("&");
+                    }
+                    uri.append("deploymentId=").append(deploymentId);
+                    from.setUri(uri.toString());
+                }
+                
+                System.out.println(from.getUri());
+            }
+        }
+    }
+}
diff --git a/components/camel-jbpm/src/main/resources/META-INF/services/org.kie.server.services.api.KieServerExtension b/components/camel-jbpm/src/main/resources/META-INF/services/org.kie.server.services.api.KieServerExtension
new file mode 100644
index 0000000..a3aed9f
--- /dev/null
+++ b/components/camel-jbpm/src/main/resources/META-INF/services/org.kie.server.services.api.KieServerExtension
@@ -0,0 +1 @@
+org.apache.camel.component.jbpm.server.CamelKieServerExtension
\ No newline at end of file
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java
index 987b0a6..18e1055 100644
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java
@@ -17,21 +17,74 @@
 
 package org.apache.camel.component.jbpm;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jbpm.JBPMProducer.Operation;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.kie.server.api.model.instance.TaskSummary;
 
-@Ignore("This is an integration test that needs BPMS running on the local machine")
+/**
+ * To run this example you need jBPM to run locally, easiest is to use single zip 
+ * distribution - download from jbpm.org
+ * 
+ * Next, start it and import Evaluation sample project, build and deploy.
+ * Once done this test can be ran out of the box.
+ */
+@Ignore("This is an integration test that needs jBPM running on the local machine")
 public class JBPMComponentIntegrationTest extends CamelTestSupport {
 
+    @SuppressWarnings("unchecked")
     @Test
     public void interactsOverRest() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(1);
-        template.sendBodyAndHeader("direct:start", null, JBPMConstants.PROCESS_ID, "project1.integration-test");
+        
+        // let's start process instance for evaluation process
+        Map<String, Object> params = new HashMap<>();
+        params.put("employee", "wbadmin");
+        params.put("reason", "Camel asks for it");
+        
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(JBPMConstants.PROCESS_ID, "evaluation");
+        headers.put(JBPMConstants.PARAMETERS, params);
+        
+        template.sendBodyAndHeaders("direct:start", null, headers);
+        assertMockEndpointsSatisfied();
+        Long processInstanceId = (Long) getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody();
+        assertNotNull(processInstanceId);
+        
+        // now let's collect user tasks
+        headers = new HashMap<>();        
+        headers.put(JBPMConstants.OPERATION, JBPMConstants.OPERATION + Operation.getTasksOwned);
+        
+        template.sendBodyAndHeaders("direct:start", null, headers);
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+        assertMockEndpointsSatisfied();
+        
+        List<TaskSummary> tasks = (List<TaskSummary>) getMockEndpoint("mock:result").getExchanges().get(1).getIn().getBody();
+        assertEquals(1, tasks.size());
+        
+        // let's complete first user task
+        headers = new HashMap<>();
+        headers.put(JBPMConstants.TASK_ID, tasks.get(0).getId());
+        headers.put(JBPMConstants.OPERATION, JBPMConstants.OPERATION + Operation.completeTask);
+        
+        template.sendBodyAndHeaders("direct:start", null, headers);
+        getMockEndpoint("mock:result").expectedMessageCount(3);
+        assertMockEndpointsSatisfied();
+        
+        // lastly let's abort process instance we just created
+        headers = new HashMap<>();
+        headers.put(JBPMConstants.PROCESS_INSTANCE_ID, processInstanceId);
+        headers.put(JBPMConstants.OPERATION, JBPMConstants.OPERATION + Operation.abortProcessInstance);
+        
+        template.sendBodyAndHeaders("direct:start", null, headers);
+        getMockEndpoint("mock:result").expectedMessageCount(4);
         assertMockEndpointsSatisfied();
-
-        assertNotNull(getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody());
     }
 
     @Override
@@ -40,8 +93,8 @@ public class JBPMComponentIntegrationTest extends CamelTestSupport {
             @Override
             public void configure() {
                 from("direct:start")
-                        .to("jbpm:http://localhost:8080/business-central?userName=bpmsAdmin&password=pa$word1"
-                            + "&deploymentId=org.kie.example:project1:1.0.0-SNAPSHOT")
+                        .to("jbpm:http://localhost:8080/kie-server/services/rest/server?userName=wbadmin&password=wbadmin"
+                            + "&deploymentId=evaluation")
                         .to("mock:result");
             }
         };
diff --git a/parent/pom.xml b/parent/pom.xml
index 6d7a82b..69253d4 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -360,7 +360,7 @@
     <javax.ws.rs-api-version>2.0.1</javax.ws.rs-api-version>
     <jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
     <jaxen-version>1.1.6</jaxen-version>
-    <jbpm-version>6.5.0.Final</jbpm-version>
+    <jbpm-version>7.14.0.Final</jbpm-version>
     <jboss-javaee-6-version>1.0.0.Final</jboss-javaee-6-version>
     <jboss-logging-version>3.3.2.Final</jboss-logging-version>
     <jboss-marshalling-version>1.4.10.Final</jboss-marshalling-version>
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 51a9988..0b6cb7d 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1149,27 +1149,41 @@
     <details>The camel-jbpm feature can only run on a SUN JVM. You need to add the package com.sun.tools.xjc to the java platform packages in the etc/jre.properties file.</details>
     <feature version='${project.version}'>camel-core</feature>
     <feature>transaction</feature>
-    <bundle dependency='true'>mvn:org.openengsb.wrapped/com.google.protobuf/2.4.1.w1</bundle>
-    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.javax-inject/${javax-inject-bundle-version}</bundle>
-    <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-jaxrs/${jackson-version}</bundle>
-    <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-core-asl/${jackson-version}</bundle>
-    <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-mapper-asl/${jackson-version}</bundle>
-    <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-xc/${jackson-version}</bundle>
-    <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/${geronimo-jms-spec-version}</bundle>
-    <bundle dependency='true'>mvn:org.apache.servicemix.specs/org.apache.servicemix.specs.jaxws-api-2.2/${servicemix-specs-version}</bundle>
+    <feature version='${cxf-version-range}'>cxf-jaxrs</feature>
+    <bundle dependency='true'>mvn:org.kie.server/kie-server-api/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie.server/kie-server-common/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie.server/kie-server-client/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie.soup/kie-soup-maven-support/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie.soup/kie-soup-project-datamodel-api/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie.soup/kie-soup-project-datamodel-commons/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie.soup/kie-soup-commons/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie/kie-api/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie/kie-internal/${jbpm-version}</bundle>    
     <bundle dependency='true'>mvn:org.drools/drools-core/${jbpm-version}</bundle>
     <bundle dependency='true'>mvn:org.drools/drools-compiler/${jbpm-version}</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie/kie-internal/${jbpm-version}</bundle>
-    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xstream-java8/${xstream-bundle-version}</bundle>
+    <bundle dependency='true'>mvn:org.mvel/mvel2/${mvel-version}</bundle>    
+    <bundle dependency='true'>mvn:org.kie/kie-dmn-model/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.kie/kie-dmn-api/${jbpm-version}</bundle>   
+    <bundle dependency='true'>mvn:org.optaplanner/optaplanner-core/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-common/${jbpm-version}</bundle>  
+    <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-jaxb/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-jackson/${jbpm-version}</bundle>
+    <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-xstream/${jbpm-version}</bundle>  
+    <bundle dependency='true'>mvn:com.google.protobuf/protobuf-java/${protobuf-version}</bundle>
+    <bundle dependency='true'>mvn:com.google.guava/guava/${google-guava-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.commons/commons-math3/${commons-math3-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.commons/commons-lang3/${commons-lang3-version}</bundle>
     <bundle dependency='true'>mvn:commons-codec/commons-codec/${commons-codec-version}</bundle>
-    <bundle dependency='true'>mvn:org.mvel/mvel2/${mvel-version}</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie/kie-api/${jbpm-version}</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-remote-common/${jbpm-version}</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie.remote.ws/kie-remote-ws-common/${jbpm-version}</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-remote-jaxb/${jbpm-version}</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-services-client/6.1.0.Final</bundle>
-    <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-remote-client/${jbpm-version}</bundle>
-    <bundle>mvn:org.apache.camel/camel-jbpm/${project.version}</bundle>
+    <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-annotations/${jackson2-version}</bundle>
+    <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-core/${jackson2-version}</bundle>
+    <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-databind/${jackson2-version}</bundle>
+    <bundle dependency='true'>mvn:com.fasterxml.jackson.module/jackson-module-jaxb-annotations/${jackson2-version}</bundle>    
+    <bundle dependency='true'>mvn:org.apache.servicemix.specs/org.apache.servicemix.specs.jaxws-api-2.2/${servicemix-specs-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xstream-java8/${xstream-bundle-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xpp3/${xpp3-bundle-version}</bundle>    
+    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.reflections/${reflections-bundle-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jaxb-xjc/${jaxb-bundle-version}</bundle>    
+    <bundle>mvn:org.apache.camel/camel-jbpm/${project.version}</bundle> 
   </feature>
   <feature name='camel-jcache' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>


Mime
View raw message