flink-commits mailing list archives

From tzuli...@apache.org
Subject flink git commit: [FLINK-5512] [doc] Improve RabbitMQ documentation
Date Thu, 19 Jan 2017 09:37:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master 586f81813 -> bf72a7c02


[FLINK-5512] [doc] Improve RabbitMQ documentation

This closes #3136.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf72a7c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf72a7c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf72a7c0

Branch: refs/heads/master
Commit: bf72a7c028cdb189c8646720ae6f7bbd1d301749
Parents: 586f818
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Tue Jan 17 11:38:50 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Thu Jan 19 10:36:22 2017 +0100

----------------------------------------------------------------------
 docs/dev/connectors/rabbitmq.md | 148 +++++++++++++++++++++--------------
 1 file changed, 89 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf72a7c0/docs/dev/connectors/rabbitmq.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md
index 1b621c0..7f117c6 100644
--- a/docs/dev/connectors/rabbitmq.md
+++ b/docs/dev/connectors/rabbitmq.md
@@ -40,88 +40,118 @@ Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.co
 
 #### RabbitMQ Source
 
-A class which provides an interface for receiving data from RabbitMQ.
-
-The followings have to be provided for the `RMQSource(…)` constructor in order:
-
-- RMQConnectionConfig.
-- queueName: The RabbitMQ queue name.
-- usesCorrelationId: `true` when correlation ids should be used, `false` otherwise (default is `false`).
-- deserializationSchema: Deserialization schema to turn messages into Java objects.
-
-This source can be operated in three different modes:
-
-1. Exactly-once (when checkpointed) with RabbitMQ transactions and messages with
-    unique correlation IDs.
-2. At-least-once (when checkpointed) with RabbitMQ transactions but no deduplication mechanism
-    (correlation id is not set).
-3. No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
-
-Correlation ids are a RabbitMQ application feature. You have to set it in the message properties
-when injecting messages into RabbitMQ. If you set `usesCorrelationId` to true and do not supply
-unique correlation ids, the source will throw an exception (if the correlation id is null) or ignore
-messages with non-unique correlation ids. If you set `usesCorrelationId` to false, then you don't
-have to supply correlation ids.
-
-Example:
+This connector provides an `RMQSource` class to consume messages from a RabbitMQ
+queue. This source provides three different levels of guarantees, depending
+on how it is configured with Flink:
+
+1. **Exactly-once**: In order to achieve exactly-once guarantees with the
+RabbitMQ source, the following are required:
+ - *Enable checkpointing*: With checkpointing enabled, messages are only
+ acknowledged (hence, removed from the RabbitMQ queue) when checkpoints
+ are completed.
+ - *Use correlation ids*: Correlation ids are a RabbitMQ application feature.
+ You have to set them in the message properties when injecting messages into RabbitMQ.
+ The correlation id is used by the source to deduplicate any messages that
+ have been reprocessed when restoring from a checkpoint.
+ - *Non-parallel source*: The source must be non-parallel (parallelism set
+ to 1) in order to achieve exactly-once. This limitation is mainly due to
+ RabbitMQ's approach to dispatching messages from a single queue to multiple
+ consumers.
+
+
+2. **At-least-once**: When checkpointing is enabled, but correlation ids
+are not used or the source is parallel, the source only provides at-least-once
+guarantees.
+
+3. **No guarantee**: If checkpointing isn't enabled, the source does not
+have any strong delivery guarantees. Under this setting, instead of
+collaborating with Flink's checkpointing, messages will be automatically
+acknowledged once the source receives and processes them.
+
+Below is a code example for setting up an exactly-once RabbitMQ source.
+Inline comments explain which parts of the configuration can be ignored
+for more relaxed guarantees.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-DataStream<String> streamWithoutCorrelationIds = env
-	.addSource(new RMQSource<String>(connectionConfig, "hello", new SimpleStringSchema()))
-	.print
-
-DataStream<String> streamWithCorrelationIds = env
-	.addSource(new RMQSource<String>(connectionConfig, "hello", true, new SimpleStringSchema()))
-	.print
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...);
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+    
+final DataStream<String> stream = env
+    .addSource(new RMQSource<String>(
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
+    .setParallelism(1);              // non-parallel source is only required for exactly-once
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+// checkpointing is required for exactly-once or at-least-once guarantees
+env.enableCheckpointing(...)
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-streamWithoutCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", new SimpleStringSchema))
-    .print
-
-streamWithCorrelationIds = env
-    .addSource(new RMQSource[String](connectionConfig, "hello", true, new SimpleStringSchema))
-    .print
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+    
+val stream = env
+    .addSource(new RMQSource[String](
+        connectionConfig,            // config for the RabbitMQ connection
+        "queueName",                 // name of the RabbitMQ queue to consume
+        true,                        // use correlation ids; can be false if only at-least-once is required
+        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
+    .setParallelism(1)               // non-parallel source is only required for exactly-once
 {% endhighlight %}
 </div>
 </div>
 
 #### RabbitMQ Sink
-A class providing an interface for sending data to RabbitMQ.
-
-The followings have to be provided for the `RMQSink(…)` constructor in order:
-
-1. RMQConnectionConfig
-2. The queue name
-3. Serialization schema
-
-Example:
+This connector provides an `RMQSink` class for sending messages to a RabbitMQ
+queue. Below is a code example for setting up a RabbitMQ sink.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build();
-stream.addSink(new RMQSink<String>(connectionConfig, "hello", new SimpleStringSchema()));
+final DataStream<String> stream = ...
+
+final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build();
+    
+stream.addSink(new RMQSink<String>(
+    connectionConfig,            // config for the RabbitMQ connection
+    "queueName",                 // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
+val stream: DataStream[String] = ...
+
 val connectionConfig = new RMQConnectionConfig.Builder()
-.setHost("localhost").setPort(5000).setUserName(..)
-.setPassword(..).setVirtualHost("/").build()
-stream.addSink(new RMQSink[String](connectionConfig, "hello", new SimpleStringSchema))
+    .setHost("localhost")
+    .setPort(5000)
+    ...
+    .build
+    
+stream.addSink(new RMQSink[String](
+    connectionConfig,         // config for the RabbitMQ connection
+    "queueName",              // name of the RabbitMQ queue to send messages to
+    new SimpleStringSchema))  // serialization schema to turn Java objects to messages
 {% endhighlight %}
 </div>
 </div>
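
With checkpointing enabled, what separates the exactly-once mode from the at-least-once mode of a non-parallel source in the documentation above is whether redelivered messages are deduplicated by correlation id. The following is a toy, self-contained simulation of that interplay, assuming a single non-parallel consumer. It is not Flink's `RMQSource` implementation and does not use the RabbitMQ client; `ToyQueue`, `consume`, and `run` are hypothetical names, and acknowledging by correlation id is a simplification (RabbitMQ itself acknowledges by delivery tag).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation of checkpoint-driven acknowledgement plus correlation-id
// deduplication. Not Flink or RabbitMQ code; all names are hypothetical.
public class RmqExactlyOnceSketch {

    // A queued message: a correlation id (set by the producer) and a body.
    static final class Message {
        final String correlationId;
        final String body;

        Message(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Toy broker queue: delivered messages stay "unacked" until ack() is called,
    // and recover() puts every unacked message back at the head of the queue.
    static final class ToyQueue {
        private final Deque<Message> ready = new ArrayDeque<>();
        private final List<Message> unacked = new ArrayList<>();

        void publish(Message m) { ready.addLast(m); }

        Message deliver() {
            Message m = ready.pollFirst();
            if (m != null) {
                unacked.add(m);
            }
            return m;
        }

        // Simplification: acknowledge by correlation id (RabbitMQ acks by delivery tag).
        void ack(Set<String> ids) { unacked.removeIf(m -> ids.contains(m.correlationId)); }

        void recover() {
            // Re-queue in original order: iterate backwards and push to the front.
            for (int i = unacked.size() - 1; i >= 0; i--) {
                ready.addFirst(unacked.get(i));
            }
            unacked.clear();
        }
    }

    // Pulls up to max messages; with dedup on, drops a message whose id is still pending.
    static void consume(ToyQueue queue, List<String> downstream, Set<String> pendingIds,
                        boolean useCorrelationIds, int max) {
        for (int i = 0; i < max; i++) {
            Message m = queue.deliver();
            if (m == null) {
                return;
            }
            if (useCorrelationIds && pendingIds.contains(m.correlationId)) {
                continue; // already reflected in the restored checkpoint: skip the redelivery
            }
            pendingIds.add(m.correlationId);
            downstream.add(m.body);
        }
    }

    // Scenario: checkpoint after "a" and "b", process "c", then fail before the
    // checkpoint-complete acknowledgement reaches the broker, then restore and finish.
    static List<String> run(boolean useCorrelationIds) {
        ToyQueue queue = new ToyQueue();
        for (String body : new String[] {"a", "b", "c", "d"}) {
            queue.publish(new Message("id-" + body, body));
        }
        List<String> downstream = new ArrayList<>(); // stands in for downstream operator state
        Set<String> pendingIds = new HashSet<>();    // ids emitted but not yet acknowledged

        consume(queue, downstream, pendingIds, useCorrelationIds, 2);
        List<String> snapshotDownstream = new ArrayList<>(downstream); // checkpoint completes
        Set<String> snapshotIds = new HashSet<>(pendingIds);
        consume(queue, downstream, pendingIds, useCorrelationIds, 1);

        queue.recover();                                  // failure: broker redelivers a, b, c
        downstream = new ArrayList<>(snapshotDownstream); // restore from the checkpoint
        pendingIds = new HashSet<>(snapshotIds);
        consume(queue, downstream, pendingIds, useCorrelationIds, Integer.MAX_VALUE);

        queue.ack(pendingIds); // next checkpoint completes: acknowledge everything pending
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println("with correlation ids:    " + run(true));  // [a, b, c, d]
        System.out.println("without correlation ids: " + run(false)); // [a, b, a, b, c, d]
    }
}
```

In this scenario, `a` and `b` are covered by a completed checkpoint whose acknowledgement never reaches the queue, so they are redelivered after the restore. With correlation ids the redeliveries are dropped; without them they are emitted a second time, which is the at-least-once behavior. `c` is processed again in both runs, which is correct because the restored state predates it.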

