beam-commits mailing list archives

From "Daniel Halperin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-990) KafkaIO does not commit offsets to Kafka
Date Thu, 01 Dec 2016 21:49:58 GMT

     [ https://issues.apache.org/jira/browse/BEAM-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Halperin updated BEAM-990:
---------------------------------
    Component/s: sdk-java-extensions

> KafkaIO does not commit offsets to Kafka
> ----------------------------------------
>
>                 Key: BEAM-990
>                 URL: https://issues.apache.org/jira/browse/BEAM-990
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Alban Perillat-Merceroz
>              Labels: KafkaIO
>
> I use KafkaIO as a source, and I would like the consumed offsets to be stored in Kafka (in the {{__consumer_offsets}} topic).
> I'm configuring the Kafka reader with:
> {code:java}
> .updateConsumerProperties(ImmutableMap.of(
>               ConsumerConfig.GROUP_ID_CONFIG, "my-group",
>               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
>               ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
>             ))
> {code}
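> For reference, here is a broker-free sketch of what the {{ConsumerConfig}} constants above resolve to as raw Kafka property names (the class and method names in this sketch are made up for illustration; only the property strings are standard Kafka consumer settings):

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaIoProps {
    // Builds the same map as updateConsumerProperties(...) above, using the
    // raw property names that the ConsumerConfig constants stand for.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "my-group");              // GROUP_ID_CONFIG
        props.put("enable.auto.commit", Boolean.TRUE);  // ENABLE_AUTO_COMMIT_CONFIG
        props.put("auto.commit.interval.ms", "10");     // AUTO_COMMIT_INTERVAL_MS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```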
> But the offsets are not stored in Kafka (nothing in {{__consumer_offsets}}; the next job restarts at the latest offset).
> I can't find where in the code the offsets are supposed to be committed.
> I tried adding a manual commit in the {{consumerPollLoop()}} method, and it works; the offsets are committed:
> {code:java}
> private void consumerPollLoop() {
>             // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
>             while (!closed.get()) {
>                 try {
>                     ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
>                     if (!records.isEmpty() && !closed.get()) {
>                         availableRecordsQueue.put(records); // blocks until dequeued.
>                         // Manual commit
>                         consumer.commitSync();
>                     }
>                 } catch (InterruptedException e) {
>                     LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
>                     break;
>                 } catch (WakeupException e) {
>                     break;
>                 }
>             }
>             LOG.info("{}: Returning from consumer pool loop", this);
>         }
> {code}
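> One caveat with the bare {{commitSync()}} above: it commits the consumer's positions from the last {{poll()}}, which may be ahead of what the pipeline has actually processed. Kafka's {{commitSync(Map)}} overload takes explicit offsets instead, and expects, per partition, the offset of the *next* record to read, i.e. last consumed + 1. A broker-free sketch of just that off-by-one computation (using plain strings in place of {{TopicPartition}}, so this is an illustration, not the real commit call):

```java
import java.util.HashMap;
import java.util.Map;

public class CommitOffsets {
    // commitSync(Map) takes, per partition, the offset of the *next* record
    // to read, so we commit lastConsumedOffset + 1 for each partition.
    static Map<String, Long> toCommit(Map<String, Long> lastConsumed) {
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, Long> e : lastConsumed.entrySet()) {
            out.put(e.getKey(), e.getValue() + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> last = new HashMap<>();
        last.put("my-topic-0", 41L);
        System.out.println(toCommit(last)); // {my-topic-0=42}
    }
}
```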
> Is this a bug in KafkaIO or am I misconfiguring something?
> Disclaimer: I'm currently using KafkaIO on Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
