beam-commits mailing list archives

From "Alban Perillat-Merceroz (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-990) KafkaIO does not commit offsets to Kafka
Date Thu, 01 Dec 2016 22:11:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alban Perillat-Merceroz updated BEAM-990:
-----------------------------------------
    Description: 
I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the
{{__consumer_offsets}} topic).

I'm configuring the Kafka reader with:
{code:java}
.updateConsumerProperties(ImmutableMap.of(
              ConsumerConfig.GROUP_ID_CONFIG, "my-group",
              ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
              ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
            ))
{code}

But the offsets are not stored in Kafka (nothing appears in {{__consumer_offsets}}, and the next job restarts at the latest offset).

I can't find in the code where the offsets are supposed to be committed.

I tried adding a manual commit in the {{consumerPollLoop()}} method, and it works; the offsets are committed:

{code:java}
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue.
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued.
        // Manual commit added for testing: commits the offsets just enqueued.
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }

  LOG.info("{}: Returning from consumer poll loop", this);
}
{code}
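Note that this manual commit is only a workaround: it runs as soon as a batch is enqueued to {{availableRecordsQueue}}, before the runner has actually processed or checkpointed those records, so the committed offsets can run ahead of what the pipeline has durably handled. A toy, self-contained sketch of that ordering concern (hypothetical names, no Beam or Kafka APIs):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the ordering issue: a commit should track what the "runner"
// has finalized, not what has merely been enqueued by the poll loop.
public class CommitOrdering {
    public static void main(String[] args) {
        Queue<Integer> availableRecords = new ArrayDeque<>();
        long committedOffset = -1;   // offset durably committed to "Kafka"
        long finalizedOffset = -1;   // offset the "runner" has checkpointed

        // The poll loop enqueues three batches, each carrying one offset.
        for (int offset = 0; offset < 3; offset++) {
            availableRecords.add(offset);
            // Committing here (as the manual commitSync() workaround does)
            // would mark records consumed before they are processed.
        }

        // The runner drains and checkpoints; only now is a commit safe.
        while (!availableRecords.isEmpty()) {
            finalizedOffset = availableRecords.poll();
        }
        committedOffset = finalizedOffset; // commit after checkpoint finalization

        System.out.println(committedOffset);
    }
}
```

A proper fix should therefore commit only when the runner finalizes a checkpoint, which is what the edit below attempts.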

Is this a bug in KafkaIO or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.

Edit: I found the method where KafkaIO is supposed to commit offsets at the end of a batch. I'm currently testing a fix and will open a pull request soon:

{code:java}
// KafkaCheckpointMark.java

  /**
   * Optional consumer that will be used to commit offsets into Kafka when
   * finalizeCheckpoint() is called.
   */
  @Nullable
  private final Consumer consumer;

  public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer consumer) {
    this.partitions = partitions;
    this.consumer = consumer;
  }

  /**
   * Commits synchronously into Kafka the offsets that have been passed downstream.
   */
  @Override
  public void finalizeCheckpoint() throws IOException {
    if (consumer == null) {
      LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
      return;
    }
    if (partitions.isEmpty()) {
      LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
      return;
    }

    final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
    String committedOffsets = "";
    for (PartitionMark partition : partitions) {
      TopicPartition topicPartition = partition.getTopicPartition();
      offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
      committedOffsets += topicPartition.topic() + "-" + topicPartition.partition()
          + ":" + partition.offset + ",";
    }

    // Drop the trailing comma for logging.
    final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);

    try {
      consumer.commitSync(offsets);
      LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
    } catch (Exception e) {
      LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
          e.getClass().getSimpleName(),
          printableOffsets);
    }
  }
{code}
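As a side note on the logging string above: the trailing-comma trim can be avoided with {{java.util.StringJoiner}}. A minimal standalone sketch with illustrative topic/partition names and offsets (plain strings, not KafkaIO types):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Builds a "topic-partition:offset,..." log string without a trailing comma.
public class OffsetLog {
    static String format(Map<String, Long> offsetsByPartition) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, Long> e : offsetsByPartition.entrySet()) {
            joiner.add(e.getKey() + ":" + e.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the log output is stable.
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("my-topic-0", 42L);
        offsets.put("my-topic-1", 7L);
        System.out.println(format(offsets));
    }
}
```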



> KafkaIO does not commit offsets to Kafka
> ----------------------------------------
>
>                 Key: BEAM-990
>                 URL: https://issues.apache.org/jira/browse/BEAM-990
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Alban Perillat-Merceroz
>              Labels: KafkaIO
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
