beam-commits mailing list archives

From "Raghu Angadi (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
Date Fri, 07 Oct 2016 16:42:20 GMT

    [ https://issues.apache.org/jira/browse/BEAM-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15555572#comment-15555572
] 

Raghu Angadi edited comment on BEAM-704 at 10/7/16 4:41 PM:
------------------------------------------------------------

{quote} I'd also suggest we persist the latest offset as part of the CheckpointMark so that
once latest is set, it is remembered until new messages arrive and it doesn't need to be resolved
again (and if there were new messages available they won't be missed upon failure). {quote}

The offset is the primary info stored in CheckpointMark. Can you elaborate on what is missing
now?

{quote} I think we should consider checking the offsets (could be the same for "earliest")
when running initialSplits (that's how Spark does that as well, one call from the driver for
all topic-partitions). {quote}

Can you expand on this with an example? The offset is part of the checkpoint, so the reader
either resumes from the checkpointed offset or, when it is restarted from scratch, starts from
the 'default offset'. The 'default offset' is 'latest' in the default config. Users can customize
this (e.g. by setting a group-id that preserves the latest consumed offset on Kafka, though this
is not the same as a Beam checkpoint).
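
The resume rule described above can be sketched roughly as follows. This is only an illustration, not KafkaIO's actual code: the class `StartOffsets`, the method `resolve`, and its parameters are hypothetical names made up for this example.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration; not part of KafkaIO. */
final class StartOffsets {
  private StartOffsets() {}

  /**
   * Picks the offset a reader should start from.
   *
   * @param checkpointedOffset offset restored from a checkpoint, or empty when the
   *     reader is started from scratch
   * @param defaultOffset resolves the configured default (e.g. "latest"); it is only
   *     invoked when there is no checkpointed offset
   */
  static long resolve(OptionalLong checkpointedOffset, LongSupplier defaultOffset) {
    return checkpointedOffset.isPresent()
        ? checkpointedOffset.getAsLong() // resume exactly where the checkpoint left off
        : defaultOffset.getAsLong();     // fresh start: fall back to the default offset
  }
}
```

For instance, `StartOffsets.resolve(OptionalLong.of(42L), () -> 100L)` yields the checkpointed 42, while passing `OptionalLong.empty()` falls back to the supplier's value.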




> KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-704
>                 URL: https://issues.apache.org/jira/browse/BEAM-704
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Amit Sela
>
> Currently, the KafkaIO (when configured to "latest") will check the latest offset on
> the worker. This means that each worker sees a "different" latest for the time it checks for
> the partitions assigned to it.
> This also means that if a worker fails before starting to read, and new messages were
> added in between, they would be missed.
> I think we should consider checking the offsets (could be the same for "earliest") when
> running initialSplits (that's how Spark does that as well, one call from the driver for all
> topic-partitions).
> I'd also suggest we persist the latest offset as part of the CheckpointMark so that once
> latest is set, it is remembered until new messages arrive and it doesn't need to be resolved
> again (and if there were new messages available they won't be missed upon failure).
> For Spark this is even more important as state is passed in-between micro-batches and
> sparse partitions may skip messages until a message finally arrives within the read time-frame.
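
A rough sketch of the proposal in the issue description, under stated assumptions: "latest" is resolved once for all partitions at split time and the result is kept, so later appends do not change where reading starts. `SplitTimeOffsets`, `resolveOnce`, and the use of plain strings as partition names are hypothetical simplifications for illustration, not KafkaIO or Kafka client APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical sketch for illustration; not part of KafkaIO. */
final class SplitTimeOffsets {
  private SplitTimeOffsets() {}

  /**
   * Resolves a start offset for every partition in one pass, e.g. from the driver
   * while computing the initial splits.
   *
   * @param partitions partition names (plain strings here, as a simplification)
   * @param latestOffset looks up the current "latest" offset of a partition
   * @return a snapshot that could be stored with each split's checkpoint state
   */
  static Map<String, Long> resolveOnce(
      Iterable<String> partitions, ToLongFunction<String> latestOffset) {
    Map<String, Long> resolved = new LinkedHashMap<>();
    for (String partition : partitions) {
      // Each partition is queried exactly once; the value is then fixed.
      resolved.put(partition, latestOffset.applyAsLong(partition));
    }
    return resolved;
  }
}
```

Because the returned map is a snapshot, a worker that fails before reading and is restarted with the same snapshot would still begin at the originally resolved offset, even if new messages were appended in the meantime.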




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
