flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Streaming data from MongoDB using Flink
Date Thu, 16 Feb 2017 10:20:05 GMT
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend `RichSourceFunction`, which additionally lets you override
the `open()` life-cycle method.
In that method, instantiate your MongoDB client connection and fetch the cursor. In
the `run()` method, you should essentially have a while loop that polls the MongoDB cursor
and emits the fetched documents using the `SourceContext`.
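A minimal sketch of that structure, written in plain Java so it compiles without Flink or the MongoDB driver on the classpath. `SourceContext`, `DocumentCursor` and `CursorFactory` are hypothetical stand-ins: `SourceContext` mirrors the two methods of Flink's `SourceFunction.SourceContext` used here (`collect` and `getCheckpointLock`), and `DocumentCursor.tryNext()` is modeled on the MongoDB Java driver's `MongoCursor.tryNext()`, which returns `null` when no document is available yet. In an actual job the class would extend `RichSourceFunction<Document>`, and `open()` would be `open(Configuration parameters)`.

```java
// Hypothetical stand-in for Flink's SourceFunction.SourceContext, reduced to the
// two methods used here, so the sketch compiles without Flink on the classpath.
interface SourceContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

// Hypothetical stand-in for a MongoDB cursor, modeled on MongoCursor.tryNext():
// returns the next document, or null if none is available right now.
interface DocumentCursor<T> extends AutoCloseable {
    T tryNext();

    @Override
    void close(); // narrowed: no checked exception
}

// Hypothetical factory standing in for "connect the client and fetch the cursor".
interface CursorFactory<T> {
    DocumentCursor<T> open();
}

// Follows the RichSourceFunction life cycle: open() -> run() ... cancel() -> close().
class PollingCursorSource<T> {
    private final CursorFactory<T> factory;
    private final long idleSleepMillis;
    // In a real Flink function this must be transient: the function object is
    // serialized and shipped to the workers, so connections are created in open().
    private transient DocumentCursor<T> cursor;
    // volatile because cancel() is called from a different thread than run().
    private volatile boolean running = true;

    PollingCursorSource(CursorFactory<T> factory, long idleSleepMillis) {
        this.factory = factory;
        this.idleSleepMillis = idleSleepMillis;
    }

    // Corresponds to open(Configuration): create the client and fetch the cursor.
    void open() {
        cursor = factory.open();
    }

    // Called once; keeps polling until cancel() clears the flag.
    void run(SourceContext<T> ctx) throws InterruptedException {
        while (running) {
            T doc = cursor.tryNext();
            if (doc == null) {
                Thread.sleep(idleSleepMillis); // nothing new yet, back off briefly
                continue;
            }
            // Emit under the checkpoint lock so emission never interleaves with a checkpoint.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(doc);
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Corresponds to close(): release the cursor (and, in a real job, the client).
    void close() {
        if (cursor != null) {
            cursor.close();
        }
    }
}
```

Whether `run()` ever sees documents inserted after the source starts depends on the cursor itself: a regular MongoDB cursor is closed once its results are exhausted, whereas a tailable cursor on a capped collection (`CursorType.TailableAwait` in the Java driver) keeps waiting for new inserts.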

If you're also looking to implement a MongoDB source that works with Flink’s checkpointing
for exactly-once guarantees, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
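A minimal sketch of the stateful-source pattern from that page, again in plain Java with hypothetical stand-ins: `EmitContext` plays the role of Flink's `SourceContext`, and `ReplayableInput` stands for whatever lets you re-read MongoDB from a stored position. The method names `snapshotState(long, long)` and `restoreState(List)` follow Flink 1.2's `ListCheckpointed<Long>` interface. The key point is that emitting a record and advancing the position happen inside the same `synchronized (checkpointLock)` block, which is how the state update and the output are kept atomic with respect to checkpoints.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Flink's SourceFunction.SourceContext.
interface EmitContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

// Hypothetical replayable input: returns the record at a position, or null if it
// has not arrived yet. Re-reading a position must return the same record.
interface ReplayableInput<T> {
    T read(long position);
}

// Shape follows the Flink 1.2 stateful-source pattern with ListCheckpointed<Long>.
class CheckpointedSource<T> {
    private static final long IDLE_SLEEP_MILLIS = 1L;

    private final ReplayableInput<T> input;
    private volatile boolean running = true;
    // Next position to read; the only state that is checkpointed.
    private long position = 0L;

    CheckpointedSource(ReplayableInput<T> input) {
        this.input = input;
    }

    void run(EmitContext<T> ctx) throws InterruptedException {
        final Object lock = ctx.getCheckpointLock();
        while (running) {
            T record = input.read(position);
            if (record == null) {
                Thread.sleep(IDLE_SLEEP_MILLIS); // nothing new yet
                continue;
            }
            // Emit and advance together, so a checkpoint sees both or neither.
            synchronized (lock) {
                ctx.collect(record);
                position += 1;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Mirrors ListCheckpointed.snapshotState: store the current position.
    List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(position);
    }

    // Mirrors ListCheckpointed.restoreState: resume from the checkpointed position.
    void restoreState(List<Long> state) {
        for (Long p : state) {
            position = p;
        }
    }
}
```

Exactly-once only holds if re-reading from the restored position yields the same records in the same order. For MongoDB, one option (an assumption about your data, not something the database guarantees) is to query on a field whose values only ever increase.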

Cheers,
Gordon
On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonteiro@gmail.com) wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable in Java, env, which is my StreamExecutionEnvironment. When I
go ahead and attempt to do:
env.addSource();
it requests an implementation of the SourceFunction interface:
env.addSource(new SourceFunction<Document>() {
    @Override
    public void run(SourceFunction.SourceContext<Document> ctx) throws Exception {
        // TODO
    }

    @Override
    public void cancel() {
        // TODO
    }
});
And this is where I'm stuck. I do not understand how I should access my MongoDB cursor in
either of these methods (I suppose the most appropriate would be the "run" method) in a way
that would allow me to return each new MongoDB document as it arrives in the database from
another source.

Once again, thank you so much for your help.

I will wait to hear from you!

Regards,

Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Pedro!

This is definitely possible: simply write a Flink `SourceFunction` that uses a MongoDB
client to fetch the data.
It should be straightforward and fits well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonteiro@gmail.com) wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into writing a Flink SourceFunction to pass to the addSource
method of the StreamExecutionEnvironment, but had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro

