beam-commits mailing list archives

From rangadi <>
Subject [GitHub] beam pull request #3612: Kafka exactly-once sink.
Date Fri, 21 Jul 2017 06:46:44 GMT
GitHub user rangadi opened a pull request:

    Kafka exactly-once sink.

    Implementation of an exactly-once sink for Kafka, making use of the transactions added in
    Kafka 0.11. This requires exactly-once semantics from the runner, similar to Dataflow.
    This is not ready for merge. I will circulate it more widely once it goes through initial
    feedback. There are a few minor TODOs in the implementation, and of course more sink tests
    need to be added.
    This uses the consumer group id for the topic to store metadata. It seems to work well, but
    it is not very convenient for a user to manage. E.g. if the user wants to restart a job from
    scratch using the same group_id, the existing metadata prevents it from starting. The user
    has to use a new group (which is perfectly fine) or clear the metadata programmatically.
    This sink could help users manage this better (e.g. an option to discard metadata). We will
    see.
    How this works, from a comment in the code:
        // Dataflow ensures at-least once processing for side effects like sinks. In order to
        // provide exactly-once semantics, a sink needs to be idempotent or it should avoid
        // writing records that have already been written. This sink does the latter. All the
        // records are ordered across a fixed number of shards and records in each shard are
        // written in order. It drops any records that are already written and buffers those
        // arriving out of order.
        //
        // Exactly once sink involves two shuffles of the records:
        //            A -- GBK --> B -- GBK --> C
        //
        // Processing guarantees also require deterministic processing within user transforms.
        // In this case that implies the order of the records seen by C should not be affected by
        // restarts in upstream stages like B & A.
        //
        // A : Assigns a random shard for each message. Note that there are no ordering
        //     guarantees for writing user records to Kafka. User can still control partitioning
        //     among topic partitions as with regular sink (of course, there are no ordering
        //     guarantees in regular Kafka sink either).
        // B : Assigns an id sequentially for each message within a shard.
        // C : Writes each shard to Kafka in sequential id order. In Dataflow, when C sees a record
        //     and id, it implies that record and the associated id are checkpointed to persistent
        //     storage and this record will always have same id, even in retries.
        //     Exactly-once semantics are achieved by writing records in the strict order of
        //     these checkpointed sequence ids.
        //
        // Parallelism for B and C is fixed to 'numShards', which defaults to number of partitions
        // for the topic. A few reasons for that:
        //  - B & C implement their functionality using per-key state. Shard id makes it
        //    independent of cardinality of user key.
        //  - We create one producer per shard, and its 'transactional id' is based on shard id.
        //    This requires the number of shards to be finite. This also helps with batching and
        //    avoids initializing producers and transactions.
        //  - Most importantly, each of the sharded writers stores 'next message id' in partition
        //    metadata, which is committed atomically with Kafka transactions. This is critical
        //    to handle retries of C correctly. Initial testing showed number of shards could be
        //    larger than number of partitions for the topic.
        //
        // Number of shards can change across multiple runs of a pipeline (job upgrade in
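
To make stage C in the comment above more concrete, here is a minimal in-memory sketch of the drop-and-buffer logic for a single shard. It is not the code in this pull request: `ShardWriterSketch`, `process`, and the `written` list are hypothetical names, and the list only stands in for the Kafka topic. In the actual sink the next id would be stored in partition metadata and committed in the same Kafka transaction as the records; here both are plain fields updated together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory model of stage C for one shard; an illustration only. */
final class ShardWriterSketch {
    // Next sequence id this shard expects. Assumption: in the real sink this value
    // lives in Kafka partition metadata and is committed with the records.
    private long nextId = 0;
    // Records that arrived ahead of their turn, keyed by sequence id.
    private final TreeMap<Long, String> buffer = new TreeMap<>();
    // Stand-in for the Kafka topic: records in the order they were "written".
    private final List<String> written = new ArrayList<>();

    /** Handles one (id, record) pair; a retry may deliver the same pair again. */
    void process(long id, String record) {
        if (id < nextId) {
            return; // already written by an earlier attempt, so drop it
        }
        buffer.put(id, record); // out-of-order or next-in-line: hold it in the buffer
        // Write every buffered record that is now contiguous with nextId.
        while (!buffer.isEmpty() && buffer.firstKey() == nextId) {
            written.add(buffer.pollFirstEntry().getValue());
            nextId++;
        }
    }

    List<String> written() {
        return written;
    }

    long nextId() {
        return nextId;
    }

    public static void main(String[] args) {
        ShardWriterSketch writer = new ShardWriterSketch();
        writer.process(1, "b"); // arrives early, buffered
        writer.process(0, "a"); // unblocks both records
        writer.process(0, "a"); // retry of an already written record, dropped
        System.out.println(writer.written()); // prints [a, b]
    }
}
```

Because the id assigned in stage B is stable across retries, replaying any prefix of the input leaves `written` unchanged, which is the property the sink relies on.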

You can merge this pull request into a Git repository by running:

    $ git pull eo_sink

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3612
commit 76bab2d8ec83ccc2826c929a50fb13d62bb4685b
Author: Raghu Angadi <>
Date:   2017-07-21T06:26:49Z

    Kafka exactly-once sink.
    Tested manually with direct runner and on dataflow.

