beam-user mailing list archives

From Sushil Ks <>
Subject Re: KafkaIO reading from latest offset when pipeline fails on FlinkRunner
Date Thu, 01 Feb 2018 09:29:59 GMT
Apologies for the delay in my reply.

@Raghu Angadi
The pipeline checkpoints every 20 minutes. As you mentioned, if the
pipeline restarts before any checkpoint has been created, it reads from
the latest offset.

Thanks a lot for sharing your learnings. However, when a
*UserCodeException* is thrown while processing an element in the ParDo
after the window has been materialized, the pipeline drops the unprocessed
elements and restarts. Is this expected from Beam?
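
To make the scenario concrete, here is a toy model in plain Java. It is
not Beam, Kafka, or Flink code; the class and method names are made up for
illustration. It assumes 0-based offsets, 100 records read, a failure at
offset 70, and no completed checkpoint other than the initial position
(offset 0). It only shows why resuming from an offset committed right
after the read skips records, while resuming from a checkpointed offset
replays them.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a source that resumes from a stored offset (hypothetical, not Beam code). */
public class OffsetResumeSketch {

    /** Resume position when the offset is committed as soon as records are read. */
    public static long resumeAfterAutoCommit(long recordsRead) {
        // The commit reflects what was read, not what downstream finished processing.
        return recordsRead;
    }

    /** Resume position when the reader restores the last completed checkpoint. */
    public static long resumeFromCheckpoint(long checkpointedOffset) {
        return checkpointedOffset;
    }

    /** Offsets from the failure point onward that are never seen again after the restart. */
    public static List<Long> lostOffsets(long resumeOffset, long failedAt, long totalRead) {
        List<Long> lost = new ArrayList<>();
        for (long o = failedAt; o < Math.min(resumeOffset, totalRead); o++) {
            lost.add(o);
        }
        return lost;
    }

    public static void main(String[] args) {
        long totalRead = 100;   // records handed to the window before the failure
        long failedAt = 70;     // offset whose processing threw
        long checkpointed = 0;  // assumed position stored in the last checkpoint

        long autoCommit = resumeAfterAutoCommit(totalRead);
        long restored = resumeFromCheckpoint(checkpointed);

        // Commit-after-read: offsets 70..99 are skipped, so this prints 30.
        System.out.println(lostOffsets(autoCommit, failedAt, totalRead).size());
        // Checkpoint restore: everything from offset 0 is replayed, so this prints 0.
        System.out.println(lostOffsets(restored, failedAt, totalRead).size());
    }
}
```

In this model the checkpoint case loses nothing, but offsets 0 to 69 are
processed a second time, so any side effects in the ParDo would need to
tolerate replays.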

On Wed, Jan 17, 2018 at 2:13 AM, Kenneth Knowles <> wrote:

> Is there a JIRA filed for this? I think this discussion should live in a
> ticket.
> Kenn
> On Wed, Jan 10, 2018 at 11:00 AM, Mingmin Xu <> wrote:
>> @Sushil, I have several jobs running on KafkaIO+FlinkRunner; I hope my
>> experience can help you a bit.
>> In short, `ENABLE_AUTO_COMMIT_CONFIG` doesn't meet your requirement; you
>> need to leverage exactly-once checkpoints/savepoints in Flink. The reason
>> is that with `ENABLE_AUTO_COMMIT_CONFIG`, KafkaIO commits the offset after
>> data is read, and once the job is restarted KafkaIO reads from
>> last_committed_offset.
>> In my jobs, I enable external checkpoints (external should be optional, I
>> think?) in exactly-once mode on the Flink cluster. When the job
>> auto-restarts on failures, it doesn't lose data. When I redeploy the job
>> manually, I use a savepoint to cancel and relaunch the job.
>> Mingmin
>> On Wed, Jan 10, 2018 at 10:34 AM, Raghu Angadi <>
>> wrote:
>>> How often does your pipeline checkpoint/snapshot? If the failure happens
>>> before the first checkpoint, the pipeline could restart without any state,
>>> in which case KafkaIO would read from the latest offset. There is probably
>>> some way to verify whether the pipeline is restarting from a checkpoint.
>>> On Sun, Jan 7, 2018 at 10:57 PM, Sushil Ks <> wrote:
>>>> Hi Aljoscha,
>>>> The issue is this: say I consume 100 elements in a 5-minute fixed
>>>> window with *GroupByKey* and later apply a *ParDo* to all of those
>>>> elements. If there is an issue while processing element 70 in the
>>>> *ParDo* and the pipeline restarts with a *UserCodeException*, it
>>>> skips the remaining 30 elements. I wanted to know whether this is
>>>> expected. If you still have doubts, let me know and I will share a
>>>> code snippet.
>>>> Regards,
>>>> Sushil Ks
