flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
Date Sun, 10 Jan 2016 10:42:40 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15090974#comment-15090974
] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 10:42 AM:
----------------------------------------------------------------------

Re-clarification of why using the KCL is not viable if we want a deep integration of Kinesis
with Flink's checkpointing & distributed snapshot mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and its source code, I realized that KCL's own design
for parallel consumption of Kinesis data works much like our Flink Kafka connector as a whole:
each instance of a KCL application is a "worker", which consumes records from multiple Kinesis
"shards" (partitions) assigned to it in parallel threads. KCL handles which shards are assigned
to which workers, and can do this dynamically, in the sense that shard-to-worker assignments
can be reassigned at runtime when new worker instances are discovered.

Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis connector as
the instance pool of a single KCL application. The implementation will simply be to instantiate
and run a KCL worker on each consumer task. KCL can handle the shard-to-worker assignment
after each consumer task starts running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method to access the shards assigned to a Worker instance has private access.
2. The logic for where a Worker instance continues reading from a shard after a restore is deep
in the KCL code and cannot be externally configured. It is hardcoded to read from the KCL-managed
checkpoint table (the DynamoDB "lease table").

These two problems eventually led me to the conclusion that it isn't possible for KCL to work
with Flink's checkpoint and restore mechanics. I've been looking through the KCL source code
for hacky ways to bypass the above issues, but have come up short on any solutions.

Therefore, the AWS SDK is the way to go. Although I will have to redo quite a bit of work that
the KCL has already covered, given that we can deliver the only processing engine for Kinesis
currently on the table with built-in exactly-once guarantees, I think it is worth the effort.
The tasks we will have to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually still a wish-list feature
for KCL [1])
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or split.
Task 4: If Flink supports dynamic reconfiguration of task parallelism in the future (perhaps
for higher throughput), we will also need to handle Flink-side parallelism changes.
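Task 1 (and the multi-stream case of Task 2) essentially boils down to a deterministic assignment of (stream, shard) pairs to parallel consumer subtasks. A minimal pure-Java sketch under assumed names (`ShardAssigner` and its method are illustrative, not the actual connector API):

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of deterministic shard-to-subtask assignment (Tasks 1 and 2). */
public class ShardAssigner {

    /**
     * Returns the (stream, shard) pairs one parallel subtask should consume.
     * Every subtask evaluates the same logic over the full shard list, so no
     * coordination is needed and the assignment is stable across restores.
     */
    public static List<String> shardsForSubtask(
            List<String> allStreamShards, int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (String streamShard : allStreamShards) {
            // Modulo partitioning on a stable key, e.g. "myStream:shardId-000000000000"
            int owner = Math.floorMod(streamShard.hashCode(), parallelism);
            if (owner == subtaskIndex) {
                assigned.add(streamShard);
            }
        }
        return assigned;
    }
}
```

Because the key includes the stream name, shards of multiple streams (Task 2) distribute across subtasks with the same logic.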

I propose that the scope of this JIRA focus only on tasks 1 and 2.
Task 3 involves extensive work that isn't necessary for Kinesis to work properly with Flink,
and is out of the scope of this ticket.
Task 4 is blocked on the availability of dynamic task parallelism reconfiguration at runtime.
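The "deep integration" with Flink's checkpointing discussed above amounts to snapshotting, on each checkpoint, the last sequence number read per shard, and resuming after those on restore. A minimal pure-Java sketch of that state (illustrative names, not the actual Flink interfaces):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the per-shard offset state a Flink checkpoint would capture. */
public class ShardOffsetState {

    // shardId -> last sequence number successfully read from that shard
    private final Map<String, String> lastSeqNums = new HashMap<>();

    /** Called for every record: remember how far we got in this shard. */
    public void advance(String shardId, String sequenceNumber) {
        lastSeqNums.put(shardId, sequenceNumber);
    }

    /** Checkpoint: return a copy that the snapshot mechanism would persist. */
    public Map<String, String> snapshot() {
        return new HashMap<>(lastSeqNums);
    }

    /** Restore: resume reading each shard AFTER the checkpointed sequence number. */
    public void restore(Map<String, String> checkpointed) {
        lastSeqNums.clear();
        lastSeqNums.putAll(checkpointed);
    }

    public String resumeFrom(String shardId) {
        return lastSeqNums.get(shardId);
    }
}
```

On failure, records read after the last completed checkpoint are re-read from the stored sequence numbers, which is what makes exactly-once possible without KCL's DynamoDB table.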

Ref:
[1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4



> Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing
mechanics
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-3211
>                 URL: https://issues.apache.org/jira/browse/FLINK-3211
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a cloud-service
version of Apache Kafka. Support for AWS Kinesis will be a great addition to Flink's handful
of streaming connectors to external systems and great outreach to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level AWS SDK [1],
or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can be used to consume
Kinesis data, including reading a stream beginning from a specific offset (or "record sequence
number" in Kinesis terminology). On the other hand, AWS officially recommends using KCL, which
offers a higher level of abstraction that also comes with checkpointing and failure recovery,
using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the partition-to-task
(or "shard" in Kinesis terminology) determination and checkpointing, allowing the user to focus
only on streaming application logic. This leads to the understanding that we cannot use the
KCL to implement the Kinesis streaming connector if we are aiming for a deep integration of
Flink with Kinesis that provides exactly-once guarantees (KCL promises only at-least-once).
Therefore, the AWS SDK will be the way to go for the implementation of this feature.
> With the ability to read from specific offsets, and the fact that Kinesis and Kafka share
a lot of similarities, the basic principles of the implementation of Flink's Kinesis streaming
connector will very much resemble the Kafka connector. We can basically follow the outline
described in [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector implementation
[4]. A few tweaks due to some of the Kinesis vs. Kafka differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I currently don't
think this is a good idea for Kinesis streams (a Kinesis Stream is logically equivalent to
a Kafka topic). Kinesis streams can exist in different AWS regions, and each Kinesis stream
under the same AWS user account may have completely independent access settings with different
authorization keys. Overall, a Kinesis stream feels like a much more consolidated resource
compared to Kafka topics. It would be great to hear more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only partitioning abstraction
for AWS Kinesis is "shards". Therefore, in contrast to the Kafka connector having per broker
connections where the connections can handle multiple Kafka partitions, the Kinesis connector
will only need to have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we want to sync
an outside view of progress the way the Kafka connector does with Kafka / ZK, we could probably
use ZK or DynamoDB the way KCL does. More thoughts on this part would be very helpful too.
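Point 2 above (simple per-shard connections) can be sketched as one consumer loop per shard. `RecordSource` below is a hypothetical stand-in for the low-level per-shard read (in practice the AWS SDK's get-records call), not a real API:

```java
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a low-level per-shard read (e.g. an AWS SDK call). */
interface RecordSource {
    /** Returns the next batch of records for one shard; empty when caught up. */
    List<String> nextBatch(String shardId);
}

/** Sketch of point 2: one simple connection/loop per Kinesis shard. */
class PerShardConsumer implements Runnable {
    private final String shardId;
    private final RecordSource source;
    private final BiConsumer<String, String> emit; // (shardId, record) -> downstream
    private volatile boolean running = true;

    PerShardConsumer(String shardId, RecordSource source, BiConsumer<String, String> emit) {
        this.shardId = shardId;
        this.source = source;
        this.emit = emit;
    }

    public void stop() { running = false; }

    @Override
    public void run() {
        while (running) {
            List<String> batch = source.nextBatch(shardId);
            if (batch.isEmpty()) { running = false; continue; } // sketch: stop when caught up
            for (String record : batch) {
                emit.accept(shardId, record);
            }
        }
    }
}
```

Unlike the Kafka connector's per-broker connections that multiplex several partitions, each loop here talks to exactly one shard, which keeps the connection logic trivial.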
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis Producer Library)
[5]. However, for higher code consistency with the proposed Kinesis consumer, I think it will
be better to stick with the AWS SDK for the implementation. The implementation should be
straightforward, being almost if not completely the same as the Kafka sink.
> References:
> [1] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
