beam-commits mailing list archives

From "Amit Sela (JIRA)" <>
Subject [jira] [Commented] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
Date Sun, 09 Oct 2016 07:18:20 GMT


Amit Sela commented on BEAM-704:

I forgot about the consumer group.
I didn't mean that Beam currently asks for a "start checkpoint" for UnboundedSources; I suggested
that it could: instead of assigning topic/partitions and resolving offsets on the worker, the source
would have all the information (topic-partition-offset) prior to splitting, and the offsets would be
a property of the Source rather than the Reader (if there is a previous CheckpointMark, the Reader
would ignore them). I do understand that this might be problematic if the driver runs in an
environment that can't connect to the Kafka cluster (Spark can run the driver on the cluster as
well).
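
For illustration only, here is a minimal self-contained sketch of that idea (this is not the actual
KafkaIO code; the class and method names are hypothetical): "latest" is resolved once on the driver,
baked into the generated splits, and the Reader falls back to those offsets only when there is no
previous CheckpointMark to restore from.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class OffsetAwareKafkaSourceSketch implements Serializable {

      // Per-partition start position, fixed at split time on the driver.
      static class PartitionOffset implements Serializable {
        final String topic;
        final int partition;
        final long startOffset;

        PartitionOffset(String topic, int partition, long startOffset) {
          this.topic = topic;
          this.partition = partition;
          this.startOffset = startOffset;
        }
      }

      private final List<PartitionOffset> assignedOffsets;

      OffsetAwareKafkaSourceSketch(List<PartitionOffset> assignedOffsets) {
        this.assignedOffsets = assignedOffsets;
      }

      // Runs once on the driver: "latest" is resolved for every partition (one metadata
      // request against the cluster) and baked into the generated splits, so the offsets
      // become a property of the Source rather than the Reader.
      static List<OffsetAwareKafkaSourceSketch> generateInitialSplits(
          Map<String, Map<Integer, Long>> latestOffsetsByTopic, int desiredNumSplits) {
        List<PartitionOffset> all = new ArrayList<>();
        latestOffsetsByTopic.forEach((topic, offsetsByPartition) ->
            offsetsByPartition.forEach((partition, offset) ->
                all.add(new PartitionOffset(topic, partition, offset))));

        // Round-robin the partitions over the requested number of splits.
        List<OffsetAwareKafkaSourceSketch> splits = new ArrayList<>();
        for (int i = 0; i < desiredNumSplits; i++) {
          splits.add(new OffsetAwareKafkaSourceSketch(new ArrayList<>()));
        }
        for (int i = 0; i < all.size(); i++) {
          splits.get(i % desiredNumSplits).assignedOffsets.add(all.get(i));
        }
        return splits;
      }

      // On the worker: the baked-in offsets are only a fallback; offsets restored from a
      // previous CheckpointMark always take precedence.
      List<PartitionOffset> startOffsets(List<PartitionOffset> checkpointedOffsetsOrNull) {
        return checkpointedOffsetsOrNull != null ? checkpointedOffsetsOrNull : assignedOffsets;
      }
    }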

> KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
> -------------------------------------------------------------------------------------------
>                 Key: BEAM-704
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Amit Sela
>            Assignee: Raghu Angadi
> Currently, the KafkaIO (when configured to "latest") will check the latest offset on the worker.
> This means that each worker sees a "different" latest for the time it checks for the partitions
> assigned to it.
> This also means that if a worker fails before starting to read, and new messages were added in
> between, they would be missed.
> I think we should consider checking the offsets (could be the same for "earliest") when running
> initialSplits (that's how Spark does it as well: one call from the driver for all partitions).
> I'd also suggest we persist the latest offset as part of the CheckpointMark, so that once latest
> is set it is remembered until new messages arrive and doesn't need to be resolved again (and if
> new messages were available they won't be missed upon failure).
> For Spark this is even more important, as state is passed in between micro-batches and sparse
> partitions may skip messages until a message finally arrives within the read time-frame.
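
A rough sketch of that second suggestion, under a simplified checkpoint class (the names here are
hypothetical, not Beam's actual KafkaCheckpointMark): the resolved "latest" offset is written into
the checkpoint immediately, so a restarted reader resumes from it instead of asking Kafka for
"latest" again and silently skipping whatever was published in between.

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    public class KafkaCheckpointMarkSketch implements Serializable {

      // topic:partition -> next offset to read.
      private final Map<String, Long> nextOffsets = new HashMap<>();

      // Called as soon as "latest" is resolved, before any record has been read, so the
      // starting point survives a worker failure.
      public void recordStartOffset(String topic, int partition, long offset) {
        nextOffsets.put(topic + ":" + partition, offset);
      }

      // Called after each record is consumed.
      public void advance(String topic, int partition, long consumedOffset) {
        nextOffsets.put(topic + ":" + partition, consumedOffset + 1);
      }

      // On restore: seek to the checkpointed offset; "latest" is never re-resolved.
      public Long resumeOffset(String topic, int partition) {
        return nextOffsets.get(topic + ":" + partition);
      }
    }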

This message was sent by Atlassian JIRA
