camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [camel-k-runtime] 03/08: kamelets: polished #375
Date Thu, 30 Jul 2020 09:15:03 GMT
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kamelets-claus
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 18f15c8ae87ce3f9101dfacad958dd5efcc416e9
Author: Claus Ibsen <claus.ibsen@gmail.com>
AuthorDate: Wed Jul 29 16:54:00 2020 +0200

    kamelets: polished #375
---
 camel-kamelet/pom.xml                              |  2 +-
 .../camel/component/kamelet/KameletEndpoint.java   | 35 ++++++++++------------
 2 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/camel-kamelet/pom.xml b/camel-kamelet/pom.xml
index 33e09c3..7083b87 100644
--- a/camel-kamelet/pom.xml
+++ b/camel-kamelet/pom.xml
@@ -43,7 +43,7 @@
 
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-core-engine</artifactId>
+            <artifactId>camel-support</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 22cd543..d3f0eac 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.kamelet;
 
 import java.util.Map;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -26,9 +28,9 @@ import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.support.service.ServiceHelper;
 
 @UriEndpoint(
@@ -81,7 +83,6 @@ public class KameletEndpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         Consumer answer = new KemeletConsumer(processor);
         configureConsumer(answer);
-
         return answer;
     }
 
@@ -117,52 +118,46 @@ public class KameletEndpoint extends DefaultEndpoint {
             endpoint = getCamelContext().getEndpoint(kameletUri);
             consumer = endpoint.createConsumer(getProcessor());
 
-            ServiceHelper.startService(endpoint);
-            ServiceHelper.startService(consumer);
-
+            ServiceHelper.startService(endpoint, consumer);
             super.doStart();
         }
 
         @Override
         protected void doStop() throws Exception {
-            ServiceHelper.stopService(endpoint);
-            ServiceHelper.stopService(consumer);
-
+            ServiceHelper.stopService(consumer, endpoint);
             super.doStop();
         }
     }
 
-    private class KameletProducer extends DefaultProducer {
+    private class KameletProducer extends DefaultAsyncProducer {
         private volatile Endpoint endpoint;
-        private volatile Producer producer;
+        private volatile AsyncProducer producer;
 
         public KameletProducer() {
             super(KameletEndpoint.this);
         }
 
         @Override
-        public void process(Exchange exchange) throws Exception {
+        public boolean process(Exchange exchange, AsyncCallback callback) {
             if (producer != null) {
-                producer.process(exchange);
+                return producer.process(exchange, callback);
+            } else {
+                callback.done(true);
+                return true;
             }
         }
 
         @Override
         protected void doStart() throws Exception {
             endpoint = getCamelContext().getEndpoint(kameletUri);
-            producer = endpoint.createProducer();
-
-            ServiceHelper.startService(endpoint);
-            ServiceHelper.startService(producer);
-
+            producer = endpoint.createAsyncProducer();
+            ServiceHelper.startService(endpoint, producer);
             super.doStart();
         }
 
         @Override
         protected void doStop() throws Exception {
-            ServiceHelper.stopService(endpoint);
-            ServiceHelper.stopService(producer);
-
+            ServiceHelper.stopService(producer, endpoint);
             super.doStop();
         }
     }


Mime
View raw message