From: tzulitai@apache.org
To: commits@flink.apache.org
Message-Id: <025ec5247a0949829ba2fc1dc54c3c83@git.apache.org>
Subject: flink git commit: [FLINK-5512] [doc] Improve RabbitMQ documentation
Date: Thu, 19 Jan 2017 09:37:06 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master 586f81813 -> bf72a7c02

[FLINK-5512] [doc] Improve RabbitMQ documentation

This closes #3136.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf72a7c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf72a7c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf72a7c0

Branch: refs/heads/master
Commit: bf72a7c028cdb189c8646720ae6f7bbd1d301749
Parents: 586f818
Author: Tzu-Li (Gordon) Tai
Authored: Tue Jan 17 11:38:50 2017 +0100
Committer: Tzu-Li (Gordon) Tai
Committed: Thu Jan 19 10:36:22 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/rabbitmq.md | 148 +++++++++++++++++++++--------------
 1 file changed, 89 insertions(+), 59 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bf72a7c0/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index 1b621c0..7f117c6 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -40,88 +40,118 @@ Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.co
 
 #### RabbitMQ Source
 
-A class which provides an interface for receiving data from RabbitMQ.
-
-The followings have to be provided for the `RMQSource(…)` constructor in order:
-
-- RMQConnectionConfig.
-- queueName: The RabbitMQ queue name.
-- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
-- deserializationSchema: Deserialization schema to turn messages into Java objects.
-
-This source can be operated in three different modes:
-
-1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
-    unique correlation IDs.
-2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
-    (correlation id is not set).
-3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
-
-Correlation ids are a RabbitMQ application feature. You have to set it in the message properties
-when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
-unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
-messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
-have to supply correlation ids.
-
-Example:
+This connector provides a `RMQSource` class to consume messages from a RabbitMQ
+queue. This source provides three different levels of guarantees, depending
+on how it is configured with Flink:
+
+1. **Exactly-once**: In order to achieve exactly-once guarantees with the
+RabbitMQ source, the following is required -
+ - *Enable checkpointing*: With checkpointing enabled, messages are only
+ acknowledged (hence, removed from the RabbitMQ queue) when checkpoints
+ are completed.
+ - *Use correlation ids*: Correlation ids are a RabbitMQ application feature.
+ You have to set it in the message properties when injecting messages into RabbitMQ.
+ The correlation id is used by the source to deduplicate any messages that
+ have been reproccessed when restoring from a checkpoint.
+ - *Non-parallel source*: The source must be non-parallel (parallelism set
+ to 1) in order to achieve exactly-once. This limitation is mainly due to
+ RabbitMQ's approach to dispatching messages from a single queue to multiple
+ consumers.
+
+
+2. **At-least-once**: When checkpointing is enabled, but correlation ids
+are not used or the source is parallel, the source only provides at-least-once
+guarantees.
+
+3. **No guarantee**: If checkpointing isn't enabled, the source does not
+have any strong delivery guarantees. Under this setting, instead of
+collaborating with Flink's checkpointing, messages will be automatically
+acknowledged once the source receives and processes them.
+
+Below is a code example for setting up an exactly-once RabbitMQ source.
+Inline comments explain which parts of the configuration can be ignored
+for more relaxed guarantees.
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-DataStream<String> streamWithoutCorrelationIds = env
-    .addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
-    .print
-
-DataStream<String> streamWithCorrelationIds = env
-    .addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
-    .print
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...);
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+
+final DataStream<String> stream = env
+    .addSource(new RMQSource<String>(
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
+    .setParallelism(1);              // non-parallel source is only required for exactly-once
 {% endhighlight %}
 {% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...)
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
-    .print
-
-streamWithCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
-    .print
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+
+val stream = env
+    .addSource(new RMQSource[String](
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
+    .setParallelism(1)               // non-parallel source is only required for exactly-once
 {% endhighlight %}
 #### RabbitMQ Sink
-A class providing an interface for sending data to RabbitMQ.
-
-The followings have to be provided for the `RMQSink(…)` constructor in order:
-
-1. RMQConnectionConfig
-2. The queue name
-3. Serialization schema
-
-Example:
+This connector provides a `RMQSink` class for sending messages to a RabbitMQ
+queue. Below is a code example for setting up a RabbitMQ sink.
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
+final DataStream<String> stream = ...
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+
+stream.addSink(new RMQSink<String>(
+    connectionConfig,            // config for the RabbitMQ connection
+    "queueName",                 // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
 {% endhighlight %}
 {% highlight scala %}
+val stream: DataStream[String] = ...
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+
+stream.addSink(new RMQSink[String](
+    connectionConfig,            // config for the RabbitMQ connection
+    "queueName",                 // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema))     // serialization schema to turn Java objects to messages
 {% endhighlight %}
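
Note on the correlation-id requirement described in the diff: the documentation says the source uses correlation ids to deduplicate messages reprocessed after a restore, throws if the id is null, and ignores non-unique ids. The following is only a minimal, self-contained sketch of that idea in plain Java. It is not the `RMQSource` implementation; the class `CorrelationIdDedup`, its `Message` type, and the `process` method are hypothetical names introduced for illustration, and the in-memory set is an assumption (a real source would presumably have to keep such state across checkpoints).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: NOT the actual RMQSource code.
public class CorrelationIdDedup {

    /** Hypothetical stand-in for a message delivered from a queue. */
    static final class Message {
        final String correlationId;
        final String body;

        Message(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Correlation ids of messages that were already emitted downstream.
    private final Set<String> seenIds = new HashSet<>();

    /** Returns the bodies of messages whose correlation id was not seen before. */
    List<String> process(List<Message> delivered) {
        List<String> emitted = new ArrayList<>();
        for (Message m : delivered) {
            if (m.correlationId == null) {
                // Mirrors the documented behaviour: a null id is an error.
                throw new IllegalArgumentException("correlation id must be set");
            }
            // Set.add returns false for a duplicate id, so the message is skipped.
            if (seenIds.add(m.correlationId)) {
                emitted.add(m.body);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        CorrelationIdDedup source = new CorrelationIdDedup();
        List<String> first = source.process(Arrays.asList(
                new Message("id-1", "a"), new Message("id-2", "b")));
        // Simulate a redelivery of "id-2" after a restore, plus a new message.
        List<String> second = source.process(Arrays.asList(
                new Message("id-2", "b"), new Message("id-3", "c")));
        System.out.println(first);   // prints [a, b]
        System.out.println(second);  // prints [c]
    }
}
```

The sketch also hints at why producers must supply unique ids: two distinct messages sharing an id would be indistinguishable from a redelivery, and the second one would be dropped.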