beam-user mailing list archives

From Gwilym Evans <>
Subject Re: Kafka offset management
Date Fri, 30 Jun 2017 04:19:29 GMT
Hi JingsongLee,

Thanks for the reply.

What I'm trying to avoid are lost / skipped messages due to two situations:

1. Lost offsets, or
2. Premature offset commits

I've researched snapshot checkpoints, and from what I understand these are
only maintained in Dataflow when a job is updated. If a job crashes, is
cancelled, or is drained, then the checkpoints are lost. This is situation
(1) above.

From what I understand about auto-commit offsets, the Kafka client periodically
and automatically commits the offsets it has polled. In the case of Beam and
Dataflow, this happens even if the offsets being committed have not yet been
fully processed by the pipeline. This is situation (2) above.
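For reference, this is the kind of client-side setting involved. A minimal sketch using plain string keys ("enable.auto.commit" and "auto.commit.interval.ms" are standard Kafka consumer properties; the values here are only illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        // Standard Kafka consumer keys. With auto-commit on, the client commits
        // polled offsets on a timer, regardless of whether the downstream
        // pipeline has finished processing them -- hence situation (2).
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "5000"); // illustrative interval
        System.out.println(config.get("enable.auto.commit"));
    }
}
```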

So far I'm not seeing a way to avoid data loss besides resetting to the
earliest offset when a job starts. But, given we retain data in our Kafka
topics for up to 7 days, that is not feasible from a performance point of view.
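For a sense of scale, replaying 7 days of retained data looks roughly like this (the throughput figure is a hypothetical assumption, not from the thread):

```java
public class ReplayBacklogSketch {
    public static void main(String[] args) {
        // Assumed rate for illustration only; the 7-day retention is from the thread.
        long msgsPerSec = 1_000L;
        long retentionSecs = 7L * 24 * 60 * 60;    // 7 days = 604,800 seconds
        long backlog = msgsPerSec * retentionSecs; // messages to replay from "earliest"
        System.out.println(backlog);               // 604800000
    }
}
```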

Can anyone confirm / deny my understanding here?


On 30 June 2017 at 02:59, JingsongLee <> wrote:

> Hi Gwilym
> KafkaIO saves offsets to the snapshot (checkpoint) for restarting, instead of
> committing offsets to Kafka.
> You can enable auto-commit of offsets with a Kafka client configuration setting:
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ("enable.auto.commit") = true
> Hope this helps.
> Best, JingsongLee
> ------------------------------------------------------------------
> From:Gwilym Evans <>
> Time:2017 Jun 29 (Thu) 15:32
> To:user <>
> Subject:Kafka offset management
> Hi list,
> I was playing around with KafkaIO today to understand how it behaves in a
> failure or restart scenario (be it crash, cancel, or drain), and I found it
> "lost" (or skipped) Kafka messages in these cases. That is, it resumed from
> the latest offsets rather than the last successfully processed offsets, and
> a gap in messages was observed as a result.
> My KafkaIO transform looks like:
> KafkaIO.<Long, String>readBytes()
>                 .withConsumerFactoryFn(new KafkaConsumerFactoryFn())
>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
>                 .withTopics(ImmutableList.of(KAFKA_TOPIC))
> The consumer factory is used because of some runtime SSL key/truststore
> setup:
>             final Map<String, Object> config = Maps.newHashMap();
>             config.put("auto.offset.reset", "latest");
>             config.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
>             config.put("group.id", KAFKA_GROUP_ID);
>             config.put("key.deserializer", ByteArrayDeserializer.class.getName());
>             config.put("security.protocol", "SSL");
>             config.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1");
>             config.put("ssl.truststore.location", truststore.toPath().toString());
>             config.put("ssl.truststore.password", kafkaTruststorePassword);
>             config.put("value.deserializer", ByteArrayDeserializer.class.getName());
>             return new KafkaConsumer<>(config);
> So, I am setting a group.id, and I know KafkaIO is using the new consumer
> because our Zookeeper is not accessible from Dataflow.
> When I look on the Kafka cluster, there is no record of the consumer
> group's offsets. So I take it KafkaIO is not committing offsets to Kafka.
> Can this be changed to commit offsets when a "batch" of streaming messages
> is seen as processed OK? I am fine with at-least-once.
> I am using Dataflow, Beam 2.0, Kafka
> Cheers,
> Gwilym
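The at-least-once behaviour requested above comes down to ordering: commit an offset only after everything at or below it has been processed. A standalone sketch of that invariant (plain Java, no Kafka client; all names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class AtLeastOnceSketch {
    // Last offset whose record (and all earlier ones) completed processing.
    private long committedOffset = -1L;

    // Process a polled batch, then advance the commit point. If processing
    // throws, committedOffset stays put, so the batch is re-read on restart:
    // duplicates (at-least-once) rather than gaps (at-most-once).
    long processBatch(List<String> batch, long firstOffset) {
        List<String> done = new ArrayList<>();
        for (String record : batch) {
            done.add(record.toUpperCase()); // stand-in for real processing
        }
        committedOffset = firstOffset + batch.size() - 1; // commit only on success
        return committedOffset;
    }

    public static void main(String[] args) {
        AtLeastOnceSketch sketch = new AtLeastOnceSketch();
        long committed = sketch.processBatch(List.of("a", "b", "c"), 100L);
        System.out.println(committed); // 102
    }
}
```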
