flink-commits mailing list archives

From rmetz...@apache.org
Subject flink git commit: [FLINK-4033] Polish up Kinesis connector documentation
Date Wed, 29 Jun 2016 11:41:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master fa42cdabf -> 256c9c4da


[FLINK-4033] Polish up Kinesis connector documentation

Includes:
1. Add Scala examples for consumer and producer
2. Add information about AWS Kinesis service usage
3. Add Kinesis connector to the fault tolerance guarantees table
4. Minor typo fix in Kafka documentation

This closes #2181


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/256c9c4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/256c9c4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/256c9c4d

Branch: refs/heads/master
Commit: 256c9c4daf11349128eaa2e71a434f609e57053c
Parents: fa42cda
Author: Gordon Tai <tzulitai@gmail.com>
Authored: Wed Jun 29 18:29:08 2016 +0800
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Jun 29 13:41:35 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md   |  2 +-
 docs/apis/streaming/connectors/kinesis.md | 63 ++++++++++++++++++++++----
 docs/apis/streaming/fault_tolerance.md    | 16 +++++--
 3 files changed, 68 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
index 9bd70be..e7cd05b 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -236,7 +236,7 @@ properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
 
-val myConsumer = new FlinkKafkaConsumer08[Stirng]("topic", new SimpleStringSchema(), properties);
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
 myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
 stream = env
     .addSource(myConsumer)

http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
index 66c078a..db3a9c4 100644
--- a/docs/apis/streaming/connectors/kinesis.md
+++ b/docs/apis/streaming/connectors/kinesis.md
@@ -52,12 +52,16 @@ mvn clean install -Pinclude-kinesis -DskipTests
 
 
 Note that the streaming connectors are not part of the binary distribution. 
-See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-#### Usage of Consumer
+### Using the Amazon Kinesis Streams Service
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
+to set up Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
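
For illustration only (not part of this commit), a minimal IAM policy granting one user the Kinesis actions the consumer and producer below rely on might look as follows; the account ID and stream ARN are placeholders:

{% highlight json %}
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis_stream_name"
    }
  ]
}
{% endhighlight %}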
+
+### Kinesis Consumer
 
 The `FlinkKinesisConsumer` can be used to pull data from multiple Kinesis streams within the same AWS region in parallel.
-It participates in Flink's distributed snapshot checkpointing and provides exactly-once processing guarantees. Note
+It participates in Flink's distributed snapshot checkpointing and provides exactly-once user-defined state update guarantees. Note
 that the current version cannot handle resharding of Kinesis streams. When Kinesis streams are resharded, the consumer will fail and the Flink streaming job must be resubmitted.
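
Because the exactly-once guarantee only takes effect when checkpointing is enabled, a job using the consumer would typically turn it on first. A minimal sketch (the 5 second interval is an arbitrary choice):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Draw a distributed snapshot every 5 seconds; the consumer's per-shard read
// positions are stored in each snapshot and restored on failure.
env.enableCheckpointing(5000);
{% endhighlight %}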
 
@@ -78,10 +82,28 @@ kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
     "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig))
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisConsumerConfig = new Properties();
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
@@ -92,13 +114,15 @@ the AWS access key ID and secret key are directly supplied in the configuration
 from the newest position in the Kinesis stream (the other option will be setting `KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
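
For example, switching the consumer to the earliest reachable records only changes the init position key (a sketch reusing the configuration object from above):

{% highlight java %}
// Replaces the "LATEST" setting shown earlier; all other keys stay the same.
kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON");
{% endhighlight %}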
 
-#### Usage of Producer
+Other optional configuration keys can be found in `KinesisConfigConstants`.
+
+### Kinesis Producer
 
-The `FlinkKinesisProducer` is used for putting data from a Flink stream onto a Kinesis stream. Note that the producer is not participating in
+The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again
 to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
 
-To put data onto a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
 For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
 
@@ -113,16 +137,37 @@ kinesisProducerConfig.put(
 kinesisProducerConfig.put(
     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
     "aws_secret_key_here");
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
 
+FlinkKinesisProducer<String> kinesis =
+    new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
 kinesis.setFailOnError(true);
-kinesis.setDefaultStream("test-flink");
+kinesis.setDefaultStream("kinesis_stream_name");
 kinesis.setDefaultPartition("0");
 
 DataStream<String> simpleStringStream = ...;
 simpleStringStream.addSink(kinesis);
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisProducerConfig = new Properties();
+kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+
+val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, kinesisProducerConfig);
+kinesis.setFailOnError(true);
+kinesis.setDefaultStream("kinesis_stream_name");
+kinesis.setDefaultPartition("0");
+
+val simpleStringStream = ...;
+simpleStringStream.addSink(kinesis);
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
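
Since the producer serializes records with the schema it is constructed with, types other than `String` only require a matching `SerializationSchema`. A hypothetical sketch (the `Event` type and its encoding are illustrative and not part of this commit):

{% highlight java %}
import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

// Hypothetical record type; any type works as long as the schema can encode it.
class Event {
    public String id;
    public long timestamp;
}

// Encodes each Event as a UTF-8 "id,timestamp" string before it is put onto Kinesis.
// Usage: new FlinkKinesisProducer<>(new EventSchema(), kinesisProducerConfig)
public class EventSchema implements SerializationSchema<Event> {
    @Override
    public byte[] serialize(Event element) {
        return (element.id + "," + element.timestamp).getBytes(StandardCharsets.UTF_8);
    }
}
{% endhighlight %}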

http://git-wip-us.apache.org/repos/asf/flink/blob/256c9c4d/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index 8426f11..3edc65d 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -103,7 +103,7 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 ### Fault Tolerance Guarantees of Data Sources and Sinks
 
 Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
-snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+snapshotting mechanism. This is currently guaranteed for the Kafka source and AWS Kinesis Streams source (and internal number generators), but
 not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
 
 <table class="table table-bordered">
@@ -121,6 +121,11 @@ not for other sources. The following table lists the state update guarantees of
             <td>Use the appropriate Kafka connector for your version</td>
         </tr>
         <tr>
+            <td>AWS Kinesis Streams</td>
+            <td>exactly once</td>
+            <td>Current version does not handle stream resharding</td>
+        </tr>
+        <tr>
             <td>RabbitMQ</td>
             <td>at most once (v 0.10) / exactly once (v 1.0) </td>
             <td></td>
@@ -178,8 +183,13 @@ state updates) of Flink coupled with bundled sinks:
     </tr>
     <tr>
         <td>Cassandra sink</td>
-        <td>at-least-once / exactly-once</td>
-        <td>exactly-once only for idempotent updates</td>
+        <td>at least once / exactly once</td>
+        <td>exactly once only for idempotent updates</td>
+    </tr>
+    <tr>
+        <td>AWS Kinesis Streams</td>
+        <td>at least once</td>
+        <td></td>
     </tr>
     <tr>
         <td>File sinks</td>

