camel-commits mailing list archives

From nferr...@apache.org
Subject [2/2] camel git commit: CAMEL-10612: API refactoring and documentation
Date Mon, 06 Feb 2017 00:03:46 GMT
CAMEL-10612: API refactoring and documentation


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c441ec2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c441ec2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c441ec2

Branch: refs/heads/master
Commit: 9c441ec2f74f84a07a57e201977dbf763f9b044e
Parents: baf4ae2
Author: Nicola Ferraro <ni.ferraro@gmail.com>
Authored: Mon Feb 6 01:00:36 2017 +0100
Committer: Nicola Ferraro <ni.ferraro@gmail.com>
Committed: Mon Feb 6 01:03:29 2017 +0100

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   | 165 +++++++++++++++++--
 .../api/CamelReactiveStreamsService.java        |  64 ++++---
 .../engine/CamelReactiveStreamsServiceImpl.java |  89 ++++++----
 .../BackpressurePublisherRoutePolicyTest.java   |   4 +-
 .../streams/BackpressureStrategyTest.java       |   8 +-
 .../streams/BackpressureSubscriberTest.java     |   6 +-
 .../reactive/streams/BasicPublisherTest.java    |   8 +-
 .../reactive/streams/BasicSubscriberTest.java   |   6 +-
 .../streams/ConcurrentConsumersTest.java        |   4 +-
 .../reactive/streams/DirectClientAPITest.java   |  46 +++++-
 .../reactive/streams/EventTypeTest.java         |  10 +-
 .../reactive/streams/ExchangeRequestTest.java   |   6 +-
 .../streams/PublisherTypeConversionTest.java    |   6 +-
 .../platforms/AbstractPlatformTestSupport.java  |   4 +-
 .../support/ReactiveStreamsTestService.java     |  42 +++--
 ...amelPublisherConversionVerificationTest.java |   2 +-
 .../tck/CamelPublisherVerificationTest.java     |   2 +-
 ...melSubscriberConversionVerificationTest.java |   2 +-
 .../tck/CamelSubscriberVerificationTest.java    |   2 +-
 .../streams/BasicCamelToReactorExample.java     |   4 +-
 .../BasicCamelToReactorInOutExample.java        |   2 +-
 .../streams/BasicReactorToCamelExample.java     |   2 +-
 .../BasicReactorToCamelInOutExample.java        |   2 +-
 .../reactive/streams/ClientAPIRestExample.java  |   4 +-
 .../streams/ClientAPIWorkflowExample.java       |   6 +-
 25 files changed, 362 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index c3ba5de..40bea33 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -2,13 +2,18 @@
 
 *Available as of Camel version 2.19.0*
 
-The *reactive-streams:* component allows you exchanging messages with reactive
+The *reactive-streams:* component allows you to exchange messages with reactive
 stream processing libraries compatible with the
 http://www.reactive-streams.org/[reactive streams] standard.
 
 The component supports backpressure and has been tested using the reactive streams technology
 compatibility kit (TCK).
 
+The Camel module provides a *reactive-streams* component that lets you define incoming and
+outgoing streams within Camel routes, and a direct client API that allows Camel endpoints to be used
+directly from any external reactive framework (Camel uses an internal implementation of the reactive streams
+_Publisher_ and _Subscriber_, so it is not tied to any specific framework).
+
 Maven users will need to add the following dependency to their `pom.xml`
 for this component:
 
@@ -80,11 +85,15 @@ The Reactive Streams component supports 11 endpoint options which are listed bel
 
 ### Usage
 
-External reactive streams compatible systems can be configured as Subscribers or Publishers,
-to consume or send events to Camel routes respectively.
+The library aims to support all the communication modes an application needs to interact with Camel data:
+
+* *Get* data from Camel routes (In-Only from Camel)
+* *Send* data to Camel routes (In-Only towards Camel)
+* *Request* a transformation to a Camel route (In-Out towards Camel)
+* *Process* data flowing from a Camel route using a reactive processing step (In-Out from Camel)
 
-### Subscribing to Camel exchanges
-In order to subscribe to exchanges flowing in a Camel route, exchanges should be redirected to
+### Getting data from Camel
+In order to subscribe to data flowing from a Camel route, exchanges should be redirected to
 a named stream, like in the following snippet:
 
 [source,java]
@@ -104,10 +113,10 @@ The stream can be accessed using the `CamelReactiveStreams` utility class.
 CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
 
 // Getting a stream of exchanges
-Publisher<Exchange> exchanges = camel.getPublisher("numbers");
+Publisher<Exchange> exchanges = camel.fromStream("numbers");
 
 // Getting a stream of Integers (using Camel standard conversion system)
-Publisher<Integer> numbers = camel.getPublisher("numbers", Integer.class);
+Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);
 ---------------------------------------------------------
 
 The stream can be used easily with any reactive streams compatible library.
@@ -123,7 +132,26 @@ Flowable.fromPublisher(integers)
 
 The example prints all numbers generated by Camel into `System.out`.
 
-### Publishing to a Camel route
+#### Getting data from Camel using the direct API
+
+For short Camel routes and for users that prefer defining the whole processing flow
+using functional constructs of the reactive framework (without using the Camel DSL at all),
+streams can also be defined using Camel URIs.
+
+[source,java]
+---------------------------------------------------------
+CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+// Get a stream from all the files in a directory
+Publisher<String> files = camel.from("file:folder", String.class);
+
+// Use the stream in RxJava2
+Flowable.fromPublisher(files)
+    .doOnNext(System.out::println)
+    .subscribe();
+---------------------------------------------------------
+
+### Sending data to Camel
 When an external library needs to push events into a Camel route, the Reactive Streams
 endpoint must be set as consumer.
 
@@ -139,7 +167,7 @@ A handle to the `elements` stream can be obtained from the `CamelReactiveStreams
 ---------------------------------------------------------
 CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
 
-Subscriber<String> elements = camel.getSubscriber("elements", String.class);
+Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
 ---------------------------------------------------------
 
 The subscriber can be used to push events to the Camel route that consumes from the `elements` stream.
@@ -156,7 +184,122 @@ Flowable.interval(1, TimeUnit.SECONDS)
 
 String items are generated every second by RxJava in the example and they are pushed into the Camel route defined above.
 
-### Controlling Backpressure (producer side)
+#### Sending data to Camel using the direct API
+
+Also in this case, the direct API can be used to obtain a Camel subscriber from an endpoint URI.
+
+[source,java]
+---------------------------------------------------------
+CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+// Send two strings to the "seda:queue" endpoint
+Flowable.just("hello", "world")
+    .subscribe(camel.subscriber("seda:queue", String.class));
+---------------------------------------------------------
+
+### Request a transformation to Camel
+
+Routes defined in some Camel DSL can be used within a reactive stream framework to perform a
+specific transformation (the same mechanism can also be used, e.g., to just send data to an _http_ endpoint and continue).
+
+The following snippet shows how RxJava functional code can request the task of loading and marshalling files to Camel.
+
+[source,java]
+---------------------------------------------------------
+CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+// Process files starting from their names
+Flowable.just(new File("file1.txt"), new File("file2.txt"))
+    .flatMap(file -> camel.toStream("readAndMarshal", file, String.class))
+    // other steps
+    .subscribe();
+---------------------------------------------------------
+
+For this to work, a route like the following should be defined in the Camel context:
+
+[source,java]
+---------------------------------------------------------
+from("reactive-streams:readAndMarshal")
+.marshal() // ... other details
+---------------------------------------------------------
+
+#### Request a transformation to Camel using the direct API
+
+An alternative approach consists of using the endpoint URIs directly in the reactive flow:
+
+[source,java]
+---------------------------------------------------------
+CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+// Process files starting from their names
+Flowable.just(new File("file1.txt"), new File("file2.txt"))
+    .flatMap(file -> camel.to("direct:process", file, String.class))
+    // other steps
+    .subscribe();
+---------------------------------------------------------
+
+When using the _to()_ method instead of _toStream()_, there is no need to define the
+route using "reactive-streams:" endpoints (although they are used under the hood).
+
+In this case, the Camel transformation can be just:
+
+[source,java]
+---------------------------------------------------------
+from("direct:process")
+.marshal() // ... other details
+---------------------------------------------------------
+
+
+### Process Camel data into the reactive framework
+
+While a reactive streams _Publisher_ allows exchanging data in a unidirectional way,
+Camel routes often use an in-out exchange pattern (e.g. to define REST endpoints and, in general,
+where a reply is needed for each request).
+
+In these circumstances, users can add a reactive processing step to the flow, to enhance a Camel route or to
+define the entire transformation using the reactive framework.
+
+For example, given the following route:
+
+[source,java]
+---------------------------------------------------------
+from("timer:clock")
+.setBody().header(Exchange.TIMER_COUNTER)
+.to("direct:reactive")
+.log("Continue with Camel route... n=${body}");
+---------------------------------------------------------
+
+A reactive processing step can be associated with the "direct:reactive" endpoint:
+
+[source,java]
+---------------------------------------------------------
+CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+camel.process("direct:reactive", Integer.class, items ->
+    Flowable.fromPublisher(items) // RxJava2
+        .map(n -> -n)); // make every number negative
+---------------------------------------------------------
+
+Data flowing in the Camel route will be processed by the external reactive
+framework and then continue the processing flow inside Camel.
+
+This mechanism can also be used to define an In-Out exchange in a completely
+reactive way.
+
+[source,java]
+---------------------------------------------------------
+CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+// requires a rest-capable Camel component
+camel.process("rest:get:orders", exchange ->
+                    Flowable.fromPublisher(exchange)
+                            .flatMap(ex -> allOrders())); // retrieve orders asynchronously
+---------------------------------------------------------
+
+See Camel examples (*camel-example-reactive-streams*) for details.
+
+### Advanced Topics
+#### Controlling Backpressure (producer side)
 
 When routing Camel exchanges to an external subscriber, backpressure is handled by an internal buffer that caches exchanges
 before delivering them.
@@ -206,7 +349,7 @@ from("direct:thermostat")
 
 When the `LATEST` backpressure strategy is used, only the last exchange received from the route is kept by the publisher, while older data is discarded (other options are available).
 
-### Controlling Backpressure (consumer side)
+#### Controlling Backpressure (consumer side)
 
 When Camel consumes items from a reactive-streams publisher, the maximum number of inflight exchanges can be set as endpoint option.
 
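The `LATEST` strategy described above (keep only the most recent exchange, discard older ones) can be illustrated with a single-slot buffer. This is a minimal sketch using only the JDK; `LatestBufferSketch`, `offer` and `poll` are hypothetical names for illustration and are not the buffer implementation used by camel-reactive-streams.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: not the buffer used by camel-reactive-streams.
class LatestBufferSketch<T> {

    // Holds at most one pending item.
    private final AtomicReference<T> slot = new AtomicReference<>();

    // Called by the (possibly fast) producer: overwrite whatever is pending.
    void offer(T item) {
        slot.set(item);
    }

    // Called when the subscriber signals demand: take the pending item,
    // or null if nothing arrived since the last poll.
    T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestBufferSketch<Integer> buffer = new LatestBufferSketch<>();
        // Three readings arrive before the subscriber asks for anything.
        buffer.offer(20);
        buffer.offer(21);
        buffer.offer(22);
        System.out.println(buffer.poll()); // only the newest reading survives
        System.out.println(buffer.poll()); // nothing is pending any more
    }
}
```

A slow subscriber therefore always sees the freshest value at the cost of skipping intermediate ones, which suits sensor-like data such as the thermostat route in the documentation.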

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 1c1ea40..cbd9fda 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -42,7 +42,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param name the stream name
      * @return the stream publisher
      */
-    Publisher<Exchange> getPublisher(String name);
+    Publisher<Exchange> fromStream(String name);
 
     /**
      * Returns the publisher associated to the given stream name.
@@ -55,7 +55,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param <T> the type of items emitted by the publisher
      * @return the publisher associated to the stream
      */
-    <T> Publisher<T> getPublisher(String name, Class<T> type);
+    <T> Publisher<T> fromStream(String name, Class<T> type);
 
     /**
      * Returns the subscriber associated to the given stream name.
@@ -64,7 +64,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param name the stream name
      * @return the subscriber associated with the stream
      */
-    Subscriber<Exchange> getSubscriber(String name);
+    Subscriber<Exchange> streamSubscriber(String name);
 
     /**
      * Returns the subscriber associated to the given stream name.
@@ -77,7 +77,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param <T> the type of items accepted by the subscriber
      * @return the subscriber associated with the stream
      */
-    <T> Subscriber<T> getSubscriber(String name, Class<T> type);
+    <T> Subscriber<T> streamSubscriber(String name, Class<T> type);
 
     /**
      * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
@@ -87,18 +87,18 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param data the data to push
      * @return a publisher with the resulting exchange
      */
-    Publisher<Exchange> request(String name, Object data);
+    Publisher<Exchange> toStream(String name, Object data);
 
     /**
      * Returns a function that pushes data into the specified Camel stream and
      * returns a Publisher (mono) holding the resulting exchange or an error.
      *
-     * This is a curryied version of {@link CamelReactiveStreamsService#request(String, Object)}.
+     * This is a curried version of {@link CamelReactiveStreamsService#toStream(String, Object)}.
      *
      * @param name the stream name
      * @return a function that returns a publisher with the resulting exchange
      */
-    Function<?, ? extends Publisher<Exchange>> request(String name);
+    Function<?, ? extends Publisher<Exchange>> toStream(String name);
 
     /**
      * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
@@ -110,20 +110,20 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param <T> the generic type of the resulting Publisher
      * @return a publisher with the resulting data
      */
-    <T> Publisher<T> request(String name, Object data, Class<T> type);
+    <T> Publisher<T> toStream(String name, Object data, Class<T> type);
 
     /**
      * Returns a function that pushes data into the specified Camel stream and
      * returns a Publisher (mono) holding the exchange output or an error.
      *
-     * This is a curryied version of {@link CamelReactiveStreamsService#request(String, Object, Class)}.
+     * This is a curried version of {@link CamelReactiveStreamsService#toStream(String, Object, Class)}.
      *
      * @param name the stream name
      * @param type  the type to which the output should be converted
      * @param <T> the generic type of the resulting Publisher
      * @return a function that returns a publisher with the resulting data
      */
-    <T> Function<Object, Publisher<T>> request(String name, Class<T> type);
+    <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type);
 
     /*
      * Direct client API methods
@@ -138,7 +138,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param uri the consumer uri
      * @return the publisher associated to the uri
      */
-    Publisher<Exchange> publishURI(String uri);
+    Publisher<Exchange> from(String uri);
 
     /**
      * Creates a new stream of the given type from the endpoint URI (used as Camel Consumer) and returns
@@ -151,7 +151,31 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param <T> the type to which Camel should convert exchanges to
      * @return the publisher associated to the uri
      */
-    <T> Publisher<T> publishURI(String uri, Class<T> type);
+    <T> Publisher<T> from(String uri, Class<T> type);
+
+    /**
+     * Creates a new route that pushes data to the endpoint URI and returns
+     * the associated {@code Subscriber}.
+     *
+     * This method always creates a new stream.
+     *
+     * @param uri the target uri
+     * @return the subscriber associated to the uri
+     */
+    Subscriber<Exchange> subscriber(String uri);
+
+    /**
+     * Creates a new route that pushes data to the endpoint URI and returns
+     * the associated {@code Subscriber}.
+     *
+     * This method always creates a new stream.
+     *
+     * @param uri the target uri
+     * @param type the type of items that the subscriber can receive
+     * @param <T> the type from which Camel should convert data to exchanges
+     * @return the subscriber associated to the uri
+     */
+    <T> Subscriber<T> subscriber(String uri, Class<T> type);
 
     /**
      * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route
@@ -161,7 +185,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param data the data to push
      * @return a publisher with the resulting exchange
      */
-    Publisher<Exchange> requestURI(String uri, Object data);
+    Publisher<Exchange> to(String uri, Object data);
 
     /**
      * Creates a new route that uses the endpoint URI as producer, and returns a
@@ -169,12 +193,12 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * {@code Publisher} that holds the resulting exchange or the error.
      *
      *
-     * This is a curryied version of {@link CamelReactiveStreamsService#requestURI(String, Object)}.
+     * This is a curried version of {@link CamelReactiveStreamsService#to(String, Object)}.
      *
      * @param uri the producer uri
      * @return a function that returns a publisher with the resulting exchange
      */
-    Function<Object, Publisher<Exchange>> requestURI(String uri);
+    Function<Object, Publisher<Exchange>> to(String uri);
 
     /**
      * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route
@@ -186,21 +210,21 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param <T> the generic type of the resulting Publisher
      * @return a publisher with the resulting data
      */
-    <T> Publisher<T> requestURI(String uri, Object data, Class<T> type);
+    <T> Publisher<T> to(String uri, Object data, Class<T> type);
 
     /**
      * Creates a new route that uses the endpoint URI as producer, and returns a
      * function that pushes the data into the route and returns the
      * {@code Publisher} that holds the exchange output or an error.
      *
-     * This is a curryied version of {@link CamelReactiveStreamsService#requestURI(String, Object, Class)}.
+     * This is a curried version of {@link CamelReactiveStreamsService#to(String, Object, Class)}.
      *
      * @param uri the producer uri
      * @param type  the type to which the output should be converted
      * @param <T> the generic type of the resulting Publisher
      * @return a function that returns a publisher with the resulting data
      */
-    <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type);
+    <T> Function<Object, Publisher<T>> to(String uri, Class<T> type);
 
     /**
      * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates
@@ -213,7 +237,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param uri the uri where the processor should be attached
      * @param processor the reactive processor
      */
-    void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor);
+    void process(String uri, Function<? super Publisher<Exchange>, ?> processor);
 
     /**
      * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates
@@ -228,7 +252,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      * @param <T> the generic type of the Publisher that should be processed
      * @param processor the reactive processor
      */
-    <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor);
+    <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor);
 
     /*
      * Methods for Camel producers.
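
The Javadoc in this interface describes the overloads without a `data` argument as curried versions of the ones that take it. A minimal sketch of that relationship follows, using only `java.util.function`. The `send` methods are hypothetical stand-ins for `toStream(String, Object, Class)` and `toStream(String, Class)`, and a plain value stands in for the returned `Publisher`.

```java
import java.util.function.Function;

// Hypothetical stand-in for the service; not the Camel API.
class CurryingSketch {

    // Full form: takes the stream name, the data and the target type.
    static <T> T send(String name, Object data, Class<T> type) {
        // Pretend the route tags the body with the stream name, then convert it.
        Object result = name + ":" + data;
        return type.cast(result);
    }

    // Curried form: fix the name and type now, supply the data later.
    static <T> Function<Object, T> send(String name, Class<T> type) {
        return data -> send(name, data, type);
    }

    public static void main(String[] args) {
        Function<Object, String> toNumbers = send("numbers", String.class);
        System.out.println(toNumbers.apply(1));               // numbers:1
        System.out.println(send("numbers", 2, String.class)); // numbers:2
    }
}
```

Because the curried form is itself a function from data to result, it can be handed to a mapping operator instead of being wrapped in a lambda. Depending on the function type the reactive library expects, a method reference such as `::apply` may be needed.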

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index eb3a767..15a10e4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -74,32 +74,32 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public Publisher<Exchange> getPublisher(String name) {
+    public Publisher<Exchange> fromStream(String name) {
         return new UnwrappingPublisher<>(getPayloadPublisher(name));
     }
 
     @SuppressWarnings("unchecked")
-    public <T> Publisher<T> getPublisher(String name, Class<T> cls) {
+    public <T> Publisher<T> fromStream(String name, Class<T> cls) {
         if (Exchange.class.equals(cls)) {
-            return (Publisher<T>) getPublisher(name);
+            return (Publisher<T>) fromStream(name);
         }
 
-        return new ConvertingPublisher<T>(getPublisher(name), cls);
+        return new ConvertingPublisher<T>(fromStream(name), cls);
     }
 
     @Override
-    public CamelSubscriber getSubscriber(String name) {
+    public CamelSubscriber streamSubscriber(String name) {
         subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
         return subscribers.get(name);
     }
 
     @SuppressWarnings("unchecked")
-    public <T> Subscriber<T> getSubscriber(String name, Class<T> type) {
+    public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) {
         if (Exchange.class.equals(type)) {
-            return (Subscriber<T>) getSubscriber(name);
+            return (Subscriber<T>) streamSubscriber(name);
         }
 
-        return new ConvertingSubscriber<T>(getSubscriber(name), getCamelContext());
+        return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext());
     }
 
     @Override
@@ -109,23 +109,23 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public Publisher<Exchange> request(String name, Object data) {
+    public Publisher<Exchange> toStream(String name, Object data) {
         Exchange exchange = convertToExchange(data);
         return doRequest(name, exchange);
     }
 
     @Override
-    public Function<?, ? extends Publisher<Exchange>> request(String name) {
-        return data -> request(name, data);
+    public Function<?, ? extends Publisher<Exchange>> toStream(String name) {
+        return data -> toStream(name, data);
     }
 
     @Override
-    public <T> Publisher<T> request(String name, Object data, Class<T> type) {
-        return new ConvertingPublisher<>(request(name, data), type);
+    public <T> Publisher<T> toStream(String name, Object data, Class<T> type) {
+        return new ConvertingPublisher<>(toStream(name, data), type);
     }
 
     protected Publisher<Exchange> doRequest(String name, Exchange data) {
-        ReactiveStreamsConsumer consumer = getSubscriber(name).getConsumer();
+        ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer();
         if (consumer == null) {
             throw new IllegalStateException("No consumers attached to the stream " + name);
         }
@@ -155,8 +155,8 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public <T> Function<Object, Publisher<T>> request(String name, Class<T> type) {
-        return data -> request(name, data, type);
+    public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) {
+        return data -> toStream(name, data, type);
     }
 
     private CamelPublisher getPayloadPublisher(String name) {
@@ -165,7 +165,7 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public Publisher<Exchange> publishURI(String uri) {
+    public Publisher<Exchange> from(String uri) {
         publishedUriToStream.computeIfAbsent(uri, u -> {
             try {
                 String uuid = context.getUuidGenerator().generateUuid();
@@ -182,16 +182,39 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
                 throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
             }
         });
-        return getPublisher(publishedUriToStream.get(uri));
+        return fromStream(publishedUriToStream.get(uri));
     }
 
     @Override
-    public <T> Publisher<T> publishURI(String uri, Class<T> type) {
-        return new ConvertingPublisher<T>(publishURI(uri), type);
+    public <T> Publisher<T> from(String uri, Class<T> type) {
+        return new ConvertingPublisher<T>(from(uri), type);
     }
 
     @Override
-    public Publisher<Exchange> requestURI(String uri, Object data) {
+    public Subscriber<Exchange> subscriber(String uri) {
+        try {
+            String uuid = context.getUuidGenerator().generateUuid();
+            new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("reactive-streams:" + uuid)
+                            .to(uri);
+                }
+            }.addRoutesToCamelContext(context);
+
+            return streamSubscriber(uuid);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e);
+        }
+    }
+
+    @Override
+    public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
+        return new ConvertingSubscriber<T>(subscriber(uri), context);
+    }
+
+    @Override
+    public Publisher<Exchange> to(String uri, Object data) {
         requestedUriToStream.computeIfAbsent(uri, u -> {
             try {
                 String uuid = context.getUuidGenerator().generateUuid();
@@ -208,27 +231,27 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
                 throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
             }
         });
-        return request(requestedUriToStream.get(uri), data);
+        return toStream(requestedUriToStream.get(uri), data);
     }
 
     @Override
-    public Function<Object, Publisher<Exchange>> requestURI(String uri) {
-        return data -> requestURI(uri, data);
+    public Function<Object, Publisher<Exchange>> to(String uri) {
+        return data -> to(uri, data);
     }
 
     @Override
-    public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) {
-        return new ConvertingPublisher<T>(requestURI(uri, data), type);
+    public <T> Publisher<T> to(String uri, Object data, Class<T> type) {
+        return new ConvertingPublisher<T>(to(uri, data), type);
     }
 
     @Override
-    public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) {
-        return data -> requestURI(uri, data, type);
+    public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) {
+        return data -> to(uri, data, type);
     }
 
 
     @Override
-    public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+    public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
         try {
             new RouteBuilder() {
                 @Override
@@ -248,18 +271,18 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     @Override
-    public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
-        processFromURI(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
+    public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+        process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
     }
 
     @Override
     public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-        getSubscriber(name).attachConsumer(consumer);
+        streamSubscriber(name).attachConsumer(consumer);
     }
 
     @Override
     public void detachCamelConsumer(String name) {
-        getSubscriber(name).detachConsumer();
+        streamSubscriber(name).detachConsumer();
     }
 
     @Override
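
The implementation above relies on `computeIfAbsent` in two places: `streamSubscriber(name)` returns one subscriber per stream name, and `from(uri)` generates a backing stream name once per URI. The new `subscriber(uri)` method, by contrast, generates a fresh stream on every call. The sketch below reproduces only that bookkeeping with JDK types. `Handle` and `streamFor` are hypothetical names, `UUID.randomUUID()` stands in for the context's UUID generator, and no routes are created.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the service implementation; names are hypothetical.
class StreamRegistrySketch {

    // Placeholder for the subscriber attached to a named stream.
    static final class Handle {
        final String stream;

        Handle(String stream) {
            this.stream = stream;
        }
    }

    private final Map<String, Handle> subscribers = new ConcurrentHashMap<>();
    private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();

    // Like streamSubscriber(name): the same name always yields the same handle.
    Handle streamSubscriber(String name) {
        return subscribers.computeIfAbsent(name, Handle::new);
    }

    // Like from(uri): the backing stream name is generated once per URI and reused.
    String streamFor(String uri) {
        return publishedUriToStream.computeIfAbsent(uri, u -> UUID.randomUUID().toString());
    }

    // Like subscriber(uri): a new stream name is generated on every call.
    // The real method also adds a route from that stream to the URI; this sketch does not.
    Handle subscriber(String uri) {
        return streamSubscriber(UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        StreamRegistrySketch registry = new StreamRegistrySketch();
        // Same name, same handle.
        System.out.println(registry.streamSubscriber("elements") == registry.streamSubscriber("elements"));
        // Same URI, same backing stream name.
        System.out.println(registry.streamFor("file:folder").equals(registry.streamFor("file:folder")));
        // Two calls for the same URI give two distinct handles.
        System.out.println(registry.subscriber("seda:queue") != registry.subscriber("seda:queue"));
    }
}
```

One consequence worth noting: under this bookkeeping, calling `subscriber(uri)` repeatedly for the same URI keeps adding streams, so callers may want to hold on to the returned subscriber rather than requesting it again.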

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
index 81d03bc..06d0263 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java
@@ -58,7 +58,7 @@ public class BackpressurePublisherRoutePolicyTest extends CamelTestSupport {
 
         CountDownLatch receptionLatch = new CountDownLatch(35);
 
-        Publisher<Exchange> pub = CamelReactiveStreams.get(context()).getPublisher("pub", Exchange.class);
+        Publisher<Exchange> pub = CamelReactiveStreams.get(context()).fromStream("pub", Exchange.class);
         TestSubscriber<Exchange> subscriber = new TestSubscriber<Exchange>() {
             @Override
             public void onNext(Exchange o) {
@@ -117,7 +117,7 @@ public class BackpressurePublisherRoutePolicyTest extends CamelTestSupport {
 
         CountDownLatch receptionLatch = new CountDownLatch(35);
 
-        Publisher<Exchange> pub = CamelReactiveStreams.get(context()).getPublisher("pub", Exchange.class);
+        Publisher<Exchange> pub = CamelReactiveStreams.get(context()).fromStream("pub", Exchange.class);
         TestSubscriber<Exchange> subscriber = new TestSubscriber<Exchange>() {
             @Override
             public void onNext(Exchange o) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
index 32e9cab..20f4119 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureStrategyTest.java
@@ -43,7 +43,7 @@ public class BackpressureStrategyTest extends CamelTestSupport {
             }
         }.addRoutesToCamelContext(context);
 
-        Flowable<Integer> integers = Flowable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("integers", Integer.class));
+        Flowable<Integer> integers = Flowable.fromPublisher(CamelReactiveStreams.get(context).fromStream("integers", Integer.class));
 
         ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
         CountDownLatch latch = new CountDownLatch(1);
@@ -94,7 +94,7 @@ public class BackpressureStrategyTest extends CamelTestSupport {
             }
         };
         subscriber.setInitiallyRequested(1);
-        CamelReactiveStreams.get(context).getPublisher("integers", Integer.class).subscribe(subscriber);
+        CamelReactiveStreams.get(context).fromStream("integers", Integer.class).subscribe(subscriber);
 
         context().start();
 
@@ -140,7 +140,7 @@ public class BackpressureStrategyTest extends CamelTestSupport {
             }
         };
         subscriber.setInitiallyRequested(1);
-        CamelReactiveStreams.get(context).getPublisher("integers", Integer.class).subscribe(subscriber);
+        CamelReactiveStreams.get(context).fromStream("integers", Integer.class).subscribe(subscriber);
 
         context().start();
 
@@ -182,7 +182,7 @@ public class BackpressureStrategyTest extends CamelTestSupport {
             }
         };
         subscriber.setInitiallyRequested(1);
-        CamelReactiveStreams.get(context).getPublisher("integers", Integer.class).subscribe(subscriber);
+        CamelReactiveStreams.get(context).fromStream("integers", Integer.class).subscribe(subscriber);
 
         context().start();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureSubscriberTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureSubscriberTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureSubscriberTest.java
index edcad30..a2723e7 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureSubscriberTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressureSubscriberTest.java
@@ -38,7 +38,7 @@ public class BackpressureSubscriberTest extends CamelTestSupport {
         long start = System.currentTimeMillis();
         Observable.range(0, 10)
                 .toFlowable(BackpressureStrategy.BUFFER)
-                .subscribe(CamelReactiveStreams.get(context).getSubscriber("slowNumbers", Integer.class));
+                .subscribe(CamelReactiveStreams.get(context).streamSubscriber("slowNumbers", Integer.class));
 
         MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
         endpoint.expectedMessageCount(10);
@@ -56,7 +56,7 @@ public class BackpressureSubscriberTest extends CamelTestSupport {
         long start = System.currentTimeMillis();
         Observable.range(0, 2)
                 .toFlowable(BackpressureStrategy.BUFFER)
-                .subscribe(CamelReactiveStreams.get(context).getSubscriber("slowerNumbers", Integer.class));
+                .subscribe(CamelReactiveStreams.get(context).streamSubscriber("slowerNumbers", Integer.class));
 
         MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
         endpoint.expectedMessageCount(2);
@@ -73,7 +73,7 @@ public class BackpressureSubscriberTest extends CamelTestSupport {
 
         long start = System.currentTimeMillis();
         Flowable.range(0, 40)
-                .subscribe(CamelReactiveStreams.get(context).getSubscriber("parallelSlowNumbers", Integer.class));
+                .subscribe(CamelReactiveStreams.get(context).streamSubscriber("parallelSlowNumbers", Integer.class));
 
         MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
         endpoint.expectedMessageCount(40);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
index 11910dc..c425c37 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicPublisherTest.java
@@ -48,7 +48,7 @@ public class BasicPublisherTest extends CamelTestSupport {
         CountDownLatch latch = new CountDownLatch(num);
         List<Integer> recv = new LinkedList<>();
 
-        Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("pub", Integer.class))
+        Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("pub", Integer.class))
                 .doOnNext(recv::add)
                 .doOnNext(n -> latch.countDown())
                 .subscribe();
@@ -75,14 +75,14 @@ public class BasicPublisherTest extends CamelTestSupport {
         }.addRoutesToCamelContext(context);
 
         CountDownLatch latch1 = new CountDownLatch(5);
-        Disposable disp1 = Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("unbounded", Integer.class))
+        Disposable disp1 = Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("unbounded", Integer.class))
                 .subscribe(n -> latch1.countDown());
 
         context.start();
 
         // Add another subscription
         CountDownLatch latch2 = new CountDownLatch(5);
-        Disposable disp2 = Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("unbounded", Integer.class))
+        Disposable disp2 = Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("unbounded", Integer.class))
                 .subscribe(n -> latch2.countDown());
 
         assertTrue(latch1.await(5, TimeUnit.SECONDS));
@@ -97,7 +97,7 @@ public class BasicPublisherTest extends CamelTestSupport {
 
         // Add another subscription
         CountDownLatch latch3 = new CountDownLatch(5);
-        Disposable disp3 = Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("unbounded", Integer.class))
+        Disposable disp3 = Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("unbounded", Integer.class))
                 .subscribe(n -> latch3.countDown());
 
         assertTrue(latch3.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicSubscriberTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicSubscriberTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicSubscriberTest.java
index ba7bea8..567cd3c 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicSubscriberTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BasicSubscriberTest.java
@@ -56,9 +56,9 @@ public class BasicSubscriberTest extends CamelTestSupport {
     @SuppressWarnings("unchecked")
     protected void doPostSetup() throws Exception {
 
-        Subscriber<Integer> sub = CamelReactiveStreams.get(context()).getSubscriber("sub", Integer.class);
-        Subscriber<Integer> sub2 = CamelReactiveStreams.get(context()).getSubscriber("sub2", Integer.class);
-        Publisher<Integer> pub = CamelReactiveStreams.get(context()).getPublisher("pub", Integer.class);
+        Subscriber<Integer> sub = CamelReactiveStreams.get(context()).streamSubscriber("sub", Integer.class);
+        Subscriber<Integer> sub2 = CamelReactiveStreams.get(context()).streamSubscriber("sub2", Integer.class);
+        Publisher<Integer> pub = CamelReactiveStreams.get(context()).fromStream("pub", Integer.class);
 
         pub.subscribe(sub);
         pub.subscribe(sub2);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ConcurrentConsumersTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ConcurrentConsumersTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ConcurrentConsumersTest.java
index 95f6474..5309790 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ConcurrentConsumersTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ConcurrentConsumersTest.java
@@ -41,7 +41,7 @@ public class ConcurrentConsumersTest extends CamelTestSupport {
 
         Observable.intervalRange(0, 1000, 0, 300, TimeUnit.MICROSECONDS)
                 .toFlowable(BackpressureStrategy.BUFFER)
-                .subscribe(CamelReactiveStreams.get(context()).getSubscriber("singleConsumer", Long.class));
+                .subscribe(CamelReactiveStreams.get(context()).streamSubscriber("singleConsumer", Long.class));
 
         MockEndpoint endpoint = getMockEndpoint("mock:singleBucket");
         endpoint.expectedMessageCount(1000);
@@ -69,7 +69,7 @@ public class ConcurrentConsumersTest extends CamelTestSupport {
 
         Observable.intervalRange(0, 1000, 0, 300, TimeUnit.MICROSECONDS)
                 .toFlowable(BackpressureStrategy.BUFFER)
-                .subscribe(CamelReactiveStreams.get(context()).getSubscriber("multipleConsumers", Long.class));
+                .subscribe(CamelReactiveStreams.get(context()).streamSubscriber("multipleConsumers", Long.class));
 
         MockEndpoint endpoint = getMockEndpoint("mock:multipleBucket");
         endpoint.expectedMessageCount(1000);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
index 3663476..06bf882 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
@@ -36,7 +36,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
     @Test
     public void testFromDirect() throws Exception {
 
-        Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class);
+        Publisher<Integer> data = camel.from("direct:endpoint", Integer.class);
 
         BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
 
@@ -59,7 +59,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
         context.start();
         Thread.sleep(200);
 
-        Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class);
+        Publisher<Integer> data = camel.from("direct:endpoint", Integer.class);
 
         BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
 
@@ -82,7 +82,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
         BlockingQueue<String> queue = new LinkedBlockingDeque<>();
 
         Flowable.just(1, 2, 3)
-                .flatMap(camel.requestURI("bean:hello", String.class)::apply)
+                .flatMap(camel.to("bean:hello", String.class)::apply)
                 .doOnNext(queue::add)
                 .subscribe();
 
@@ -94,13 +94,41 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
     }
 
     @Test
+    public void testDirectSendAndForget() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:data")
+                        .to("mock:result");
+            }
+        }.addRoutesToCamelContext(context);
+
+        context.start();
+
+        Flowable.just(1, 2, 3)
+                .subscribe(camel.subscriber("direct:data", Integer.class));
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+        mock.assertIsSatisfied();
+
+        int idx = 1;
+        for (Exchange ex : mock.getExchanges()) {
+            Integer num = ex.getIn().getBody(Integer.class);
+            assertEquals(new Integer(idx++), num);
+        }
+
+    }
+
+    @Test
     public void testDirectCallOverload() throws Exception {
         context.start();
 
         BlockingQueue<String> queue = new LinkedBlockingDeque<>();
 
         Flowable.just(1, 2, 3)
-                .flatMap(e -> camel.requestURI("bean:hello", e, String.class))
+                .flatMap(e -> camel.to("bean:hello", e, String.class))
                 .doOnNext(queue::add)
                 .subscribe();
 
@@ -118,7 +146,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
         BlockingQueue<String> queue = new LinkedBlockingDeque<>();
 
         Flowable.just(1, 2, 3)
-                .flatMap(camel.requestURI("bean:hello")::apply)
+                .flatMap(camel.to("bean:hello")::apply)
                 .map(ex -> ex.getOut().getBody(String.class))
                 .doOnNext(queue::add)
                 .subscribe();
@@ -137,7 +165,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
         BlockingQueue<String> queue = new LinkedBlockingDeque<>();
 
         Flowable.just(1, 2, 3)
-                .flatMap(e -> camel.requestURI("bean:hello", e))
+                .flatMap(e -> camel.to("bean:hello", e))
                 .map(ex -> ex.getOut().getBody(String.class))
                 .doOnNext(queue::add)
                 .subscribe();
@@ -167,7 +195,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
         BlockingQueue<String> queue = new LinkedBlockingDeque<>();
 
         Flowable.just(1, 2, 3)
-                .flatMap(camel.requestURI("direct:proxy", String.class)::apply)
+                .flatMap(camel.to("direct:proxy", String.class)::apply)
                 .doOnNext(queue::add)
                 .subscribe();
 
@@ -193,7 +221,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
 
         context.start();
 
-        camel.processFromURI("direct:stream", p ->
+        camel.process("direct:stream", p ->
                 Flowable.fromPublisher(p)
                         .map(exchange -> {
                             int val = exchange.getIn().getBody(Integer.class);
@@ -233,7 +261,7 @@ public class DirectClientAPITest extends ReactiveStreamsTestSupport {
 
         context.start();
 
-        camel.processFromURI("direct:stream", Integer.class, p ->
+        camel.process("direct:stream", Integer.class, p ->
                 Flowable.fromPublisher(p)
                         .map(i -> -i)
         );

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
index bb69211..871282a 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/EventTypeTest.java
@@ -40,7 +40,7 @@ public class EventTypeTest extends CamelTestSupport {
             }
         }.addRoutesToCamelContext(context);
 
-        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).streamSubscriber("numbers", Integer.class);
 
         context.start();
 
@@ -66,7 +66,7 @@ public class EventTypeTest extends CamelTestSupport {
             }
         }.addRoutesToCamelContext(context);
 
-        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).streamSubscriber("numbers", Integer.class);
 
         context.start();
 
@@ -90,7 +90,7 @@ public class EventTypeTest extends CamelTestSupport {
             }
         }.addRoutesToCamelContext(context);
 
-        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).streamSubscriber("numbers", Integer.class);
 
         context.start();
 
@@ -117,7 +117,7 @@ public class EventTypeTest extends CamelTestSupport {
             }
         }.addRoutesToCamelContext(context);
 
-        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).streamSubscriber("numbers", Integer.class);
 
         context.start();
 
@@ -153,7 +153,7 @@ public class EventTypeTest extends CamelTestSupport {
             }
         }.addRoutesToCamelContext(context);
 
-        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
+        Subscriber<Integer> numbers = CamelReactiveStreams.get(context).streamSubscriber("numbers", Integer.class);
 
         context.start();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
index 4e96ac6..dcc9c3e 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
@@ -36,7 +36,7 @@ public class ExchangeRequestTest extends CamelTestSupport {
 
         CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
 
-        Publisher<Exchange> string = camel.request("data", new DefaultExchange(context));
+        Publisher<Exchange> string = camel.toStream("data", new DefaultExchange(context));
 
         Exchange res = Flowable.fromPublisher(string).blockingFirst();
 
@@ -52,7 +52,7 @@ public class ExchangeRequestTest extends CamelTestSupport {
 
         CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
 
-        Integer res = Flowable.fromPublisher(camel.request("plusOne", 1L, Integer.class))
+        Integer res = Flowable.fromPublisher(camel.toStream("plusOne", 1L, Integer.class))
                 .blockingFirst();
 
         assertNotNull(res);
@@ -64,7 +64,7 @@ public class ExchangeRequestTest extends CamelTestSupport {
         CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
 
         Integer sum = Flowable.just(1, 2, 3)
-                .flatMap(e -> camel.request("plusOne", e, Integer.class))
+                .flatMap(e -> camel.toStream("plusOne", e, Integer.class))
                 .reduce((i, j) -> i + j)
                 .blockingGet();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/PublisherTypeConversionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/PublisherTypeConversionTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/PublisherTypeConversionTest.java
index f117878..5e5dec3 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/PublisherTypeConversionTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/PublisherTypeConversionTest.java
@@ -39,21 +39,21 @@ public class PublisherTypeConversionTest extends CamelTestSupport {
         CountDownLatch latch = new CountDownLatch(3);
         List<Integer> integers = new LinkedList<>();
 
-        Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("pub", Exchange.class))
+        Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("pub", Exchange.class))
                 .map(x -> x.getIn().getBody(Integer.class))
                 .subscribe(n -> {
                     integers.add(n);
                     latch.countDown();
                 });
 
-        Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("pub"))
+        Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("pub"))
                 .map(x -> x.getIn().getBody(Integer.class))
                 .subscribe(n -> {
                     integers.add(n);
                     latch.countDown();
                 });
 
-        Observable.fromPublisher(CamelReactiveStreams.get(context).getPublisher("pub", Integer.class))
+        Observable.fromPublisher(CamelReactiveStreams.get(context).fromStream("pub", Integer.class))
                 .subscribe(n -> {
                     integers.add(n);
                     latch.countDown();

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
index b7ff7ff..05297b1 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/platforms/AbstractPlatformTestSupport.java
@@ -55,7 +55,7 @@ public abstract class AbstractPlatformTestSupport extends CamelTestSupport {
         List<Integer> elements = new LinkedList<>();
         CountDownLatch latch = new CountDownLatch(num);
 
-        this.changeSign(camel.getPublisher("integers", Integer.class), i -> {
+        this.changeSign(camel.fromStream("integers", Integer.class), i -> {
             elements.add(i);
             latch.countDown();
         });
@@ -95,7 +95,7 @@ public abstract class AbstractPlatformTestSupport extends CamelTestSupport {
             elements.add(i);
         }
 
-        changeSign(elements, camel.getSubscriber("integers", Integer.class));
+        changeSign(elements, camel.streamSubscriber("integers", Integer.class));
         context.start();
 
         MockEndpoint mock = getMockEndpoint("mock:endpoint");

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index cdbcc08..995c218 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -62,22 +62,22 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
-    public Publisher<Exchange> getPublisher(String name) {
+    public Publisher<Exchange> fromStream(String name) {
         return null;
     }
 
     @Override
-    public <T> Publisher<T> getPublisher(String name, Class<T> type) {
+    public <T> Publisher<T> fromStream(String name, Class<T> type) {
         return null;
     }
 
     @Override
-    public Subscriber<Exchange> getSubscriber(String name) {
+    public Subscriber<Exchange> streamSubscriber(String name) {
         return null;
     }
 
     @Override
-    public <T> Subscriber<T> getSubscriber(String name, Class<T> type) {
+    public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) {
         return null;
     }
 
@@ -107,65 +107,75 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
-    public Publisher<Exchange> request(String name, Object data) {
+    public Publisher<Exchange> toStream(String name, Object data) {
         return null;
     }
 
     @Override
-    public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+    public <T> Publisher<T> toStream(String name, Object data, Class<T> type) {
         return null;
     }
 
     @Override
-    public Function<?, ? extends Publisher<Exchange>> request(String name) {
+    public Function<?, ? extends Publisher<Exchange>> toStream(String name) {
         return null;
     }
 
     @Override
-    public <T> Function<Object, Publisher<T>> request(String name, Class<T> type) {
+    public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) {
         return null;
     }
 
     @Override
-    public Publisher<Exchange> publishURI(String uri) {
+    public Publisher<Exchange> from(String uri) {
         return null;
     }
 
     @Override
-    public <T> Publisher<T> publishURI(String uri, Class<T> type) {
+    public <T> Publisher<T> from(String uri, Class<T> type) {
         return null;
     }
 
     @Override
-    public Publisher<Exchange> requestURI(String uri, Object data) {
+    public Publisher<Exchange> to(String uri, Object data) {
         return null;
     }
 
     @Override
-    public Function<Object, Publisher<Exchange>> requestURI(String uri) {
+    public Function<Object, Publisher<Exchange>> to(String uri) {
         return null;
     }
 
     @Override
-    public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) {
+    public <T> Publisher<T> to(String uri, Object data, Class<T> type) {
         return null;
     }
 
     @Override
-    public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) {
+    public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) {
         return null;
     }
 
     @Override
-    public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+    public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
 
     }
 
     @Override
-    public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+    public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
 
     }
 
+    @Override
+    public Subscriber<Exchange> subscriber(String uri) {
+        return null;
+    }
+
+    @Override
+    public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
+        return null;
+    }
+
     public String getName() {
         return name;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
index 3e3c4ae..1817fdf 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherConversionVerificationTest.java
@@ -43,7 +43,7 @@ public class CamelPublisherConversionVerificationTest extends PublisherVerificat
             }
         };
 
-        Publisher<Long> pub = CamelReactiveStreams.get(context).getPublisher("prod", Long.class);
+        Publisher<Long> pub = CamelReactiveStreams.get(context).fromStream("prod", Long.class);
 
         try {
             builder.addRoutesToCamelContext(context);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
index 72ce04b..f3a705e 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelPublisherVerificationTest.java
@@ -43,7 +43,7 @@ public class CamelPublisherVerificationTest extends PublisherVerification<Exchan
             }
         };
 
-        Publisher<Exchange> pub = CamelReactiveStreams.get(context).getPublisher("prod");
+        Publisher<Exchange> pub = CamelReactiveStreams.get(context).fromStream("prod");
 
         try {
             builder.addRoutesToCamelContext(context);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
index b36417c..33c44a1 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberConversionVerificationTest.java
@@ -43,7 +43,7 @@ public class CamelSubscriberConversionVerificationTest extends SubscriberBlackbo
             }
         };
 
-        Subscriber<Integer> sub = CamelReactiveStreams.get(context).getSubscriber("sub", Integer.class);
+        Subscriber<Integer> sub = CamelReactiveStreams.get(context).streamSubscriber("sub", Integer.class);
 
         try {
             builder.addRoutesToCamelContext(context);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
index 2452f7d..75bb064 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/tck/CamelSubscriberVerificationTest.java
@@ -45,7 +45,7 @@ public class CamelSubscriberVerificationTest extends SubscriberBlackboxVerificat
             }
         };
 
-        Subscriber<Exchange> sub = CamelReactiveStreams.get(context).getSubscriber("sub");
+        Subscriber<Exchange> sub = CamelReactiveStreams.get(context).streamSubscriber("sub");
 
         try {
             builder.addRoutesToCamelContext(context);

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
index 1929255..d5c7186 100644
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
@@ -59,8 +59,8 @@ public class BasicCamelToReactorExample {
         public void setupStreams() {
 
             // Use two streams from Camel
-            Publisher<Integer> numbers = camel.getPublisher("numbers", Integer.class);
-            Publisher<String> strings = camel.getPublisher("strings", String.class);
+            Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class);
+            Publisher<String> strings = camel.fromStream("strings", String.class);
 
             Flux.from(numbers)
                     .zipWith(strings) // emit items in pairs

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
index 1d0024c..a5ba6df 100644
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
@@ -84,7 +84,7 @@ public class BasicCamelToReactorInOutExample {
             from("timer:clock?period=9000&delay=1500")
                     .setBody().header(Exchange.TIMER_COUNTER).convertBodyTo(Long.class) // Sample ID
                     .bean("userBean", "getUserInfo") // Get the user info from reactor code
-                    .process(new UnwrapStreamProcessor()).split().body() // Unwrap the Publisher
+                    .process(new UnwrapStreamProcessor()) // Unwrap the Publisher
                     .log("BasicCamelToReactorInOut - Got ${body}");
 
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
index 5a71eb5..a16dbca 100644
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
@@ -56,7 +56,7 @@ public class BasicReactorToCamelExample {
         public void setupStreams() {
 
             // Get a subscriber from camel
-            Subscriber<String> elements = camel.getSubscriber("elements", String.class);
+            Subscriber<String> elements = camel.streamSubscriber("elements", String.class);
 
             // Emit a string every 7 seconds and push it to the Camel "elements" stream
             Flux.interval(Duration.ofSeconds(7))

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
index 36bc59e..8124926 100644
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
@@ -59,7 +59,7 @@ public class BasicReactorToCamelInOutExample {
 
             Flux.interval(Duration.ofSeconds(8))
                     .map(i -> i + 1) // to start from 1
-                    .flatMap(camel.request("sqrt", Double.class)) // call Camel and continue
+                    .flatMap(camel.toStream("sqrt", Double.class)) // call Camel and continue
                     .map(d -> "BasicReactorToCamelInOut - sqrt=" + d)
                     .doOnNext(LOG::info)
                     .subscribe();

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java
index 8a01965..b4da776 100644
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIRestExample.java
@@ -47,7 +47,7 @@ public class ClientAPIRestExample {
         public void setup() {
 
             // Rest endpoint to retrieve all orders: http://localhost:8080/camel/orders
-            camel.processFromURI("rest:get:orders", exchange ->
+            camel.process("rest:get:orders", exchange ->
                     Flux.from(exchange)
                             .flatMap(ex -> allOrders()));
 
@@ -55,7 +55,7 @@ public class ClientAPIRestExample {
             // Rest endpoint to retrieve an order.
             // Try: http://localhost:8080/camel/orders/1
             // Or: http://localhost:8080/camel/orders/xxx
-            camel.processFromURI("rest:get:orders/{orderId}", exchange ->
+            camel.process("rest:get:orders/{orderId}", exchange ->
                     Flux.from(exchange)
                             .map(ex -> ex.getIn().getHeader("orderId", String.class))
                             .flatMap(this::toOrderInfo)

http://git-wip-us.apache.org/repos/asf/camel/blob/9c441ec2/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java
index c1ad65c..3578265 100644
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ClientAPIWorkflowExample.java
@@ -55,10 +55,10 @@ public class ClientAPIWorkflowExample {
              * and sends them to an external system (simulation)
              * only if they contain the word "camel".
              */
-            Flux.from(camel.publishURI("file:input", InputStream.class))
-                    .flatMap(camel.requestURI("direct:unmarshal", String.class))
+            Flux.from(camel.from("file:input", InputStream.class))
+                    .flatMap(camel.to("direct:unmarshal", String.class))
                     .filter(text -> text.contains("camel"))
-                    .flatMap(camel.requestURI("direct:send", String.class))
+                    .flatMap(camel.to("direct:send", String.class))
                     .subscribe();
 
         }

