camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lburgazz...@apache.org
Subject [camel-k-runtime] branch master updated: Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126
Date Mon, 19 Aug 2019 08:54:33 GMT
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new 53256bc  Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126
53256bc is described below

commit 53256bcdf4f0123be452ad835e17b50965b88f5a
Author: lburgazzoli <lburgazzoli@gmail.com>
AuthorDate: Wed Aug 14 00:37:21 2019 +0200

    Base knative-http component to vertx-web as the next undertow major release (3) will be based on it #126
---
 camel-k-loader-kotlin/pom.xml                      |   1 -
 camel-knative-http/pom.xml                         |  11 +-
 .../camel/component/knative/http/KnativeHttp.java  | 109 +------
 .../component/knative/http/KnativeHttpBinding.java | 318 ---------------------
 .../knative/http/KnativeHttpClientCallback.java    | 242 ----------------
 .../knative/http/KnativeHttpComponent.java         | 166 ++++++++---
 .../knative/http/KnativeHttpConsumer.java          | 234 +++++++++++----
 .../http/KnativeHttpConsumerDispatcher.java        | 193 +++++++++++++
 .../knative/http/KnativeHttpDispatcher.java        | 181 ------------
 .../knative/http/KnativeHttpEndpoint.java          |  38 +--
 .../http/KnativeHttpHeaderFilterStrategy.java      |   3 +-
 .../knative/http/KnativeHttpProducer.java          | 178 +++++++-----
 .../component/knative/http/KnativeHttpSupport.java |  49 ++++
 .../component/knative/http/KnativeHttpTest.java    | 252 ++++++++--------
 .../component/knative/CloudEventsV01Test.java      |  14 +-
 .../component/knative/CloudEventsV02Test.java      |  14 +-
 .../component/knative/KnativeComponentTest.java    | 126 +-------
 pom.xml                                            |   1 +
 18 files changed, 829 insertions(+), 1301 deletions(-)

diff --git a/camel-k-loader-kotlin/pom.xml b/camel-k-loader-kotlin/pom.xml
index f7c3b75..7ca24f4 100644
--- a/camel-k-loader-kotlin/pom.xml
+++ b/camel-k-loader-kotlin/pom.xml
@@ -177,7 +177,6 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
-                    <useSystemClassLoader>false</useSystemClassLoader>
                     <forkCount>0</forkCount>
                 </configuration>
             </plugin>
diff --git a/camel-knative-http/pom.xml b/camel-knative-http/pom.xml
index 08c7a42..44b7e06 100644
--- a/camel-knative-http/pom.xml
+++ b/camel-knative-http/pom.xml
@@ -53,9 +53,14 @@
         </dependency>
 
         <dependency>
-            <groupId>io.undertow</groupId>
-            <artifactId>undertow-core</artifactId>
-            <version>${undertow.version}</version>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-web</artifactId>
+            <version>${vertx.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-web-client</artifactId>
+            <version>${vertx.version}</version>
         </dependency>
 
         <dependency>
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
index ac5c713..aad1725 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
@@ -17,21 +17,26 @@
 package org.apache.camel.component.knative.http;
 
 import java.util.Objects;
-import javax.net.ssl.SSLContext;
+import java.util.regex.Pattern;
+
+import io.vertx.core.Handler;
+import io.vertx.core.http.HttpServerRequest;
 
 public final class KnativeHttp {
+    public static final int DEFAULT_PORT = 8080;
+    public static final String DEFAULT_PATH = "/";
+    public static final Pattern ENDPOINT_PATTERN = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)");
+
     private KnativeHttp() {
     }
 
-    public static final class HostKey {
+    public static final class ServerKey {
         private final String host;
         private final int port;
-        private final SSLContext sslContext;
 
-        public HostKey(String host, int port, SSLContext ssl) {
+        public ServerKey(String host, int port) {
             this.host = host;
             this.port = port;
-            this.sslContext = ssl;
         }
 
         public String getHost() {
@@ -42,10 +47,6 @@ public final class KnativeHttp {
             return port;
         }
 
-        public SSLContext getSslContext() {
-            return sslContext;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -54,9 +55,8 @@ public final class KnativeHttp {
             if (o == null || getClass() != o.getClass()) {
                 return false;
             }
-            HostKey key = (HostKey) o;
-            return getPort() == key.getPort()
-                && getHost().equals(key.getHost());
+            ServerKey key = (ServerKey) o;
+            return getPort() == key.getPort() && getHost().equals(key.getHost());
         }
 
         @Override
@@ -65,88 +65,7 @@ public final class KnativeHttp {
         }
     }
 
-
-    /**
-     * Options to configure an Undertow host.
-     */
-    public static final class HostOptions {
-
-        /**
-         * The number of worker threads to use in a Undertow host.
-         */
-        private Integer workerThreads;
-
-        /**
-         * The number of io threads to use in a Undertow host.
-         */
-        private Integer ioThreads;
-
-        /**
-         * The buffer size of the Undertow host.
-         */
-        private Integer bufferSize;
-
-        /**
-         * Set if the Undertow host should use direct buffers.
-         */
-        private Boolean directBuffers;
-
-        /**
-         * Set if the Undertow host should use http2 protocol.
-         */
-        private Boolean http2Enabled;
-
-
-        public Integer getWorkerThreads() {
-            return workerThreads;
-        }
-
-        public void setWorkerThreads(Integer workerThreads) {
-            this.workerThreads = workerThreads;
-        }
-
-        public Integer getIoThreads() {
-            return ioThreads;
-        }
-
-        public void setIoThreads(Integer ioThreads) {
-            this.ioThreads = ioThreads;
-        }
-
-        public Integer getBufferSize() {
-            return bufferSize;
-        }
-
-        public void setBufferSize(Integer bufferSize) {
-            this.bufferSize = bufferSize;
-        }
-
-        public Boolean getDirectBuffers() {
-            return directBuffers;
-        }
-
-        public void setDirectBuffers(Boolean directBuffers) {
-            this.directBuffers = directBuffers;
-        }
-
-        public Boolean getHttp2Enabled() {
-            return http2Enabled;
-        }
-
-        public void setHttp2Enabled(Boolean http2Enabled) {
-            this.http2Enabled = http2Enabled;
-        }
-
-        @Override
-        public String toString() {
-            final StringBuilder sb = new StringBuilder("UndertowHostOptions{");
-            sb.append("workerThreads=").append(workerThreads);
-            sb.append(", ioThreads=").append(ioThreads);
-            sb.append(", bufferSize=").append(bufferSize);
-            sb.append(", directBuffers=").append(directBuffers);
-            sb.append(", http2Enabled=").append(http2Enabled);
-            sb.append('}');
-            return sb.toString();
-        }
+    public interface PredicatedHandler extends Handler<HttpServerRequest> {
+        boolean canHandle(HttpServerRequest event);
     }
 }
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java
deleted file mode 100644
index 8dd6f34..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpBinding.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-
-import io.undertow.client.ClientExchange;
-import io.undertow.client.ClientRequest;
-import io.undertow.client.ClientResponse;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.Headers;
-import io.undertow.util.HttpString;
-import io.undertow.util.Methods;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.TypeConverter;
-import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.DefaultMessage;
-import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.MessageHelper;
-import org.apache.camel.support.ObjectHelper;
-import org.apache.camel.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xnio.channels.BlockingReadableByteChannel;
-import org.xnio.channels.StreamSourceChannel;
-
-public final class KnativeHttpBinding {
-    private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpBinding.class);
-
-    private final HeaderFilterStrategy headerFilterStrategy;
-    private final Boolean transferException;
-
-    public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy) {
-        this(headerFilterStrategy, Boolean.FALSE);
-    }
-
-    public KnativeHttpBinding(HeaderFilterStrategy headerFilterStrategy, Boolean transferException) {
-        this.headerFilterStrategy = Objects.requireNonNull(headerFilterStrategy, "headerFilterStrategy");
-        this.transferException = transferException;
-    }
-
-    public Message toCamelMessage(HttpServerExchange httpExchange, Exchange exchange) throws Exception {
-        Message result = new DefaultMessage(exchange.getContext());
-
-        populateCamelHeaders(httpExchange, result.getHeaders(), exchange);
-
-        //extract body by myself if undertow parser didn't handle and the method is allowed to have one
-        //body is extracted as byte[] then auto TypeConverter kicks in
-        if (Methods.POST.equals(httpExchange.getRequestMethod()) || Methods.PUT.equals(httpExchange.getRequestMethod()) || Methods.PATCH.equals(httpExchange.getRequestMethod())) {
-            result.setBody(readFromChannel(httpExchange.getRequestChannel()));
-        } else {
-            result.setBody(null);
-        }
-
-        return result;
-    }
-
-    public Message toCamelMessage(ClientExchange clientExchange, Exchange exchange) throws Exception {
-        Message result = new DefaultMessage(exchange.getContext());
-
-        //retrieve response headers
-        populateCamelHeaders(clientExchange.getResponse(), result.getHeaders(), exchange);
-
-        result.setBody(readFromChannel(clientExchange.getResponseChannel()));
-
-        return result;
-    }
-
-    public void populateCamelHeaders(HttpServerExchange httpExchange, Map<String, Object> headersMap, Exchange exchange) {
-        String path = httpExchange.getRequestPath();
-        KnativeHttpEndpoint endpoint = (KnativeHttpEndpoint) exchange.getFromEndpoint();
-        if (endpoint.getHttpURI() != null) {
-            // need to match by lower case as we want to ignore case on context-path
-            String endpointPath = endpoint.getHttpURI().getPath();
-            String matchPath = path.toLowerCase(Locale.US);
-            String match = endpointPath.toLowerCase(Locale.US);
-            if (matchPath.startsWith(match)) {
-                path = path.substring(endpointPath.length());
-            }
-        }
-        headersMap.put(Exchange.HTTP_PATH, path);
-
-        for (HttpString name : httpExchange.getRequestHeaders().getHeaderNames()) {
-            if (name.toString().toLowerCase(Locale.US).equals("content-type")) {
-                name = Headers.CONTENT_TYPE;
-            }
-
-            if (name.toString().toLowerCase(Locale.US).equals("authorization")) {
-                String value = httpExchange.getRequestHeaders().get(name).toString();
-                // store a special header that this request was authenticated using HTTP Basic
-                if (value != null && value.trim().startsWith("Basic")) {
-                    if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
-                        appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
-                    }
-                }
-            }
-
-            // add the headers one by one, and use the header filter strategy
-            for (Object value : httpExchange.getRequestHeaders().get(name)) {
-                if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
-                    appendHeader(headersMap, name.toString(), value);
-                }
-            }
-        }
-
-        //process uri parameters as headers
-        Map<String, Deque<String>> pathParameters = httpExchange.getQueryParameters();
-        for (Map.Entry<String, Deque<String>> entry : pathParameters.entrySet()) {
-            String name = entry.getKey();
-            for (Object value: entry.getValue()) {
-                if (!headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
-                    appendHeader(headersMap, name, value);
-                }
-            }
-        }
-
-        headersMap.put(Exchange.HTTP_METHOD, httpExchange.getRequestMethod().toString());
-        headersMap.put(Exchange.HTTP_URL, httpExchange.getRequestURL());
-        headersMap.put(Exchange.HTTP_URI, httpExchange.getRequestURI());
-        headersMap.put(Exchange.HTTP_QUERY, httpExchange.getQueryString());
-        headersMap.put(Exchange.HTTP_RAW_QUERY, httpExchange.getQueryString());
-    }
-
-    public void populateCamelHeaders(ClientResponse response, Map<String, Object> headersMap, Exchange exchange) throws Exception {
-        headersMap.put(Exchange.HTTP_RESPONSE_CODE, response.getResponseCode());
-
-        for (HttpString name : response.getResponseHeaders().getHeaderNames()) {
-            // mapping the content-type
-            //String name = httpName.toString();
-            if (name.toString().toLowerCase(Locale.US).equals("content-type")) {
-                name = Headers.CONTENT_TYPE;
-            }
-
-            if (name.toString().toLowerCase(Locale.US).equals("authorization")) {
-                String value = response.getResponseHeaders().get(name).toString();
-                // store a special header that this request was authenticated using HTTP Basic
-                if (value != null && value.trim().startsWith("Basic")) {
-                    if (!headerFilterStrategy.applyFilterToExternalHeaders(Exchange.AUTHENTICATION, "Basic", exchange)) {
-                        appendHeader(headersMap, Exchange.AUTHENTICATION, "Basic");
-                    }
-                }
-            }
-
-            // add the headers one by one, and use the header filter strategy
-            for (Object value : response.getResponseHeaders().get(name)) {
-                if (!headerFilterStrategy.applyFilterToExternalHeaders(name.toString(), value, exchange)) {
-                    appendHeader(headersMap, name.toString(), value);
-                }
-            }
-        }
-    }
-
-    public Object toHttpResponse(HttpServerExchange httpExchange, Message message) throws IOException {
-        final boolean failed = message.getExchange().isFailed();
-        final int defaultCode = failed ? 500 : 200;
-        final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
-        final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
-
-        httpExchange.setStatusCode(code);
-
-        //copy headers from Message to Response
-        for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
-            String key = entry.getKey();
-            Object value = entry.getValue();
-            // use an iterator as there can be multiple values. (must not use a delimiter)
-            for (Object it: ObjectHelper.createIterable(value, null)) {
-                String headerValue = tc.convertTo(String.class, it);
-                if (headerValue == null) {
-                    continue;
-                }
-                if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
-                    httpExchange.getResponseHeaders().add(new HttpString(key), headerValue);
-                }
-            }
-        }
-
-        Object body = message.getBody();
-        Exception exception = message.getExchange().getException();
-
-        if (exception != null) {
-            if (transferException) {
-                // we failed due an exception, and transfer it as java serialized object
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                ObjectOutputStream oos = new ObjectOutputStream(bos);
-                oos.writeObject(exception);
-                oos.flush();
-                IOHelper.close(oos, bos);
-
-                // the body should be the serialized java object of the exception
-                body = ByteBuffer.wrap(bos.toByteArray());
-                // force content type to be serialized java object
-                message.setHeader(Exchange.CONTENT_TYPE, "application/x-java-serialized-object");
-            } else {
-                // we failed due an exception so print it as plain text
-                StringWriter sw = new StringWriter();
-                PrintWriter pw = new PrintWriter(sw);
-                exception.printStackTrace(pw);
-
-                // the body should then be the stacktrace
-                body = ByteBuffer.wrap(sw.toString().getBytes());
-                // force content type to be text/plain as that is what the stacktrace is
-                message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
-            }
-
-            // and mark the exception as failure handled, as we handled it by returning it as the response
-            ExchangeHelper.setFailureHandled(message.getExchange());
-        }
-
-        // set the content type in the response.
-        String contentType = MessageHelper.getContentType(message);
-        if (contentType != null) {
-            // set content-type
-            httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType);
-            LOG.trace("Content-Type: {}", contentType);
-        }
-        return body;
-    }
-
-    public Object toHttpRequest(ClientRequest clientRequest, Message message) {
-        final Object body = message.getBody();
-        final HeaderMap requestHeaders = clientRequest.getRequestHeaders();
-
-        // set the content type in the response.
-        String contentType = MessageHelper.getContentType(message);
-        if (contentType != null) {
-            // set content-type
-            requestHeaders.put(Headers.CONTENT_TYPE, contentType);
-        }
-
-        TypeConverter tc = message.getExchange().getContext().getTypeConverter();
-
-        //copy headers from Message to Request
-        for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
-            String key = entry.getKey();
-            Object value = entry.getValue();
-            // use an iterator as there can be multiple values. (must not use a delimiter)
-            for (Object it: ObjectHelper.createIterable(value, null)) {
-                String headerValue = tc.convertTo(String.class, it);
-                if (headerValue == null) {
-                    continue;
-                }
-                if (!headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
-                    requestHeaders.add(new HttpString(key), headerValue);
-                }
-            }
-        }
-
-        return body;
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void appendHeader(Map<String, Object> headers, String key, Object value) {
-        if (headers.containsKey(key)) {
-            Object existing = headers.get(key);
-            List<Object> list;
-            if (existing instanceof List) {
-                list = (List<Object>) existing;
-            } else {
-                list = new ArrayList<>();
-                list.add(existing);
-            }
-            list.add(value);
-            value = list;
-        }
-
-        headers.put(key, value);
-    }
-
-    public static byte[] readFromChannel(StreamSourceChannel source) throws IOException {
-        final ByteArrayOutputStream out = new ByteArrayOutputStream();
-        final ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
-        final ReadableByteChannel blockingSource = new BlockingReadableByteChannel(source);
-
-        for (;;) {
-            int res = blockingSource.read(buffer);
-            if (res == -1) {
-                return out.toByteArray();
-            } else if (res == 0) {
-                LOG.error("Channel did not block");
-            } else {
-                buffer.flip();
-                out.write(
-                  buffer.array(),
-                  buffer.arrayOffset() + buffer.position(),
-                  buffer.arrayOffset() + buffer.limit());
-                buffer.clear();
-            }
-        }
-    }
-}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java
deleted file mode 100644
index 4adf801..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpClientCallback.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.util.Map;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import io.undertow.client.ClientCallback;
-import io.undertow.client.ClientConnection;
-import io.undertow.client.ClientExchange;
-import io.undertow.client.ClientRequest;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.http.common.HttpHelper;
-import org.apache.camel.http.common.HttpOperationFailedException;
-import org.apache.camel.support.ExchangeHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xnio.ChannelExceptionHandler;
-import org.xnio.ChannelListener;
-import org.xnio.ChannelListeners;
-import org.xnio.IoUtils;
-import org.xnio.channels.StreamSinkChannel;
-
-
-class KnativeHttpClientCallback implements ClientCallback<ClientConnection> {
-    private static final Logger LOG = LoggerFactory.getLogger(KnativeHttpClientCallback.class);
-
-    private final ByteBuffer body;
-    private final AsyncCallback callback;
-    private final BlockingDeque<Closeable> closeables;
-    private final KnativeHttpEndpoint endpoint;
-    private final Exchange exchange;
-    private final ClientRequest request;
-
-    KnativeHttpClientCallback(Exchange exchange, AsyncCallback callback, KnativeHttpEndpoint endpoint, ClientRequest request, ByteBuffer body) {
-        this.closeables = new LinkedBlockingDeque<>();
-        this.exchange = exchange;
-        this.callback = callback;
-        this.endpoint = endpoint;
-        this.request = request;
-        this.body = body;
-    }
-
-    @Override
-    public void completed(final ClientConnection connection) {
-        // we have established connection, make sure we close it
-        deferClose(connection);
-
-        // now we can send the request and perform the exchange: writing the
-        // request and reading the response
-        connection.sendRequest(request, on(this::performClientExchange));
-    }
-
-    @Override
-    public void failed(final IOException e) {
-        hasFailedWith(e);
-    }
-
-    private ChannelListener<StreamSinkChannel> asyncWriter(final ByteBuffer body) {
-        return channel -> {
-            try {
-                write(channel, body);
-
-                if (body.hasRemaining()) {
-                    channel.resumeWrites();
-                } else {
-                    flush(channel);
-                }
-            } catch (final IOException e) {
-                hasFailedWith(e);
-            }
-        };
-    }
-
-    private void deferClose(final Closeable closeable) {
-        try {
-            closeables.putFirst(closeable);
-        } catch (final InterruptedException e) {
-            hasFailedWith(e);
-        }
-    }
-
-    private void finish(final Message result) {
-        for (final Closeable closeable : closeables) {
-            IoUtils.safeClose(closeable);
-        }
-
-        if (result != null) {
-            if (ExchangeHelper.isOutCapable(exchange)) {
-                exchange.setOut(result);
-            } else {
-                exchange.setIn(result);
-            }
-        }
-
-        callback.done(false);
-    }
-
-    private void hasFailedWith(final Throwable e) {
-        LOG.trace("Exchange has failed with", e);
-        if (Boolean.TRUE.equals(endpoint.getThrowExceptionOnFailure())) {
-            exchange.setException(e);
-        }
-
-        finish(null);
-    }
-
-    private <T> ClientCallback<T> on(final Consumer<T> completed) {
-        return on(completed, this::hasFailedWith);
-    }
-
-    private <T> ClientCallback<T> on(Consumer<T> completed, Consumer<IOException> failed) {
-        return new ClientCallback<T>() {
-            @Override
-            public void completed(final T result) {
-                completed.accept(result);
-            }
-
-            @Override
-            public void failed(final IOException e) {
-                failed(e);
-            }
-        };
-    }
-
-    private void performClientExchange(final ClientExchange clientExchange) {
-        // add response listener to the exchange, we could receive the response
-        // at any time (async)
-        setupResponseListener(clientExchange);
-
-        // write the request
-        writeRequest(clientExchange, body);
-    }
-
-    private void setupResponseListener(final ClientExchange clientExchange) {
-        clientExchange.setResponseListener(on(response -> {
-            try {
-                final KnativeHttpBinding binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
-                final Message result = binding.toCamelMessage(clientExchange, exchange);
-                final int code = clientExchange.getResponse().getResponseCode();
-
-                if (!HttpHelper.isStatusCodeOk(code, "200-299") && endpoint.getThrowExceptionOnFailure()) {
-                    // operation failed so populate exception to throw
-                    final String uri = endpoint.getHttpURI().toString();
-                    final String statusText = clientExchange.getResponse().getStatus();
-
-                    // Convert Message headers (Map<String, Object>) to Map<String, String> as expected by
-                    // HttpOperationsFailedException using Message versus clientExchange as its header values
-                    // have extra formatting
-                    final Map<String, String> headers = result.getHeaders().entrySet()
-                            .stream()
-                            .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().toString()));
-
-                    // Since result (Message) isn't associated with an Exchange yet, you can not use result.getBody(String.class)
-                    final String bodyText = ExchangeHelper.convertToType(exchange, String.class, result.getBody());
-                    final Exception cause = new HttpOperationFailedException(uri, code, statusText, null, headers, bodyText);
-
-                    if (ExchangeHelper.isOutCapable(exchange)) {
-                        exchange.setOut(result);
-                    } else {
-                        exchange.setIn(result);
-                    }
-
-                    // make sure to fail with HttpOperationFailedException
-                    hasFailedWith(cause);
-                } else {
-                    // we end Camel exchange here
-                    finish(result);
-                }
-            } catch (Throwable e) {
-                hasFailedWith(e);
-            }
-        }));
-    }
-
-    private void writeRequest(final ClientExchange clientExchange, final ByteBuffer body) {
-        final StreamSinkChannel requestChannel = clientExchange.getRequestChannel();
-        if (body != null) {
-            try {
-                // try writing, we could be on IO thread and ready to write to
-                // the socket (or not)
-                write(requestChannel, body);
-
-                if (body.hasRemaining()) {
-                    // we did not write all of body (or at all) register a write
-                    // listener to write asynchronously
-                    requestChannel.getWriteSetter().set(asyncWriter(body));
-                    requestChannel.resumeWrites();
-                } else {
-                    // we are done, we need to flush the request
-                    flush(requestChannel);
-                }
-            } catch (final IOException e) {
-                hasFailedWith(e);
-            }
-        }
-    }
-
-    private static void flush(final StreamSinkChannel channel) throws IOException {
-        // the canonical way of flushing Xnio channels
-        channel.shutdownWrites();
-        if (!channel.flush()) {
-            final ChannelListener<StreamSinkChannel> safeClose = IoUtils::safeClose;
-            final ChannelExceptionHandler<Channel> closingChannelExceptionHandler = ChannelListeners
-                .closingChannelExceptionHandler();
-            final ChannelListener<StreamSinkChannel> flushingChannelListener = ChannelListeners
-                .flushingChannelListener(safeClose, closingChannelExceptionHandler);
-            channel.getWriteSetter().set(flushingChannelListener);
-            channel.resumeWrites();
-        }
-    }
-
-    private static void write(final StreamSinkChannel channel, final ByteBuffer body) throws IOException {
-        int written = 1;
-        while (body.hasRemaining() && written > 0) {
-            written = channel.write(body);
-        }
-    }
-}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
index bea70a5..24edb27 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
@@ -17,94 +17,188 @@
 package org.apache.camel.component.knative.http;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import io.undertow.predicate.Predicate;
-import io.undertow.server.HttpHandler;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.client.WebClientOptions;
 import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.IntrospectionSupport;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Component("knative-http")
 public class KnativeHttpComponent extends DefaultComponent {
     private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class);
-    private final Map<KnativeHttp.HostKey, KnativeHttpDispatcher> registry;
 
+    private final Map<KnativeHttp.ServerKey, KnativeHttpConsumerDispatcher> registry;
+
+    @Metadata(label = "advanced")
+    private Vertx vertx;
+    @Metadata(label = "advanced")
+    private VertxOptions vertxOptions;
     @Metadata(label = "advanced")
-    private KnativeHttp.HostOptions hostOptions;
+    private HttpServerOptions vertxHttpServerOptions;
+    @Metadata(label = "advanced")
+    private WebClientOptions vertxHttpClientOptions;
+
+    private boolean localVertx;
+    private ExecutorService executor;
 
     public KnativeHttpComponent() {
         this.registry = new ConcurrentHashMap<>();
+        this.localVertx = false;
     }
 
     @Override
-    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        final Pattern pattern = Pattern.compile("([0-9a-zA-Z][\\w\\.-]+):(\\d+)\\/?(.*)");
-        final Matcher matcher = pattern.matcher(remaining);
+    protected void doInit() throws Exception {
+        super.doInit();
+
+        this.executor = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "knative-http-component");
+
+        if (this.vertx == null) {
+            Set<Vertx> instances = getCamelContext().getRegistry().findByType(Vertx.class);
+            if (instances.size() == 1) {
+                this.vertx = instances.iterator().next();
+            }
+        }
 
+        if (this.vertx == null) {
+            VertxOptions options = ObjectHelper.supplyIfEmpty(this.vertxOptions, VertxOptions::new);
+
+            this.vertx = Vertx.vertx(options);
+            this.localVertx = true;
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+
+        if (this.vertx != null && this.localVertx) {
+            Future<?> future = this.executor.submit(
+                () -> {
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    this.vertx.close(result -> {
+                        try {
+                            if (result.failed()) {
+                                LOGGER.warn("Failed to close Vert.x HttpServer reason: {}",
+                                    result.cause().getMessage()
+                                );
+
+                                throw new RuntimeException(result.cause());
+                            }
+
+                            LOGGER.info("Vert.x HttpServer stopped");
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            );
+
+            try {
+                future.get();
+            } finally {
+                this.vertx = null;
+                this.localVertx = false;
+            }
+        }
+
+        if (this.executor != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
+        }
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        Matcher matcher = KnativeHttp.ENDPOINT_PATTERN.matcher(remaining);
         if (!matcher.find()) {
             throw new IllegalArgumentException("Bad URI: " + remaining);
         }
 
-        final String host;
-        final int port;
-        final String path;
+        KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this);
+        ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true));
 
         switch (matcher.groupCount()) {
         case 1:
-            host = matcher.group(1);
-            port = 8080;
-            path = "/";
+            ep.setHost(matcher.group(1));
+            ep.setPort(KnativeHttp.DEFAULT_PORT);
+            ep.setPath(KnativeHttp.DEFAULT_PATH);
             break;
         case 2:
-            host = matcher.group(1);
-            port = Integer.parseInt(matcher.group(2));
-            path = "/";
+            ep.setHost(matcher.group(1));
+            ep.setPort(Integer.parseInt(matcher.group(2)));
+            ep.setPath(KnativeHttp.DEFAULT_PATH);
             break;
         case 3:
-            host = matcher.group(1);
-            port = Integer.parseInt(matcher.group(2));
-            path = "/" + matcher.group(3);
+            ep.setHost(matcher.group(1));
+            ep.setPort(Integer.parseInt(matcher.group(2)));
+            ep.setPath(KnativeHttp.DEFAULT_PATH + matcher.group(3));
             break;
         default:
             throw new IllegalArgumentException("Bad URI: " + remaining);
         }
 
-        KnativeHttpEndpoint ep = new KnativeHttpEndpoint(uri, this);
-        ep.setHost(host);
-        ep.setPort(port);
-        ep.setPath(path);
-        ep.setHeaderFilter(IntrospectionSupport.extractProperties(parameters, "filter.", true));
-
         setProperties(ep, parameters);
 
         return ep;
     }
 
-    public KnativeHttp.HostOptions getHostOptions() {
-        return hostOptions;
+    public Vertx getVertx() {
+        return vertx;
     }
 
-    public void setHostOptions(KnativeHttp.HostOptions hostOptions) {
-        this.hostOptions = hostOptions;
+    public void setVertx(Vertx vertx) {
+        this.vertx = vertx;
     }
 
-    public void bind(KnativeHttp.HostKey key, HttpHandler handler, Predicate predicate) {
-        getUndertow(key).bind(handler, predicate);
+    public VertxOptions getVertxOptions() {
+        return vertxOptions;
     }
 
-    public void unbind(KnativeHttp.HostKey key, HttpHandler handler) {
-        getUndertow(key).unbind(handler);
+    public void setVertxOptions(VertxOptions vertxOptions) {
+        this.vertxOptions = vertxOptions;
+    }
+
+    public HttpServerOptions getVertxHttpServerOptions() {
+        return vertxHttpServerOptions;
+    }
+
+    public void setVertxHttpServerOptions(HttpServerOptions vertxHttpServerOptions) {
+        this.vertxHttpServerOptions = vertxHttpServerOptions;
+    }
+
+    public WebClientOptions getVertxHttpClientOptions() {
+        return vertxHttpClientOptions;
+    }
+
+    public void setVertxHttpClientOptions(WebClientOptions vertxHttpClientOptions) {
+        this.vertxHttpClientOptions = vertxHttpClientOptions;
+    }
 
+    KnativeHttpConsumerDispatcher getDispatcher(KnativeHttp.ServerKey key) {
+        return registry.computeIfAbsent(key, k -> new KnativeHttpConsumerDispatcher(executor, vertx, k, vertxHttpServerOptions));
     }
 
-    private KnativeHttpDispatcher getUndertow(KnativeHttp.HostKey key) {
-        return registry.computeIfAbsent(key, k -> new KnativeHttpDispatcher(k, hostOptions));
+    ExecutorService getExecutorService() {
+        return this.executor;
     }
 }
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index 26f057b..a9bbedf 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -16,31 +16,57 @@
  */
 package org.apache.camel.component.knative.http;
 
-import java.nio.ByteBuffer;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Predicate;
 
-import io.undertow.predicate.Predicates;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.HttpString;
-import io.undertow.util.MimeMappings;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 
-public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler {
-    private final KnativeHttpBinding binding;
+public class KnativeHttpConsumer extends DefaultConsumer implements KnativeHttp.PredicatedHandler {
+    private final Predicate<HttpServerRequest> filter;
 
     public KnativeHttpConsumer(KnativeHttpEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
 
-        this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+        filter = v -> {
+            if (!Objects.equals(endpoint.getPath(), v.path())) {
+                return false;
+            }
+            if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) {
+                return true;
+            }
+
+            for (Map.Entry<String, Object> entry : endpoint.getHeaderFilter().entrySet()) {
+                String ref = entry.getValue().toString();
+                String val = v.getHeader(entry.getKey());
+                boolean matches = Objects.equals(ref, val) || val.matches(ref);
+
+                if (!matches) {
+                    return false;
+                }
+            }
+
+            return true;
+        };
     }
 
     @Override
@@ -52,30 +78,9 @@ public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler
     protected void doStart() throws Exception {
         final KnativeHttpEndpoint endpoint = getEndpoint();
         final KnativeHttpComponent component = endpoint.getComponent();
-        final KnativeHttp.HostKey key = endpoint.getHostKey();
-
-        component.bind(key, this, Predicates.and(
-            Predicates.path(endpoint.getPath()),
-            value -> {
-                if (ObjectHelper.isEmpty(endpoint.getHeaderFilter())) {
-                    return true;
-                }
-
-                HeaderMap hm = value.getRequestHeaders();
+        final KnativeHttp.ServerKey key = endpoint.getServerKey();
 
-                for (Map.Entry<String, Object> entry: endpoint.getHeaderFilter().entrySet()) {
-                    String ref = entry.getValue().toString();
-                    String val = hm.getFirst(entry.getKey());
-                    boolean matches = Objects.equals(ref, val) || val.matches(ref);
-
-                    if (!matches) {
-                        return false;
-                    }
-                }
-
-                return true;
-            }
-        ));
+        component.getDispatcher(key).bind(this);
 
         super.doStart();
     }
@@ -84,49 +89,152 @@ public class KnativeHttpConsumer extends DefaultConsumer implements HttpHandler
     protected void doStop() throws Exception {
         final KnativeHttpEndpoint endpoint = getEndpoint();
         final KnativeHttpComponent component = endpoint.getComponent();
-        final KnativeHttp.HostKey key = endpoint.getHostKey();
+        final KnativeHttp.ServerKey key = endpoint.getServerKey();
 
-        component.unbind(key, this);
+        component.getDispatcher(key).unbind(this);
 
         super.doStop();
     }
 
     @Override
-    public void handleRequest(HttpServerExchange httpExchange) throws Exception {
-        //create new Exchange
-        //binding is used to extract header and payload(if available)
-        Exchange camelExchange = createExchange(httpExchange);
-
-        //Unit of Work to process the Exchange
-        createUoW(camelExchange);
-        try {
-            getProcessor().process(camelExchange);
-        } catch (Exception e) {
-            getExceptionHandler().handleException(e);
-        } finally {
-            doneUoW(camelExchange);
+    public boolean canHandle(HttpServerRequest request) {
+        return filter.test(request);
+    }
+
+    @Override
+    public void handle(HttpServerRequest request) {
+        if (request.method() == HttpMethod.POST) {
+            final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
+            final Message in = toMessage(request, exchange);
+
+            request.bodyHandler(buffer -> {
+                in.setBody(buffer.getBytes());
+
+                exchange.setIn(in);
+
+                try {
+                    createUoW(exchange);
+                    getAsyncProcessor().process(exchange, doneSync -> {
+                        try {
+                            HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
+                            Buffer body = computeResponseBody(exchange.getMessage());
+
+                            // set the content type in the response.
+                            String contentType = MessageHelper.getContentType(exchange.getMessage());
+                            if (contentType != null) {
+                                // set content-type
+                                response.putHeader(Exchange.CONTENT_TYPE, contentType);
+                            }
+
+                            if (body == null) {
+                                request.response().setStatusCode(204);
+                                request.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
+                                request.response().end("No response available");
+                            } else {
+                                request.response().end(body);
+                            }
+                        } catch (Exception e) {
+                            getExceptionHandler().handleException(e);
+                        }
+                    });
+                } catch (Exception e) {
+                    getExceptionHandler().handleException(e);
+                } finally {
+                    doneUoW(exchange);
+                }
+            });
+        } else {
+            request.response().setStatusCode(405);
+            request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain");
+            request.response().end("Unsupported method");
+
+            throw new IllegalArgumentException("Unsupported method: " + request.method());
+        }
+    }
+
+    private Message toMessage(HttpServerRequest request, Exchange exchange) {
+        KnativeHttpEndpoint endpoint = getEndpoint();
+        Message message = new DefaultMessage(exchange.getContext());
+        String path = request.path();
+
+        if (endpoint.getPath() != null) {
+            String endpointPath = endpoint.getPath();
+            String matchPath = path.toLowerCase(Locale.US);
+            String match = endpointPath.toLowerCase(Locale.US);
+
+            if (matchPath.startsWith(match)) {
+                path = path.substring(endpointPath.length());
+            }
+        }
+
+        for (Map.Entry<String, String> entry : request.headers().entries()) {
+            if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+                KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
+            }
+        }
+        for (Map.Entry<String, String> entry : request.params().entries()) {
+            if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+                KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
+            }
         }
 
-        Object body = binding.toHttpResponse(httpExchange, camelExchange.getMessage());
-        TypeConverter tc = getEndpoint().getCamelContext().getTypeConverter();
+        message.setHeader(Exchange.HTTP_PATH, path);
+        message.setHeader(Exchange.HTTP_METHOD, request.method());
+        message.setHeader(Exchange.HTTP_URI, request.uri());
+        message.setHeader(Exchange.HTTP_QUERY, request.query());
 
-        if (body == null) {
-            httpExchange.getResponseHeaders().put(new HttpString(Exchange.CONTENT_TYPE), MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt"));
-            httpExchange.getResponseSender().send("No response available");
-        } else {
-            ByteBuffer bodyAsByteBuffer = tc.mandatoryConvertTo(ByteBuffer.class, body);
-            httpExchange.getResponseSender().send(bodyAsByteBuffer);
+        return message;
+    }
+
+    private HttpServerResponse toHttpResponse(HttpServerRequest request, Message message) {
+        final HttpServerResponse response = request.response();
+        final boolean failed = message.getExchange().isFailed();
+        final int defaultCode = failed ? 500 : 200;
+        final int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class);
+        final TypeConverter tc = message.getExchange().getContext().getTypeConverter();
+
+        response.setStatusCode(code);
+
+        for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+            final String key = entry.getKey();
+            final Object value = entry.getValue();
+
+            for (Object it: org.apache.camel.support.ObjectHelper.createIterable(value, null)) {
+                String headerValue = tc.convertTo(String.class, it);
+                if (headerValue == null) {
+                    continue;
+                }
+                if (!getEndpoint().getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) {
+                    response.putHeader(key, headerValue);
+                }
+            }
         }
+
+        return response;
     }
 
-    public Exchange createExchange(HttpServerExchange httpExchange) throws Exception {
-        Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOut);
-        Message in = binding.toCamelMessage(httpExchange, exchange);
+    private Buffer computeResponseBody(Message message) throws NoTypeConversionAvailableException {
+        Object body = message.getBody();
+        Exception exception = message.getExchange().getException();
+
+        if (exception != null) {
+            // we failed due an exception so print it as plain text
+            StringWriter sw = new StringWriter();
+            PrintWriter pw = new PrintWriter(sw);
+            exception.printStackTrace(pw);
 
-        exchange.setProperty(Exchange.CHARSET_NAME, httpExchange.getRequestCharset());
-        in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, httpExchange.getRequestCharset());
+            // the body should then be the stacktrace
+            body = sw.toString().getBytes(StandardCharsets.UTF_8);
+            // force content type to be text/plain as that is what the stacktrace is
+            message.setHeader(Exchange.CONTENT_TYPE, "text/plain");
+
+            // and mark the exception as failure handled, as we handled it by returning
+            // it as the response
+            ExchangeHelper.setFailureHandled(message.getExchange());
+        }
 
-        exchange.setIn(in);
-        return exchange;
+        return Buffer.buffer(
+            message.getExchange().getContext().getTypeConverter().mandatoryConvertTo(byte[].class, body)
+        );
     }
 }
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
new file mode 100644
index 0000000..446dc6b
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerDispatcher.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpServerResponse;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ReferenceCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class KnativeHttpConsumerDispatcher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumerDispatcher.class);
+
+    private final Vertx vertx;
+    private final KnativeHttp.ServerKey key;
+    private final ReferenceCount refCnt;
+    private final Set<KnativeHttp.PredicatedHandler> handlers;
+    private final HttpServerWrapper server;
+    private final HttpServerOptions serverOptions;
+    private final ExecutorService executor;
+
+    public KnativeHttpConsumerDispatcher(ExecutorService executor,  Vertx vertx, KnativeHttp.ServerKey key, HttpServerOptions serverOptions) {
+        this.executor = executor;
+        this.vertx = vertx;
+        this.serverOptions = ObjectHelper.supplyIfEmpty(serverOptions, HttpServerOptions::new);
+        this.server = new HttpServerWrapper();
+
+        this.handlers = new CopyOnWriteArraySet<>();
+        this.key = key;
+        this.refCnt = ReferenceCount.on(server::start, server::stop);
+    }
+
+    public void bind(KnativeHttp.PredicatedHandler handler) {
+        if (handlers.add(handler)) {
+            refCnt.retain();
+        }
+    }
+
+    public void unbind(KnativeHttp.PredicatedHandler handler) {
+        if (handlers.remove(handler)) {
+            refCnt.release();
+        }
+    }
+
+    private final class HttpServerWrapper extends ServiceSupport implements Handler<HttpServerRequest> {
+        private HttpServer server;
+
+        @Override
+        protected void doStart() throws Exception {
+            LOGGER.info("Starting Vert.x HttpServer on {}:{}}",
+                key.getHost(),
+                key.getPort()
+            );
+
+            startAsync().toCompletableFuture().join();
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            LOGGER.info("Stopping Vert.x HttpServer on {}:{}",
+                key.getHost(),
+                key.getPort());
+
+            try {
+                if (server != null) {
+                    stopAsync().toCompletableFuture().join();
+                }
+            } finally {
+                this.server = null;
+            }
+        }
+
+        private CompletionStage<Void> startAsync() {
+            server = vertx.createHttpServer(serverOptions);
+            server.requestHandler(this);
+
+            return CompletableFuture.runAsync(
+                () -> {
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    server.listen(key.getPort(), key.getHost(), result -> {
+                        try {
+                            if (result.failed()) {
+                                LOGGER.warn("Failed to start Vert.x HttpServer on {}:{}, reason: {}",
+                                    key.getHost(),
+                                    key.getPort(),
+                                    result.cause().getMessage()
+                                );
+
+                                throw new RuntimeException(result.cause());
+                            }
+
+                            LOGGER.info("Vert.x HttpServer started on {}:{}", key.getPort(), key.getHost());
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                executor
+            );
+        }
+
+        protected CompletionStage<Void> stopAsync() {
+            return CompletableFuture.runAsync(
+                () -> {
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    server.close(result -> {
+                        try {
+                            if (result.failed()) {
+                                LOGGER.warn("Failed to close Vert.x HttpServer reason: {}",
+                                    result.cause().getMessage()
+                                );
+
+                                throw new RuntimeException(result.cause());
+                            }
+
+                            LOGGER.info("Vert.x HttpServer stopped");
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                executor
+            );
+        }
+
+        @Override
+        public void handle(HttpServerRequest request) {
+            LOGGER.debug("received exchange on path: {}, headers: {}",
+                request.path(),
+                request.headers()
+            );
+
+            for (KnativeHttp.PredicatedHandler handler: handlers) {
+                if (handler.canHandle(request)) {
+                    handler.handle(request);
+                    return;
+                }
+            }
+
+            LOGGER.warn("No handler found for path: {}, headers: {}",
+                request.path(),
+                request.headers()
+            );
+
+            HttpServerResponse response = request.response();
+            response.setStatusCode(404);
+            response.putHeader(Exchange.CONTENT_TYPE, "text/plain");
+            response.end("No matching condition found");
+        }
+    }
+}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java
deleted file mode 100644
index 2d774cd..0000000
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpDispatcher.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.http;
-
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import io.undertow.Undertow;
-import io.undertow.UndertowOptions;
-import io.undertow.predicate.Predicate;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.server.handlers.PathHandler;
-import io.undertow.util.Headers;
-import io.undertow.util.StatusCodes;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ReferenceCount;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class KnativeHttpDispatcher implements HttpHandler {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpDispatcher.class);
-
-    private final KnativeHttp.HostKey key;
-    private final KnativeHttp.HostOptions options;
-    private final ReferenceCount refCnt;
-    private final Set<PredicatedHandlerWrapper> handlers;
-    private final Undertow undertow;
-    private final PathHandler handler;
-
-    public KnativeHttpDispatcher(KnativeHttp.HostKey key, KnativeHttp.HostOptions option) {
-        this.handlers = new CopyOnWriteArraySet<>();
-        this.key = key;
-        this.options = option;
-        this.handler = new PathHandler(this);
-        this.undertow = createUndertow();
-        this.refCnt = ReferenceCount.on(this::startUndertow, this::stopUndertow);
-    }
-
-    @Override
-    public void handleRequest(HttpServerExchange exchange) throws Exception {
-        LOGGER.debug("received exchange on path: {}, headers: {}",
-            exchange.getRelativePath(),
-            exchange.getRequestHeaders()
-        );
-
-        for (PredicatedHandlerWrapper handler: handlers) {
-            if (handler.dispatch(exchange)) {
-                return;
-            }
-        }
-
-        exchange.setStatusCode(StatusCodes.NOT_FOUND);
-        exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
-        exchange.getResponseSender().send("No matching condition found");
-    }
-
-    public void bind(HttpHandler handler, Predicate predicate) {
-        if (handlers.add(new PredicatedHandlerWrapper(handler, predicate))) {
-            refCnt.retain();
-        }
-    }
-
-    public void unbind(HttpHandler handler) {
-        if (handlers.removeIf(phw -> phw.handler == handler)) {
-            refCnt.release();
-        }
-    }
-
-    private void startUndertow() {
-        try {
-            LOGGER.info("Starting Undertow server on {}://{}:{}}",
-              key.getSslContext() != null ? "https" : "http",
-              key.getHost(),
-              key.getPort()
-            );
-
-            undertow.start();
-        } catch (RuntimeException e) {
-            LOGGER.warn("Failed to start Undertow server on {}://{}:{}, reason: {}",
-              key.getSslContext() != null ? "https" : "http",
-              key.getHost(),
-              key.getPort(),
-              e.getMessage()
-            );
-
-            undertow.stop();
-
-            throw e;
-        }
-    }
-
-    private void stopUndertow() {
-        LOGGER.info("Stopping Undertow server on {}://{}:{}",
-          key.getSslContext() != null ? "https" : "http",
-          key.getHost(),
-          key.getPort());
-
-        undertow.stop();
-    }
-
-    private Undertow createUndertow() {
-        Undertow.Builder builder = Undertow.builder();
-        if (key.getSslContext() != null) {
-            builder.addHttpsListener(key.getPort(), key.getHost(), key.getSslContext());
-        } else {
-            builder.addHttpListener(key.getPort(), key.getHost());
-        }
-
-        if (options != null) {
-            ObjectHelper.ifNotEmpty(options.getIoThreads(), builder::setIoThreads);
-            ObjectHelper.ifNotEmpty(options.getWorkerThreads(), builder::setWorkerThreads);
-            ObjectHelper.ifNotEmpty(options.getBufferSize(), builder::setBufferSize);
-            ObjectHelper.ifNotEmpty(options.getDirectBuffers(), builder::setDirectBuffers);
-            ObjectHelper.ifNotEmpty(options.getHttp2Enabled(), e -> builder.setServerOption(UndertowOptions.ENABLE_HTTP2, e));
-        }
-
-        return builder.setHandler(new PathHandler(handler)).build();
-    }
-
-    private static final class PredicatedHandlerWrapper {
-        private final HttpHandler handler;
-        private final Predicate predicate;
-
-        public PredicatedHandlerWrapper(HttpHandler handler, Predicate predicate) {
-            this.handler = ObjectHelper.notNull(handler, "handler");
-            this.predicate = ObjectHelper.notNull(predicate, "predicate");
-        }
-
-        boolean dispatch(HttpServerExchange exchange) throws Exception {
-            if (predicate.resolve(exchange)) {
-                if (exchange.isInIoThread()) {
-                    exchange.dispatch(handler);
-                } else {
-                    handler.handleRequest(exchange);
-                }
-
-                return true;
-            }
-
-            LOGGER.debug("No handler for path: {}, headers: {}",
-                exchange.getRelativePath(),
-                exchange.getRequestHeaders()
-            );
-
-            return false;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PredicatedHandlerWrapper holder = (PredicatedHandlerWrapper) o;
-            return handler.equals(holder.handler);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(handler);
-        }
-    }
-}
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
index e02f736..e9d8c2b 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpEndpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.knative.http;
 
-import java.net.URI;
 import java.util.Map;
 
 import org.apache.camel.Consumer;
@@ -27,10 +26,9 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.support.jsse.SSLContextParameters;
-import org.apache.camel.util.ObjectHelper;
-import org.xnio.OptionMap;
 
 @UriEndpoint(
     firstVersion = "3.0.0",
@@ -65,12 +63,6 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
         this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
     }
 
-    // **********************************
-    //
-    // Properties
-    //
-    // **********************************
-
     public String getHost() {
         return host;
     }
@@ -93,6 +85,10 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
 
     public void setPath(String path) {
         this.path = path;
+
+        if (!this.path.startsWith("/")) {
+            this.path = "/" + path;
+        }
     }
 
     public HeaderFilterStrategy getHeaderFilterStrategy() {
@@ -127,26 +123,10 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
         this.throwExceptionOnFailure = throwExceptionOnFailure;
     }
 
-    public KnativeHttp.HostKey getHostKey() {
-        return new KnativeHttp.HostKey(host, port, null);
+    public KnativeHttp.ServerKey getServerKey() {
+        return new KnativeHttp.ServerKey(host, port);
     }
 
-    public URI getHttpURI() {
-        String uri = "http://" + host + ":" + port;
-
-        if (ObjectHelper.isNotEmpty(path)) {
-            uri += path;
-        }
-
-        return URI.create(uri);
-    }
-
-    // **********************************
-    //
-    // Impl
-    //
-    // **********************************
-
     @Override
     public KnativeHttpComponent getComponent() {
         return (KnativeHttpComponent)super.getComponent();
@@ -154,11 +134,11 @@ public class KnativeHttpEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new KnativeHttpProducer(this, OptionMap.EMPTY);
+        return new KnativeHttpProducer(this, getComponent().getVertx(), getComponent().getVertxHttpClientOptions());
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new KnativeHttpConsumer(this, processor);
+        return new KnativeHttpConsumer(this, AsyncProcessorConverterHelper.convert(processor));
     }
 }
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
index 96051c9..db3d165 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpHeaderFilterStrategy.java
@@ -19,12 +19,11 @@ package org.apache.camel.component.knative.http;
 import org.apache.camel.support.DefaultHeaderFilterStrategy;
 
 public class KnativeHttpHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
-
     public KnativeHttpHeaderFilterStrategy() {
         initialize();
     }
 
-    protected void initialize() {
+    protected final void initialize() {
         getOutFilter().add("content-length");
         getOutFilter().add("content-type");
         getOutFilter().add("host");
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
index 7d79ccd..c4aafed 100644
--- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java
@@ -16,44 +16,40 @@
  */
 package org.apache.camel.component.knative.http;
 
-import java.net.URI;
-import java.nio.ByteBuffer;
-
-import io.undertow.client.ClientRequest;
-import io.undertow.client.UndertowClient;
-import io.undertow.protocols.ssl.UndertowXnioSsl;
-import io.undertow.server.DefaultByteBufferPool;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.Headers;
-import io.undertow.util.Methods;
+import java.util.Map;
+
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.TypeConverter;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.http.common.HttpOperationFailedException;
 import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.jsse.SSLContextParameters;
-import org.apache.camel.util.URISupport;
+import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.xnio.OptionMap;
-import org.xnio.Xnio;
-import org.xnio.XnioWorker;
-import org.xnio.ssl.XnioSsl;
 
 public class KnativeHttpProducer extends DefaultAsyncProducer {
     private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class);
 
-    private final OptionMap options;
-    private final KnativeHttpBinding binding;
+    private final Vertx vertx;
+    private final WebClientOptions clientOptions;
+    private WebClient client;
 
-    private UndertowClient client;
-    private DefaultByteBufferPool pool;
-    private XnioSsl ssl;
-    private XnioWorker worker;
-
-    public KnativeHttpProducer(KnativeHttpEndpoint endpoint, OptionMap options) {
+    public KnativeHttpProducer(KnativeHttpEndpoint endpoint, Vertx vertx, WebClientOptions clientOptions) {
         super(endpoint);
-        this.options = options;
-        this.binding = new KnativeHttpBinding(endpoint.getHeaderFilterStrategy());
+
+        this.vertx = ObjectHelper.notNull(vertx, "vertx");
+        this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new);
     }
 
     @Override
@@ -62,70 +58,98 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
     }
 
     @Override
-    public boolean process(final Exchange camelExchange, final AsyncCallback callback) {
-        final KnativeHttpEndpoint endpoint = getEndpoint();
-        final URI uri = endpoint.getHttpURI();
-        final String pathAndQuery = URISupport.pathAndQueryOf(uri);
-
-        final ClientRequest request = new ClientRequest();
-        request.setMethod(Methods.POST);
-        request.setPath(pathAndQuery);
-        request.getRequestHeaders().put(Headers.HOST, uri.getHost());
-
-        final Object body = binding.toHttpRequest(request, camelExchange.getIn());
-        final TypeConverter tc = endpoint.getCamelContext().getTypeConverter();
-        final ByteBuffer bodyAsByte = tc.tryConvertTo(ByteBuffer.class, body);
-
-        // As tryConvertTo is used to convert the body, we should do null check
-        // or the call bodyAsByte.remaining() may throw an NPE
-        if (body != null && bodyAsByte != null) {
-            request.getRequestHeaders().put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
-        }
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        final byte[] payload;
 
-        // when connect succeeds or fails UndertowClientCallback will
-        // get notified on a I/O thread run by Xnio worker. The writing
-        // of request and reading of response is performed also in the
-        // callback
-        client.connect(
-            new KnativeHttpClientCallback(camelExchange, callback, getEndpoint(), request, bodyAsByte),
-            uri,
-            worker,
-            ssl,
-            pool,
-            options);
-
-        // the call above will proceed on Xnio I/O thread we will
-        // notify the exchange asynchronously when the HTTP exchange
-        // ends with success or failure from UndertowClientCallback
-        return false;
-    }
+        try {
+            payload = exchange.getMessage().getMandatoryBody(byte[].class);
+        } catch (InvalidPayloadException e) {
+            exchange.setException(e);
+            callback.done(true);
 
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
+            return true;
+        }
 
-        final Xnio xnio = Xnio.getInstance();
+        KnativeHttpEndpoint endpoint = getEndpoint();
+        Message message = exchange.getMessage();
 
-        pool = new DefaultByteBufferPool(true, 17 * 1024);
-        worker = xnio.createWorker(options);
+        MultiMap headers = MultiMap.caseInsensitiveMultiMap();
+        headers.add(HttpHeaders.HOST, endpoint.getHost());
+        headers.add(HttpHeaders.CONTENT_LENGTH, Integer.toString(payload.length));
 
-        SSLContextParameters sslContext = getEndpoint().getSslContextParameters();
-        if (sslContext != null) {
-            ssl = new UndertowXnioSsl(xnio, options, sslContext.createSSLContext(getEndpoint().getCamelContext()));
+        String contentType = MessageHelper.getContentType(message);
+        if (contentType != null) {
+            headers.add(HttpHeaders.CONTENT_TYPE, contentType);
         }
 
-        client = UndertowClient.getInstance();
+        for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) {
+            if (!endpoint.getHeaderFilterStrategy().applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) {
+                headers.add(entry.getKey(), entry.getValue().toString());
+            }
+        }
+
+        client.post(endpoint.getPort(), endpoint.getHost(), endpoint.getPath())
+            .putHeaders(headers)
+            .sendBuffer(Buffer.buffer(payload), response -> {
+                HttpResponse<Buffer> result = response.result();
+
+                Message answer = new DefaultMessage(exchange.getContext());
+                answer.setHeader(Exchange.HTTP_RESPONSE_CODE, result.statusCode());
+
+                for (Map.Entry<String, String> entry : result.headers().entries()) {
+                    if (!endpoint.getHeaderFilterStrategy().applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange)) {
+                        KnativeHttpSupport.appendHeader(message.getHeaders(), entry.getKey(), entry.getValue());
+                    }
+                }
+
+                exchange.setMessage(answer);
+
+                if (response.failed() && endpoint.getThrowExceptionOnFailure()) {
+                    Exception cause = new HttpOperationFailedException(
+                        getURI(),
+                        result.statusCode(),
+                        result.statusMessage(),
+                        null,
+                        KnativeHttpSupport.asStringMap(answer.getHeaders()),
+                        ExchangeHelper.convertToType(exchange, String.class, answer.getBody())
+                    );
 
-        LOGGER.debug("Created worker: {} with options: {}", worker, options);
+                    exchange.setException(cause);
+                }
+
+                callback.done(false);
+            });
+
+        return false;
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+
+        this.client = WebClient.create(vertx, clientOptions);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
 
-        if (worker != null && !worker.isShutdown()) {
-            LOGGER.debug("Shutting down worker: {}", worker);
-            worker.shutdown();
+        if (this.client != null) {
+            LOGGER.debug("Shutting down client: {}", client);
+            this.client.close();
+            this.client = null;
+        }
+    }
+
+    private String getURI() {
+        String p = getEndpoint().getPath();
+
+        if (p == null) {
+            p = KnativeHttp.DEFAULT_PATH;
+        } else if (!p.startsWith("/")) {
+            p = "/" + p;
         }
+
+        return String.format("http://%s:%d%s", getEndpoint().getHost(), getEndpoint().getPort(), p);
     }
 }
diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
new file mode 100644
index 0000000..a858f75
--- /dev/null
+++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.knative.http;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final  class KnativeHttpSupport {
+    private KnativeHttpSupport() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void appendHeader(Map<String, Object> headers, String key, Object value) {
+        if (headers.containsKey(key)) {
+            Object existing = headers.get(key);
+            List<Object> list;
+            if (existing instanceof List) {
+                list = (List<Object>) existing;
+            } else {
+                list = new ArrayList<>();
+                list.add(existing);
+            }
+            list.add(value);
+            value = list;
+        }
+
+        headers.put(key, value);
+    }
+
+    public static Map<String, String> asStringMap(Map<String, Object> map) {
+        return map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    }
+}
diff --git a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index e0d1ab6..0b40284 100644
--- a/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.knative.http;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -29,7 +28,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class KnativeHttpTest {
 
@@ -51,7 +49,7 @@ public class KnativeHttpTest {
     }
 
     @AfterEach
-    public void after() throws Exception {
+    public void after() {
         if (this.context != null) {
             this.context.stop();
         }
@@ -65,150 +63,164 @@ public class KnativeHttpTest {
 
     @Test
     void testWithPaths() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                fromF("knative-http:0.0.0.0:%d/a/1", port)
-                    .routeId("r1")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class)
-                    .to("mock:r1");
-                fromF("knative-http:0.0.0.0:%d/a/2", port)
-                    .routeId("r2")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class)
-                    .to("mock:r2");
-                from("direct:start")
-                    .toD("undertow:http://localhost:" + port + "/a/${body}");
-                }
-            }
-        );
-
-        MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class);
-        m1.expectedMessageCount(1);
-        m1.expectedBodiesReceived("r1");
-
-        MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class);
-        m2.expectedMessageCount(1);
-        m2.expectedBodiesReceived("r2");
-
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("knative-http:0.0.0.0:%d/a/1", port)
+                .routeId("r1")
+                .setBody().simple("${routeId}")
+                .to("mock:r1");
+            b.fromF("knative-http:0.0.0.0:%d/a/2", port)
+                .routeId("r2")
+                .setBody().simple("${routeId}")
+                .to("mock:r2");
+
+            b.from("direct:start")
+                .toD("undertow:http://localhost:" + port + "/a/${body}");
+        });
+
+        context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1);
+        context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1);
         context.start();
 
-        assertThat(
-            template.requestBody("direct:start", "1", String.class)
-        ).isEqualTo("r1");
-        assertThat(
-            template.requestBody("direct:start", "2", String.class)
-        ).isEqualTo("r2");
+        assertThat(template.requestBody("direct:start", "1", String.class)).isEqualTo("r1");
+        assertThat(template.requestBody("direct:start", "2", String.class)).isEqualTo("r2");
 
-        m1.assertIsSatisfied();
-        m2.assertIsSatisfied();
+        MockEndpoint.assertIsSatisfied(context);
     }
 
     @Test
     void testWithFilters() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port)
-                    .routeId("r1")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class)
-                    .to("mock:r1");
-                fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port)
-                    .routeId("r2")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class)
-                    .to("mock:r2");
-                fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port)
-                    .routeId("r3")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class)
-                    .to("mock:r3");
-                from("direct:start")
-                    .setHeader("MyHeader").body()
-                    .toF("undertow:http://localhost:%d", port);
-                }
-            }
-        );
-
-        MockEndpoint m1 = context.getEndpoint("mock:r1", MockEndpoint.class);
-        m1.expectedMessageCount(1);
-        m1.expectedBodiesReceived("r1");
-
-        MockEndpoint m2 = context.getEndpoint("mock:r2", MockEndpoint.class);
-        m2.expectedMessageCount(1);
-        m2.expectedBodiesReceived("r2");
-
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h1", port)
+                .routeId("r1")
+                .setBody().simple("${routeId}")
+                .to("mock:r1");
+            b.fromF("knative-http:0.0.0.0:%d?filter.myheader=h2", port)
+                .routeId("r2")
+                .setBody().simple("${routeId}")
+                .to("mock:r2");
+            b.fromF("knative-http:0.0.0.0:%d?filter.myheader=t.*", port)
+                .routeId("r3")
+                .setBody().simple("${routeId}")
+                .to("mock:r3");
+
+            b.from("direct:start")
+                .setHeader("MyHeader").body()
+                .toF("undertow:http://localhost:%d", port);
+        });
+
+        context.getEndpoint("mock:r1", MockEndpoint.class).expectedMessageCount(1);
+        context.getEndpoint("mock:r2", MockEndpoint.class).expectedMessageCount(1);
         context.start();
 
-        assertThat(
-            template.requestBody("direct:start", "h1", String.class)
-        ).isEqualTo("r1");
-        assertThat(
-            template.requestBody("direct:start", "h2", String.class)
-        ).isEqualTo("r2");
-        assertThat(
-            template.requestBody("direct:start", "t1", String.class)
-        ).isEqualTo("r3");
-        assertThat(
-            template.requestBody("direct:start", "t2", String.class)
-        ).isEqualTo("r3");
-
-        m1.assertIsSatisfied();
-        m2.assertIsSatisfied();
+        assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+        assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
+        assertThat(template.requestBody("direct:start", "t1", String.class)).isEqualTo("r3");
+        assertThat(template.requestBody("direct:start", "t2", String.class)).isEqualTo("r3");
+
+        MockEndpoint.assertIsSatisfied(context);
     }
 
     @Test
     void testWithRexFilters() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port)
-                    .routeId("r1")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class);
-                from("direct:start")
-                    .setHeader("MyHeader").body()
-                    .toF("undertow:http://localhost:%d", port);
-                }
-            }
-        );
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("knative-http:0.0.0.0:%d?filter.MyHeader=h.*", port)
+                .routeId("r1")
+                .setBody().simple("${routeId}");
+
+            b.from("direct:start")
+                .setHeader("MyHeader").body()
+                .toF("undertow:http://localhost:%d", port);
+        });
+
+        context.start();
+
+        assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+        assertThat(template.request("direct:start", e -> e.getMessage().setBody("t1"))).satisfies(e -> {
+            assertThat(e.isFailed()).isTrue();
+            assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+        });
+    }
+
+    @Test
+    void testRemoveConsumer() throws Exception {
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port)
+                .routeId("r1")
+                .setBody().simple("${routeId}");
+            b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port)
+                .routeId("r2")
+                .setBody().simple("${routeId}");
+        });
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .setHeader("h").body()
+                .toF("undertow:http://localhost:%d", port);
+        });
 
         context.start();
 
-        assertThat(
-            template.requestBody("direct:start", "h1", String.class)
-        ).isEqualTo("r1");
-        assertThatThrownBy(
-            () -> template.requestBody("direct:start", "t1", String.class)
-        ).isInstanceOf(CamelExecutionException.class).hasCauseExactlyInstanceOf(HttpOperationFailedException.class);
+        assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+        assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
+
+        context.getRouteController().stopRoute("r2");
+
+        assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
+            assertThat(e.isFailed()).isTrue();
+            assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+        });
+    }
+
+    @Test
+    void testAddConsumer() throws Exception {
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("knative-http:0.0.0.0:%d?filter.h=h1", port)
+                .routeId("r1")
+                .setBody().simple("${routeId}");
+        });
+        RouteBuilder.addRoutes(context, b -> {
+            b.from("direct:start")
+                .setHeader("h").body()
+                .toF("undertow:http://localhost:%d", port);
+        });
+
+        context.start();
+
+        assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+        assertThat(template.request("direct:start", e -> e.getMessage().setBody("h2"))).satisfies(e -> {
+            assertThat(e.isFailed()).isTrue();
+            assertThat(e.getException()).isInstanceOf(HttpOperationFailedException.class);
+        });
+
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("knative-http:0.0.0.0:%d?filter.h=h2", port)
+                .routeId("r2")
+                .setBody().simple("${routeId}");
+        });
+
+        assertThat(template.requestBody("direct:start", "h1", String.class)).isEqualTo("r1");
+        assertThat(template.requestBody("direct:start", "h2", String.class)).isEqualTo("r2");
     }
 
     @Test
     void testInvokeEndpoint() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                fromF("undertow:http://0.0.0.0:%d", port)
-                    .routeId("endpoint")
-                    .setBody().simple("${routeId}")
-                    .convertBodyTo(String.class)
-                    .to("mock:endpoint");
-                from("direct:start")
-                    .toF("knative-http:0.0.0.0:%d", port);
-                }
-            }
-        );
+        RouteBuilder.addRoutes(context, b -> {
+            b.fromF("undertow:http://0.0.0.0:%d", port)
+                .routeId("endpoint")
+                .setBody().simple("${routeId}")
+                .to("mock:endpoint");
+
+            b.from("direct:start")
+                .toF("knative-http:0.0.0.0:%d", port);
+        });
 
         MockEndpoint mock = context.getEndpoint("mock:endpoint", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedBodiesReceived("endpoint");
         mock.expectedHeaderReceived("Host", "0.0.0.0");
+        mock.expectedMessageCount(1);
 
         context.start();
 
-        template.requestBody("direct:start", "1", String.class);
+        template.sendBody("direct:start", "1");
 
         mock.assertIsSatisfied();
     }
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
index 1376c92..cb17060 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java
@@ -96,7 +96,6 @@ public class CloudEventsV01Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.custom-event");
         mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
@@ -104,6 +103,7 @@ public class CloudEventsV01Test {
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -166,7 +166,6 @@ public class CloudEventsV01Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
@@ -174,9 +173,9 @@ public class CloudEventsV01Test {
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessageCount(1);
         mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock2.expectedHeaderReceived("CE-EventType", "my.type");
         mock2.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
@@ -184,6 +183,7 @@ public class CloudEventsV01Test {
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
         mock2.expectedBodiesReceived("test2");
+        mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -238,7 +238,6 @@ public class CloudEventsV01Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -246,6 +245,7 @@ public class CloudEventsV01Test {
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -301,7 +301,6 @@ public class CloudEventsV01Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -309,6 +308,7 @@ public class CloudEventsV01Test {
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -387,22 +387,22 @@ public class CloudEventsV01Test {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessageCount(1);
         mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock1.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
         mock1.expectedHeaderReceived("CE-Source", "CE1");
         mock1.expectedBodiesReceived("test");
+        mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessageCount(1);
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
         mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
         mock2.expectedHeaderReceived("CE-Source", "CE2");
         mock2.expectedBodiesReceived("test");
+        mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
index 75bfc0f..f0d9607 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java
@@ -96,7 +96,6 @@ public class CloudEventsV02Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock.expectedHeaderReceived("ce-type", "org.apache.camel.custom-event");
         mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
@@ -104,6 +103,7 @@ public class CloudEventsV02Test {
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -166,7 +166,6 @@ public class CloudEventsV02Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
         mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
@@ -174,9 +173,9 @@ public class CloudEventsV02Test {
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessageCount(1);
         mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock2.expectedHeaderReceived("ce-type", "my.type");
         mock2.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
@@ -184,6 +183,7 @@ public class CloudEventsV02Test {
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
         mock2.expectedBodiesReceived("test2");
+        mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -238,7 +238,6 @@ public class CloudEventsV02Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
         mock.expectedHeaderReceived("ce-id", "myEventID");
@@ -246,6 +245,7 @@ public class CloudEventsV02Test {
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -301,7 +301,6 @@ public class CloudEventsV02Test {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
         mock.expectedHeaderReceived("ce-id", "myEventID");
@@ -309,6 +308,7 @@ public class CloudEventsV02Test {
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -387,22 +387,22 @@ public class CloudEventsV02Test {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessageCount(1);
         mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock1.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock1.expectedHeaderReceived("ce-type", "org.apache.camel.event");
         mock1.expectedHeaderReceived("ce-id", "myEventID1");
         mock1.expectedHeaderReceived("ce-source", "CE1");
         mock1.expectedBodiesReceived("test");
+        mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessageCount(1);
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
         mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
         mock2.expectedHeaderReceived("ce-type", "org.apache.camel.event");
         mock2.expectedHeaderReceived("ce-id", "myEventID2");
         mock2.expectedHeaderReceived("ce-source", "CE2");
         mock2.expectedBodiesReceived("test");
+        mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
index da81f6a..996d43e 100644
--- a/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
+++ b/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeComponentTest.java
@@ -31,7 +31,6 @@ import org.apache.camel.component.knative.http.KnativeHttpEndpoint;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.properties.PropertiesComponent;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.support.DefaultHeaderFilterStrategy;
 import org.apache.camel.test.AvailablePortFinder;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -348,7 +347,6 @@ public class KnativeComponentTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
@@ -356,13 +354,9 @@ public class KnativeComponentTest {
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
-        context.createProducerTemplate().send(
-            "direct:source",
-            e -> {
-                e.getIn().setBody("test");
-            }
-        );
+        context.createProducerTemplate().sendBody("direct:source", "test");
 
         mock.assertIsSatisfied();
     }
@@ -403,7 +397,6 @@ public class KnativeComponentTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -411,6 +404,7 @@ public class KnativeComponentTest {
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -467,7 +461,6 @@ public class KnativeComponentTest {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedMessageCount(1);
         mock.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
         mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock.expectedHeaderReceived("CE-EventID", "myEventID");
@@ -475,6 +468,7 @@ public class KnativeComponentTest {
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
         mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock.expectedBodiesReceived("test");
+        mock.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -553,22 +547,22 @@ public class KnativeComponentTest {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessageCount(1);
         mock1.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock1.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
         mock1.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock1.expectedHeaderReceived("CE-EventID", "myEventID1");
         mock1.expectedHeaderReceived("CE-Source", "CE1");
         mock1.expectedBodiesReceived("test");
+        mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessageCount(1);
         mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
         mock2.expectedHeaderReceived("CE-CloudEventsVersion", "0.1");
         mock2.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
         mock2.expectedHeaderReceived("CE-EventID", "myEventID2");
         mock2.expectedHeaderReceived("CE-Source", "CE2");
         mock2.expectedBodiesReceived("test");
+        mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
@@ -596,112 +590,4 @@ public class KnativeComponentTest {
         mock1.assertIsSatisfied();
         mock2.assertIsSatisfied();
     }
-
-    @Test
-    void testDefaultHeadersFilter() throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-
-        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
-            new KnativeEnvironment.KnativeServiceDefinition(
-                Knative.Type.endpoint,
-                Knative.Protocol.http,
-                "myEndpoint",
-                "localhost",
-                port,
-                KnativeSupport.mapOf(
-                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
-                    Knative.CONTENT_TYPE, "text/plain"
-                ))
-        ));
-
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
-        component.setEnvironment(env);
-
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:source")
-                    .setHeader("CamelHeader")
-                        .constant("CamelHeaderValue")
-                    .setHeader("MyHeader")
-                        .constant("MyHeaderValue")
-                    .to("knative:endpoint/myEndpoint")
-                    .to("mock:source");
-
-                fromF("undertow:http://localhost:%d", port)
-                    .setBody().constant("test");
-            }
-        });
-
-        context.start();
-
-        MockEndpoint mock = context.getEndpoint("mock:source", MockEndpoint.class);
-        mock.expectedMessageCount(1);
-
-        context.createProducerTemplate().sendBody("direct:source", "test");
-
-        mock.assertIsSatisfied();
-
-
-        assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).doesNotContainKey("CamelHeader");
-        assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).containsEntry("MyHeader", "MyHeaderValue");
-    }
-
-    @Test
-    void testCustomHeadersFilter() throws Exception {
-        final int port = AvailablePortFinder.getNextAvailable();
-
-        KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
-            new KnativeEnvironment.KnativeServiceDefinition(
-                Knative.Type.endpoint,
-                Knative.Protocol.http,
-                "myEndpoint",
-                "localhost",
-                port,
-                KnativeSupport.mapOf(
-                    Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
-                    Knative.CONTENT_TYPE, "text/plain"
-                ))
-        ));
-
-        KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
-        component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
-        component.setEnvironment(env);
-
-        DefaultHeaderFilterStrategy hfs = new DefaultHeaderFilterStrategy();
-        hfs.setOutFilterPattern("(?i)(My)[\\.|a-z|A-z|0-9]*");
-        hfs.setInFilterPattern("(?i)(My)[\\.|a-z|A-z|0-9]*");
-
-
-        context.getRegistry().bind("myFilterStrategy", hfs);
-
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:source")
-                    .setHeader("CamelHeader")
-                        .constant("CamelHeaderValue")
-                    .setHeader("MyHeader")
-                        .constant("MyHeaderValue")
-                    .to("knative:endpoint/myEndpoint?transport.headerFilterStrategy=#myFilterStrategy")
-                    .to("mock:source");
-
-                fromF("undertow:http://localhost:%d?headerFilterStrategy=#myFilterStrategy", port)
-                    .setBody().constant("test");
-            }
-        });
-
-        context.start();
-
-        MockEndpoint mock = context.getEndpoint("mock:source", MockEndpoint.class);
-        mock.expectedMessageCount(1);
-
-        context.createProducerTemplate().sendBody("direct:source", "test");
-
-        mock.assertIsSatisfied();
-
-        assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).doesNotContainKey("MyHeader");
-        assertThat(mock.getExchanges().get(0).getMessage().getHeaders()).containsEntry("CamelHeader", "CamelHeaderValue");
-    }
 }
diff --git a/pom.xml b/pom.xml
index e9c0068..9458562 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
         <immutables.version>2.7.5</immutables.version>
         <semver4j.version>3.0.0</semver4j.version>
         <undertow.version>1.4.26.Final</undertow.version>
+        <vertx.version>3.8.0</vertx.version>
         <graalvm.version>19.1.1</graalvm.version>
         <gmavenplus-plugin.version>1.7.1</gmavenplus-plugin.version>
         <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>


Mime
View raw message