camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acosent...@apache.org
Subject [camel] branch master updated: CAMEL-12481: Add Change Streams support to MongoDB component
Date Wed, 05 Jun 2019 09:36:39 GMT
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 43e863e  CAMEL-12481: Add Change Streams support to MongoDB component
43e863e is described below

commit 43e863e7320d6cc5b10c9ba245c662389ad72b4b
Author: Denis Istomin <istomin.den@gmail.com>
AuthorDate: Thu May 30 23:59:45 2019 +0500

    CAMEL-12481: Add Change Streams support to MongoDB component
---
 .../src/main/docs/mongodb3-component.adoc          |  62 ++++--
 .../mongodb3/MongoAbstractConsumerThread.java      | 111 +++++++++++
 ...umer.java => MongoDbChangeStreamsConsumer.java} |  40 ++--
 .../mongodb3/MongoDbChangeStreamsThread.java       |  94 +++++++++
 .../camel/component/mongodb3/MongoDbComponent.java |  12 +-
 .../component/mongodb3/MongoDbConsumerType.java    |   4 +-
 .../camel/component/mongodb3/MongoDbEndpoint.java  |  97 ++++++---
 .../mongodb3/MongoDbTailTrackingManager.java       |   8 +-
 .../mongodb3/MongoDbTailableCursorConsumer.java    |  15 +-
 .../component/mongodb3/MongoDbTailingProcess.java  | 222 ---------------------
 .../component/mongodb3/MongoDbTailingThread.java   | 146 ++++++++++++++
 .../mongodb3/EmbedMongoConfiguration.java          |  15 +-
 .../mongodb3/MongoDbChangeStreamsConsumerTest.java | 113 +++++++++++
 13 files changed, 633 insertions(+), 306 deletions(-)

diff --git a/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc b/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
index 0c75bbc..d20124c 100644
--- a/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
+++ b/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
@@ -3,7 +3,8 @@
 
 *Available as of Camel version 2.19*
 
-Note: Camel MongoDB3 component Use the Mongo Driver for Java 3.4. If your are looking for previews versions look the Camel MongoDB component
+Note: Camel MongoDB3 component Use the Mongo Driver for Java 3.x.
+If your are looking for previews versions look the Camel MongoDB component.
 
 According to Wikipedia: "NoSQL is a movement promoting a loosely defined
 class of non-relational data stores that break with a long history of
@@ -90,7 +91,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (20 parameters):
+==== Query Parameters (22 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -103,6 +104,7 @@ with the following path and query parameters:
 | *operation* (common) | Sets the operation this endpoint will execute against MongoDB. For possible values, see MongoDbOperation. |  | MongoDbOperation
 | *outputType* (common) | Convert the output of the producer to the selected type : DocumentList Document or MongoIterable. DocumentList or MongoIterable applies to findAll and aggregate. Document applies to all other operations. |  | MongoDbOutputType
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *consumerType* (consumer) | Consumer type. |  | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
@@ -110,6 +112,7 @@ with the following path and query parameters:
 | *dynamicity* (advanced) | Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties. Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI. It is disabled by default to boost performance. Enabling it will take a minimal performance hit. | false | boolean
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
 | *writeResultAsHeader* (advanced) | In write operations, it determines whether instead of returning WriteResult as the body of the OUT message, we transfer the IN message to the OUT and attach the WriteResult as a header. | false | boolean
+| *streamFilter* (changeStream) | Filter condition for change streams consumer. |  | String
 | *persistentId* (tail) | One tail tracking collection can host many trackers for several tailable consumers. To keep them separate, each tracker should have its own unique persistentId. |  | String
 | *persistentTailTracking* (tail) | Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts. The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records. | false | boolean
 | *tailTrackCollection* (tail) | Collection where tail tracking information will be persisted. If not specified, MongoDbTailTrackingConfig#DEFAULT_COLLECTION will be used by default. |  | String
@@ -148,7 +151,7 @@ The component supports 3 options, which are listed below.
 // spring-boot-auto-configure options: END
 
 
-Note on options of MoongoDB component 
+Note on options of MongoDB component
 
 writeConcern *Remove in camel 2.19.* See Mongo client options <<MongoDB-ConfigurationofdatabaseinSpringXML>>. Set the WriteConcern for write operations on MongoDB using the standard ones. Resolved from the fields of the WriteConcern class by calling the link WriteConcernvalueOf(String) method.
 
@@ -431,8 +434,7 @@ to a single field, based on the `documentTimestamp` field:
 .setHeader(MongoDbConstants.FIELDS_PROJECTION).constant(Projection.include("documentTimestamp"))
 .setBody().constant("{}")
 .to("mongodb3:myDb?database=local&collection=myDemoCollection&operation=findOneByQuery")
-.to("direct:aMyBatisParameterizedSelect")
-;
+.to("direct:aMyBatisParameterizedSelect");
 ----------------------------------------------------------------------------------------------------------------------------
 
 ==== Create/update operations
@@ -783,8 +785,8 @@ in the shell, in the form of a `Document` in the OUT message body.
 
 ===== command
 
-Run the body as a command on database. Usefull for admin operation as
-getting host informations, replication or sharding status.
+Run the body as a command on database. Useful for admin operation as
+getting host information, replication or sharding status.
 
 Collection parameter is not use for this operation.
 
@@ -813,7 +815,13 @@ Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody",
 assertTrue("Result is not of type Long", result instanceof Long);
 -----------------------------------------------------------------------------------------------------------------------------
 
-=== Tailable Cursor Consumer
+=== Consumers
+There are several types of consumers:
+
+. Tailable Cursor Consumer
+. Change Streams Consumer
+
+==== Tailable Cursor Consumer
 
 MongoDB offers a mechanism to instantaneously consume ongoing data from
 a collection, by keeping the cursor open just like the `tail -f` command
@@ -835,7 +843,7 @@ new objects are inserted, MongoDB will push them as `Document` in natural
 order to your tailable cursor consumer, who will transform them to an
 Exchange and will trigger your route logic.
 
-=== How the tailable cursor consumer works
+===== How the tailable cursor consumer works
 
 To turn a cursor into a tailable cursor, a few special flags are to be
 signalled to MongoDB when first generating the cursor. Once created, the
@@ -886,7 +894,7 @@ The above route will consume from the "flights.cancellations" capped
 collection, using "departureTime" as the increasing field, with a
 default regeneration cursor delay of 1000ms.
 
-=== Persistent tail tracking
+===== Persistent tail tracking
 
 Standard tail tracking is volatile and the last value is only kept in
 memory. However, in practice you will need to restart your Camel
@@ -906,7 +914,7 @@ persisting at regular intervals too in the future (flush every 5
 seconds) for added robustness if the demand is there. To request this
 feature, please open a ticket in the Camel JIRA.
 
-=== Enabling persistent tail tracking
+===== Enabling persistent tail tracking
 
 To enable this function, set at least the following options on the
 endpoint URI:
@@ -952,6 +960,35 @@ from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasin
     .to("mock:test");
 -----------------------------------------------------------------------------------------------------------------------------------
 
+==== Change Streams Consumer
+
+Change Streams allow applications to access real-time data changes without the complexity and risk of tailing the MongoDB oplog.
+Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.
+Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
+
+To configure Change Streams Consumer you need to specify `consumerType`, `database`, `collection`
+and optional JSON property `streamFilter` to filter events.
+That JSON property is standart MongoDB `$match` aggregation.
+It could be easily specified using XML DSL configuration:
+
+[source,xml]
+-------------
+<route id="filterConsumer">
+    <from uri="mongodb3:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets"/>
+    <to uri="mock:test"/>
+
+    <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/>
+</route>
+-------------
+
+Java configuration:
+[source,java]
+-------------
+from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets")
+    .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}")
+    .to("mock:test");
+-------------
+
 === Type conversions
 
 The `MongoDbBasicConverters` type converter included with the
@@ -970,8 +1007,7 @@ object to a `Map`, which is in turn used to initialise a new
 |fromStringToList |`String` |`List<Bson>` |uses `org.bson.codecs.configuration.CodecRegistries` to convert to BsonArray then to List<Bson>.
 |=======================================================================
 
-This type converter is auto-discovered, so you don't need to configure
-anything manually.
+This type converter is auto-discovered, so you don't need to configure anything manually.
 
 === See also
 
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoAbstractConsumerThread.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoAbstractConsumerThread.java
new file mode 100644
index 0000000..5c27c94
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoAbstractConsumerThread.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.concurrent.CountDownLatch;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+import org.apache.camel.Consumer;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class MongoAbstractConsumerThread implements Runnable {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    // local final copies of variables for increased performance
+    protected final long cursorRegenerationDelay;
+    protected final boolean cursorRegenerationDelayEnabled;
+
+    protected final MongoCollection<Document> dbCol;
+    protected final Consumer consumer;
+    protected final MongoDbEndpoint endpoint;
+    protected MongoCursor cursor;
+
+    volatile boolean keepRunning = true;
+    private volatile boolean stopped;
+    private volatile CountDownLatch stoppedLatch;
+
+    MongoAbstractConsumerThread(MongoDbEndpoint endpoint, Consumer consumer) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+        this.dbCol = endpoint.getMongoCollection();
+        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+        this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
+    }
+
+    protected abstract MongoCursor<Document> initializeCursor();
+    protected abstract void init() throws Exception;
+    protected abstract void doRun();
+    protected abstract void regeneratingCursor();
+
+    /**
+     * Main loop.
+     */
+    @Override
+    public void run() {
+        stoppedLatch = new CountDownLatch(1);
+        while (keepRunning) {
+            doRun();
+            // regenerate the cursor, if reading failed for some reason
+            if (keepRunning) {
+                cursor.close();
+                regeneratingCursor();
+
+                if (cursorRegenerationDelayEnabled) {
+                    try {
+                        Thread.sleep(cursorRegenerationDelay);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+
+                cursor = initializeCursor();
+            }
+        }
+
+        stopped = true;
+        stoppedLatch.countDown();
+    }
+
+    protected void stop() throws Exception {
+        if (log.isInfoEnabled()) {
+            log.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}",
+                    String.format("db: %s, col: %s", endpoint.getDatabase(), endpoint.getCollection()));
+        }
+
+        keepRunning = false;
+        if (cursor != null) {
+            cursor.close();
+        }
+        awaitStopped();
+
+        if (log.isInfoEnabled()) {
+            log.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}",
+                    String.format("db: %s, col: %s", endpoint.getDatabase(), endpoint.getCollection()));
+        }
+    }
+
+    private void awaitStopped() throws InterruptedException {
+        if (!stopped) {
+            log.info("Going to wait for stopping");
+            stoppedLatch.await();
+        }
+    }
+}
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
similarity index 61%
copy from components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
copy to components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
index d46b786..7edaada 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java
@@ -16,20 +16,28 @@
  */
 package org.apache.camel.component.mongodb3;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import static java.util.Collections.singletonList;
+
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.bson.BsonDocument;
 
 /**
- * The MongoDb consumer.
+ * The MongoDb Change Streams consumer.
  */
-public class MongoDbTailableCursorConsumer extends DefaultConsumer {
+public class MongoDbChangeStreamsConsumer extends DefaultConsumer {
+
+    private static final String STREAM_FILTER_PROPERTY = "streamFilter";
+
     private final MongoDbEndpoint endpoint;
     private ExecutorService executor;
-    private MongoDbTailingProcess tailingProcess;
+    private MongoDbChangeStreamsThread changeStreamsThread;
 
-    public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor processor) {
+    public MongoDbChangeStreamsConsumer(MongoDbEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
@@ -37,8 +45,8 @@ public class MongoDbTailableCursorConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        if (tailingProcess != null) {
-            tailingProcess.stop();
+        if (changeStreamsThread != null) {
+            changeStreamsThread.stop();
         }
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
@@ -49,17 +57,15 @@ public class MongoDbTailableCursorConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
-        MongoDbTailTrackingManager trackingManager = initTailTracking();
-        tailingProcess = new MongoDbTailingProcess(endpoint, this, trackingManager);
-        tailingProcess.initializeProcess();
-        executor.execute(tailingProcess);
-    }
+        String streamFilter = (String) getRoute().getProperties().get(STREAM_FILTER_PROPERTY);
+        List<BsonDocument> bsonFilter = null;
+        if (ObjectHelper.isNotEmpty(streamFilter)) {
+            bsonFilter = singletonList(BsonDocument.parse(streamFilter));
+        }
 
-    protected MongoDbTailTrackingManager initTailTracking() throws Exception {
-        MongoDbTailTrackingManager answer = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
-        answer.initialize();
-        return answer;
+        executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
+        changeStreamsThread = new MongoDbChangeStreamsThread(endpoint, this, bsonFilter);
+        changeStreamsThread.init();
+        executor.execute(changeStreamsThread);
     }
-
 }
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsThread.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsThread.java
new file mode 100644
index 0000000..15e7612
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsThread.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.List;
+
+import com.mongodb.MongoException;
+import com.mongodb.client.ChangeStreamIterable;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import org.apache.camel.Exchange;
+import org.bson.BsonDocument;
+import org.bson.Document;
+
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
+    private List<BsonDocument> bsonFilter;
+    private BsonDocument resumeToken;
+
+    MongoDbChangeStreamsThread(MongoDbEndpoint endpoint, MongoDbChangeStreamsConsumer consumer, List<BsonDocument> bsonFilter) {
+        super(endpoint, consumer);
+        this.bsonFilter = bsonFilter;
+    }
+
+    @Override
+    protected void init() {
+        cursor = initializeCursor();
+    }
+
+    @Override
+    protected MongoCursor initializeCursor() {
+        ChangeStreamIterable<Document> iterable = bsonFilter != null
+                ? dbCol.watch(bsonFilter)
+                : dbCol.watch();
+
+        if (resumeToken != null) {
+            iterable = iterable.resumeAfter(resumeToken);
+        }
+
+        MongoCursor<ChangeStreamDocument<Document>> cursor = iterable.iterator();
+        return cursor;
+    }
+
+    @Override
+    protected void regeneratingCursor() {
+        if (log.isDebugEnabled()) {
+            log.debug("Regenerating cursor, waiting {}ms first", cursorRegenerationDelay);
+        }
+    }
+
+    @Override
+    protected void doRun() {
+        try {
+            while (cursor.hasNext() && keepRunning) {
+                ChangeStreamDocument<Document> dbObj = (ChangeStreamDocument<Document>) cursor.next();
+                Exchange exchange = endpoint.createMongoDbExchange(dbObj.getFullDocument());
+
+                try {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.getFullDocument().get(MONGO_ID));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception ignored) {
+                }
+
+                this.resumeToken = dbObj.getResumeToken();
+            }
+        } catch (MongoException e) {
+            // cursor.hasNext() opens socket and waiting for data
+            // it throws exception when cursor is closed in another thread
+            // there is no way to stop hasNext() before closing cursor
+            if (keepRunning) {
+                throw e;
+            } else {
+                log.debug("Exception from MongoDB, will regenerate cursor.", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbComponent.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbComponent.java
index 8f66736..476a548 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbComponent.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbComponent.java
@@ -32,8 +32,12 @@ import org.apache.camel.support.DefaultComponent;
 @Component("mongodb,mongodb3")
 public class MongoDbComponent extends DefaultComponent {
 
-    public static final Set<MongoDbOperation> WRITE_OPERATIONS = new HashSet<>(Arrays.asList(MongoDbOperation.insert, MongoDbOperation.save, MongoDbOperation.update,
-                                                                                             MongoDbOperation.remove));
+    public static final Set<MongoDbOperation> WRITE_OPERATIONS = new HashSet<>(Arrays.asList(
+            MongoDbOperation.insert,
+            MongoDbOperation.save,
+            MongoDbOperation.update,
+            MongoDbOperation.remove));
+
     public MongoDbComponent() {
         this(null);
     }
@@ -53,16 +57,14 @@ public class MongoDbComponent extends DefaultComponent {
 
     @Override
     protected void doShutdown() throws Exception {
-
         super.doShutdown();
     }
 
     public static CamelMongoDbException wrapInCamelMongoDbException(Throwable t) {
         if (t instanceof CamelMongoDbException) {
-            return (CamelMongoDbException)t;
+            return (CamelMongoDbException) t;
         } else {
             return new CamelMongoDbException(t);
         }
     }
-
 }
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
index 87f660c..aee6ac7 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbConsumerType.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.mongodb3;
 
 public enum MongoDbConsumerType {
 
-    tailable
+    tailable,
+    changeStreams
     // more consumer types to be included in future versions
-
 }
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
index b07a1fe..45d077b 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbEndpoint.java
@@ -42,6 +42,7 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -80,13 +81,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private boolean dynamicity;
     @UriParam(label = "advanced")
     private boolean writeResultAsHeader;
-    // tailable cursor consumer by default
-    private MongoDbConsumerType consumerType;
+    @UriParam(label = "consumer")
+    private String consumerType;
     @UriParam(label = "advanced", defaultValue = "1000")
     private long cursorRegenerationDelay = 1000L;
     @UriParam(label = "tail")
     private String tailTrackIncreasingField;
 
+    @UriParam(label = "changeStream")
+    private String streamFilter;
+
     // persistent tail tracking
     @UriParam(label = "tail")
     private boolean persistentTailTracking;
@@ -100,14 +104,15 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private String tailTrackField;
     @UriParam(label = "common")
     private MongoDbOutputType outputType;
-    
+
+    // tailable cursor consumer by default
+    private MongoDbConsumerType dbConsumerType;
+
     private MongoDbTailTrackingConfig tailTrackingConfig;
 
     private MongoDatabase mongoDatabase;
     private MongoCollection<Document> mongoCollection;
 
-    // ======= Constructors ===============================================
-
     public MongoDbEndpoint() {
     }
 
@@ -115,9 +120,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         super(uri, component);
     }
 
-    // ======= Implementation methods =====================================
-
-    public Producer createProducer() throws Exception {
+    public Producer createProducer() {
         validateProducerOptions();
         initializeConnection();
         return new MongoDbProducer(this);
@@ -131,15 +134,25 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         initializeConnection();
 
         // select right consumer type
-        if (consumerType == null) {
-            consumerType = MongoDbConsumerType.tailable;
+        try {
+            dbConsumerType = ObjectHelper.isEmpty(consumerType)
+                    ? MongoDbConsumerType.tailable
+                    : MongoDbConsumerType.valueOf(consumerType);
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Consumer type not supported: " + consumerType, e);
         }
 
         Consumer consumer;
-        if (consumerType == MongoDbConsumerType.tailable) {
+
+        switch (dbConsumerType) {
+        case tailable:
             consumer = new MongoDbTailableCursorConsumer(this, processor);
-        } else {
-            throw new CamelMongoDbException("Consumer type not supported: " + consumerType);
+            break;
+        case changeStreams:
+            consumer = new MongoDbChangeStreamsConsumer(this, processor);
+            break;
+        default:
+            throw new CamelMongoDbException("Consumer type not supported: " + dbConsumerType);
         }
 
         configureConsumer(consumer);
@@ -147,11 +160,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Check if outputType is compatible with operation. DbCursor and
-     * DocumentList applies to findAll. Document applies to others.
+     * Check if outputType is compatible with operation.
+     * DbCursor and DocumentList applies to findAll.
+     * Document applies to others.
      */
-    @SuppressWarnings("unused") // TODO: validate Output on createProducer
-                                // method.
+    @SuppressWarnings("unused")
+    // TODO: validate Output on createProducer method.
     private void validateOutputType() {
         if (!ObjectHelper.isEmpty(outputType)) {
             if (DocumentList.equals(outputType) && !(findAll.equals(operation))) {
@@ -170,9 +184,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         // make our best effort to validate, options with defaults are checked
         // against their defaults, which is not always a guarantee that
         // they haven't been explicitly set, but it is enough
-        if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb) || !ObjectHelper.isEmpty(tailTrackCollection)
+        if (!ObjectHelper.isEmpty(dbConsumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb) || !ObjectHelper.isEmpty(tailTrackCollection)
             || !ObjectHelper.isEmpty(tailTrackField) || cursorRegenerationDelay != 1000L) {
-            throw new IllegalArgumentException("consumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint");
+            throw new IllegalArgumentException("dbConsumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint");
         }
     }
 
@@ -183,7 +197,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         if (!ObjectHelper.isEmpty(operation) || dynamicity || outputType != null) {
             throw new IllegalArgumentException("operation, dynamicity, outputType " + "options cannot appear on a consumer endpoint");
         }
-        if (consumerType == MongoDbConsumerType.tailable) {
+        if (dbConsumerType == MongoDbConsumerType.tailable) {
             if (tailTrackIncreasingField == null) {
                 throw new IllegalArgumentException("tailTrackIncreasingField option must be set for tailable cursor MongoDB consumer endpoint");
             }
@@ -193,9 +207,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         }
     }
 
-/**
-     * Initialises the MongoDB connection using the Mongo object provided to the
-     * endpoint
+    /**
+     * Initialises the MongoDB connection using the Mongo object provided to the endpoint
      * 
      * @throws CamelMongoDbException
      */
@@ -293,9 +306,6 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         log.debug("Resolved the connection with the name {} as {}", connectionBean, mongoConnection);
         super.doStart();
     }
-    
-    // ======= Getters and setters
-    // ===============================================
 
     public String getConnectionBean() {
         return connectionBean;
@@ -399,8 +409,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     /**
      * Set the {@link WriteConcern} for write operations on MongoDB, passing in
      * the bean ref to a custom WriteConcern which exists in the Registry. You
-     * can also use standard WriteConcerns by passing in their key. See the
-     * {@link #setWriteConcern(String) setWriteConcern} method.
+     * can also use standard WriteConcerns by passing in their key.
      * 
      * @param writeConcernRef the name of the bean in the registry that
      *            represents the WriteConcern to use
@@ -448,21 +457,32 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     /**
      * Reserved for future use, when more consumer types are supported.
      *
-     * @param consumerType key of the consumer type
-     * @throws CamelMongoDbException
+     * @param dbConsumerType key of the consumer type
+     * @throws CamelMongoDbException if consumer type is not supported
      */
-    public void setConsumerType(String consumerType) throws CamelMongoDbException {
+    public void setDbConsumerType(String dbConsumerType) throws CamelMongoDbException {
         try {
-            this.consumerType = MongoDbConsumerType.valueOf(consumerType);
+            this.dbConsumerType = MongoDbConsumerType.valueOf(dbConsumerType);
         } catch (IllegalArgumentException e) {
             throw new CamelMongoDbException("Consumer type not supported", e);
         }
     }
 
-    public MongoDbConsumerType getConsumerType() {
+    public MongoDbConsumerType getDbConsumerType() {
+        return dbConsumerType;
+    }
+
+    public String getConsumerType() {
         return consumerType;
     }
 
+    /**
+     * Consumer type.
+     */
+    public void setConsumerType(String consumerType) {
+        this.consumerType = consumerType;
+    }
+
     public String getTailTrackDb() {
         return tailTrackDb;
     }
@@ -620,4 +640,15 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoCollection<Document> getMongoCollection() {
         return mongoCollection;
     }
+
+    public String getStreamFilter() {
+        return streamFilter;
+    }
+
+    /**
+     * Filter condition for change streams consumer.
+     */
+    public void setStreamFilter(String streamFilter) {
+        this.streamFilter = streamFilter;
+    }
 }
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
index d92b5d3..d38cb5a 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
@@ -44,7 +44,7 @@ public class MongoDbTailTrackingManager {
         this.config = config;
     }
 
-    public void initialize() throws Exception {
+    public void initialize() {
         if (!config.persistent) {
             return;
         }
@@ -56,8 +56,7 @@ public class MongoDbTailTrackingManager {
             dbCol.insertOne(filter);
             trackingObj = dbCol.find(filter).first();
         }
-        // keep only the _id, the rest is useless and causes more overhead
-        // during update
+        // keep only the _id, the rest is useless and causes more overhead during update
         trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID));
     }
 
@@ -71,8 +70,7 @@ public class MongoDbTailTrackingManager {
         }
 
         Bson updateObj = Updates.set(config.field, lastVal);
-        FindOneAndUpdateOptions options = new FindOneAndUpdateOptions()
-        .returnDocument(ReturnDocument.AFTER);
+        FindOneAndUpdateOptions options = new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER);
         trackingObj = dbCol.findOneAndUpdate(trackingObj, updateObj, options);
     }
 
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
index d46b786..de803d8 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
@@ -27,7 +27,7 @@ import org.apache.camel.support.DefaultConsumer;
 public class MongoDbTailableCursorConsumer extends DefaultConsumer {
     private final MongoDbEndpoint endpoint;
     private ExecutorService executor;
-    private MongoDbTailingProcess tailingProcess;
+    private MongoDbTailingThread tailingThread;
 
     public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -37,8 +37,8 @@ public class MongoDbTailableCursorConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        if (tailingProcess != null) {
-            tailingProcess.stop();
+        if (tailingThread != null) {
+            tailingThread.stop();
         }
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
@@ -51,15 +51,14 @@ public class MongoDbTailableCursorConsumer extends DefaultConsumer {
         super.doStart();
         executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
         MongoDbTailTrackingManager trackingManager = initTailTracking();
-        tailingProcess = new MongoDbTailingProcess(endpoint, this, trackingManager);
-        tailingProcess.initializeProcess();
-        executor.execute(tailingProcess);
+        tailingThread = new MongoDbTailingThread(endpoint, this, trackingManager);
+        tailingThread.init();
+        executor.execute(tailingThread);
     }
 
-    protected MongoDbTailTrackingManager initTailTracking() throws Exception {
+    protected MongoDbTailTrackingManager initTailTracking() {
         MongoDbTailTrackingManager answer = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
         answer.initialize();
         return answer;
     }
-
 }
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
deleted file mode 100644
index 3e67dc1..0000000
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mongodb3;
-
-import java.util.concurrent.CountDownLatch;
-
-import com.mongodb.CursorType;
-import com.mongodb.MongoCursorNotFoundException;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import org.apache.camel.Exchange;
-import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.mongodb.client.model.Filters.gt;
-import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
-
-public class MongoDbTailingProcess implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
-    private static final String CAPPED_KEY = "capped";
-
-    public volatile boolean keepRunning = true;
-    public volatile boolean stopped; // = false
-    private volatile CountDownLatch stoppedLatch;
-
-    private final MongoCollection<Document> dbCol;
-    private final MongoDbEndpoint endpoint;
-    private final MongoDbTailableCursorConsumer consumer;
-
-    // create local, final copies of these variables for increased performance
-    private final long cursorRegenerationDelay;
-    private final boolean cursorRegenerationDelayEnabled;
-
-    private MongoCursor<Document> cursor;
-    private MongoDbTailTrackingManager tailTracking;
-
-    public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
-        this.endpoint = endpoint;
-        this.consumer = consumer;
-        this.dbCol = endpoint.getMongoCollection();
-        this.tailTracking = tailTrack;
-        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
-        this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
-    }
-
-    public MongoCursor<Document> getCursor() {
-        return cursor;
-    }
-
-    /**
-     * Initialise the tailing process, the cursor and if persistent tail
-     * tracking is enabled, recover the cursor from the persisted point. As part
-     * of the initialisation process, the component will validate that the
-     * collection we are targeting is 'capped'.
-     *
-     * @throws Exception
-     */
-    public void initializeProcess() throws Exception {
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
-        }
-
-        if (!isCollectionCapped()) {
-            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + endpoint.getCollection() + " is not capped");
-        }
-        try {
-            // recover the last value from the store if it exists
-            tailTracking.recoverFromStore();
-            cursor = initializeCursor();
-        } catch (Exception e) {
-            throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
-        }
-
-        if (cursor == null) {
-            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
-        }
-
-    }
-
-    private Boolean isCollectionCapped() {
-        return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
-    }
-
-    private Document createCollStatsCommand() {
-        return new Document("collStats", endpoint.getCollection());
-    }
-
-    /**
-     * The heart of the tailing process.
-     */
-    @Override
-    public void run() {
-        stoppedLatch = new CountDownLatch(1);
-        while (keepRunning) {
-            doRun();
-            // if the previous call didn't return because we have stopped
-            // running, then regenerate the cursor
-            if (keepRunning) {
-                cursor.close();
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal, cursorRegenerationDelay);
-                }
-
-                if (cursorRegenerationDelayEnabled) {
-                    try {
-                        Thread.sleep(cursorRegenerationDelay);
-                    } catch (InterruptedException e) {
-                        // ignore
-                    }
-                }
-
-                cursor = initializeCursor();
-            }
-        }
-
-        stopped = true;
-        stoppedLatch.countDown();
-    }
-
-    protected void stop() throws Exception {
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
-        }
-        keepRunning = false;
-        // close the cursor if it's open, so if it is blocked on hasNext() it
-        // will return immediately
-        if (cursor != null) {
-            cursor.close();
-        }
-        awaitStopped();
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
-        }
-    }
-
-    /**
-     * The heart of the tailing process.
-     */
-    private void doRun() {
-        // while the cursor has more values, keepRunning is true and the
-        // cursorId is not 0, which symbolizes that the cursor is dead
-        try {
-            while (cursor.hasNext() && keepRunning) { // cursor.getCursorId() !=
-                                                      // 0 &&
-                Document dbObj = cursor.next();
-                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
-                try {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
-                    }
-                    consumer.getProcessor().process(exchange);
-                } catch (Exception e) {
-                    // do nothing
-                }
-                tailTracking.setLastVal(dbObj);
-            }
-        } catch (MongoCursorNotFoundException e) {
-            // we only log the warning if we are not stopping, otherwise it is
-            // expected because the stop() method kills the cursor just in case
-            // it is blocked
-            // waiting for more data to arrive
-            if (keepRunning) {
-                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
-            }
-        } catch (IllegalStateException e) {
-            // cursor.hasNext() opens socket and waiting for data
-            // it throws exception when cursor is closed in another thread
-            // there is no way to stop hasNext() before closing cursor
-            if (keepRunning) {
-                throw e;
-            } else {
-                LOG.debug("Cursor closed exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
-            }
-        }
-
-        // the loop finished, persist the lastValue just in case we are shutting
-        // down
-        // TODO: perhaps add a functionality to persist every N records
-        tailTracking.persistToStore();
-    }
-
-    // no arguments, will ask DB what the last updated Id was (checking
-    // persistent storage)
-    private MongoCursor<Document> initializeCursor() {
-        Object lastVal = tailTracking.lastVal;
-        // lastVal can be null if we are initializing and there is no
-        // persistence enabled
-        MongoCursor<Document> answer;
-        if (lastVal == null) {
-            answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
-        } else {
-            try (MongoCursor<Document> iterator = dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();) {
-                answer = iterator;
-            }
-        }
-        return answer;
-    }
-
-    private void awaitStopped() throws InterruptedException {
-        if (!stopped) {
-            LOG.info("Going to wait for stopping");
-            stoppedLatch.await();
-        }
-    }
-
-}
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingThread.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingThread.java
new file mode 100644
index 0000000..3547482
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingThread.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import com.mongodb.CursorType;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCursor;
+import org.apache.camel.Exchange;
+import org.bson.Document;
+
+import static com.mongodb.client.model.Filters.gt;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+class MongoDbTailingThread extends MongoAbstractConsumerThread {
+
+    private static final String CAPPED_KEY = "capped";
+    private MongoDbTailTrackingManager tailTracking;
+
+    MongoDbTailingThread(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
+        super(endpoint, consumer);
+        this.tailTracking = tailTrack;
+    }
+
+    /**
+     * Initialise the tailing process, the cursor and if persistent tail tracking is enabled,
+     * recover the cursor from the persisted point.
+     * As part of the initialisation process,
+     * the component will validate that the collection we are targeting is 'capped'.
+     */
+    @Override
+    protected void init() {
+        if (log.isInfoEnabled()) {
+            log.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}",
+                    String.format("db: %s, col: %s", endpoint.getMongoDatabase(), endpoint.getCollection()));
+        }
+
+        if (!isCollectionCapped()) {
+            throw new CamelMongoDbException(
+                    String.format("Tailable cursors are only compatible with capped collections, and collection %s is not capped",
+                            endpoint.getCollection()));
+        }
+        try {
+            // recover the last value from the store if it exists
+            tailTracking.recoverFromStore();
+            cursor = initializeCursor();
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
+        }
+
+        if (cursor == null) {
+            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
+        }
+    }
+
+    private Boolean isCollectionCapped() {
+        return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+    }
+
+    private Document createCollStatsCommand() {
+        return new Document("collStats", endpoint.getCollection());
+    }
+
+    @Override
+    // no arguments, will ask DB what the last updated Id was (checking persistent storage)
+    protected MongoCursor<Document> initializeCursor() {
+        Object lastVal = tailTracking.lastVal;
+        // lastVal can be null if we are initializing and there is no persistence enabled
+        MongoCursor<Document> answer;
+        if (lastVal == null) {
+            answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
+        } else {
+            try (MongoCursor<Document> iterator = dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal))
+                    .cursorType(CursorType.TailableAwait)
+                    .iterator()) {
+                answer = iterator;
+            }
+        }
+        return answer;
+    }
+
+    @Override
+    protected void regeneratingCursor() {
+        if (log.isDebugEnabled()) {
+            log.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal, cursorRegenerationDelay);
+        }
+    }
+
+    /**
+     * The heart of the tailing process.
+     */
+    @Override
+    protected void doRun() {
+        // while the cursor has more values, keepRunning is true and the
+        // cursorId is not 0, which symbolizes that the cursor is dead
+        try {
+            while (cursor.hasNext() && keepRunning) {
+                Document dbObj = (Document) cursor.next();
+                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                try {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception e) {
+                    // do nothing
+                }
+                tailTracking.setLastVal(dbObj);
+            }
+        } catch (MongoCursorNotFoundException e) {
+            // we only log the warning if we are not stopping, otherwise it is
+            // expected because the stop() method kills the cursor just in case
+            // it is blocked
+            // waiting for more data to arrive
+            if (keepRunning) {
+                log.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
+            }
+        } catch (IllegalStateException e) {
+            // cursor.hasNext() opens socket and waiting for data
+            // it throws exception when cursor is closed in another thread
+            // there is no way to stop hasNext() before closing cursor
+            if (keepRunning) {
+                throw e;
+            } else {
+                log.debug("Cursor closed exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
+            }
+        }
+
+        // the loop finished, persist the lastValue just in case we are shutting down
+        // TODO: perhaps add a functionality to persist every N records
+        tailTracking.persistToStore();
+    }
+}
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
index a0f9f58..7f0af85 100644
--- a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
@@ -28,6 +28,9 @@ import de.flapdoodle.embed.mongo.MongodStarter;
 import de.flapdoodle.embed.mongo.config.IMongodConfig;
 import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
 import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+
+import org.bson.Document;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -44,9 +47,19 @@ public class EmbedMongoConfiguration {
 
     static {
         try {
-            IMongodConfig mongodConfig = new MongodConfigBuilder().version(PRODUCTION).net(new Net(PORT, localhostIsIPv6())).build();
+            IMongodConfig mongodConfig = new MongodConfigBuilder()
+                    .version(PRODUCTION)
+                    .net(new Net(PORT, localhostIsIPv6()))
+                    .replication(new Storage(null, "replicationName", 5000))
+                    .build();
+
             MongodExecutable mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
             mongodExecutable.start();
+
+            // init replica set
+            MongoClient client = new MongoClient("localhost", PORT);
+            client.getDatabase("admin").runCommand(new Document("replSetInitiate", new Document()));
+
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumerTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumerTest.java
new file mode 100644
index 0000000..647c641
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.bson.Document;
+import org.junit.Test;
+
+public class MongoDbChangeStreamsConsumerTest extends AbstractMongoDbTest {
+
+    private MongoCollection<Document> mongoCollection;
+    private String collectionName;
+
+    @Override
+    public void doPostSetup() {
+        super.doPostSetup();
+
+        collectionName = "camelTest";
+        mongoCollection = db.getCollection(collectionName, Document.class);
+        mongoCollection.drop();
+
+        CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
+        db.createCollection(collectionName, collectionOptions);
+        mongoCollection = db.getCollection(collectionName, Document.class);
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(10);
+
+        String consumerRouteId = "simpleConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        Thread t = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                mongoCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+            }
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    @Test
+    public void filterTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(1);
+
+        String consumerRouteId = "filterConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        Thread t = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                mongoCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+            }
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+
+        Document actualDocument = mock.getExchanges().get(0).getIn().getBody(Document.class);
+        assertEquals("value2", actualDocument.get("string"));
+
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+
+    protected void addTestRoutes() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("mongodb3:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}")
+                        .id("simpleConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
+
+                from("mongodb3:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}")
+                        .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.string': 'value2'}]}}")
+                        .id("filterConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
+            }
+        });
+    }
+}
\ No newline at end of file


Mime
View raw message