flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Piotr Nowojski (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
Date Thu, 22 Jun 2017 17:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16059699#comment-16059699

Piotr Nowojski commented on FLINK-6988:

Unfortunately KafkaProducer's API is very limited. Especially it doesn't allow to implement
two phase commit protocol like it is done in BucketingSink, because it doesn't allow for neither
resuming nor committing transactions from different workers after crash (last bullet point
above). This is because every time user calls `Producer::initTransactions()`, all pending
(not committed) transactions are being automatically aborted by Kafka Server. Calling `Producer::initTransactions()`
is neccessary to obtain `producerId` and `epoch` values from the Kafka server, which are crucial
for manipulating transactions.

Fortunately there is a walk around this issue. It seems like Kafka's REST API is more flexible
and we should be possible to resume transactions. Every time we begin transaction we can store
`producerId` and `epoch` on the state. In case we need to commit pending transaction on another
worker (after crash), instead of calling `KafkaProducer::initTransactions()` we can restore
`producerId` and `epoch` from the state and commit this pending transaction using those restored

"Hacky" part is that  `producerId` and `epoch` values are hidden behind private fields in
package private classes. That means we can not overload `KafkaProducer` to obtain or set them.
That leaves as with two options. We either reimplement KafkaProducer using Kafka's REST API
(we could copy/paste most of their code) or we use JVM reflection to manually manipulate official
KafkaProducer class.

> Add Apache Kafka 0.11 connector
> -------------------------------
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
> Kafka 0.11 (it will be released very soon) add supports for transactions. Thanks to that,
Flink might be able to implement Kafka sink supporting "exactly-once" semantic. API changes
and whole transactions support is described in [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic implementation of existing BucketingSink. New FlinkKafkaProducer011
> * upon creation begin transaction, store transaction identifiers into the state and would
write all incoming data to output topic using that transaction
> * on `snapshotState` call, it would flush the data and write in state information that
current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of crash between `snapshotState` and `notifyCheckpointComplete` we either abort
this pending transaction (if not every participant successfully saved the snapshot) or restore
and commit it. 

This message was sent by Atlassian JIRA

View raw message