kafka-jira mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6782) GlobalStateStore never finishes restoring when consuming transactional messages
Date Thu, 12 Apr 2018 20:20:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436252#comment-16436252 ]

Matthias J. Sax commented on KAFKA-6782:
----------------------------------------

Thanks for reporting this issue. Makes sense to me -- feel free to open a PR. I would assume
that it affects GlobalKTables, too? If so, we should add tests for both cases.

> GlobalStateStore never finishes restoring when consuming transactional messages
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6782
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Lingxiao WANG
>            Priority: Major
>
> Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the fix proposed there:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }
> {code}
> doesn't work for me. In my situation, several transaction markers can appear in sequence
> in one partition. In that case, the consumer is blocked and can't poll any records, so the
> statement 'offset = consumer.position(topicPartition)' never gets a chance to execute.
>  So I propose moving 'offset = consumer.position(topicPartition)' outside the for loop, to
> guarantee that even if no records are polled, the offset is always updated.
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }
> {code}
>  
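
The difference between the two loops quoted above can be reproduced without a broker. The following is only a minimal sketch under stated assumptions, not the actual Kafka Streams code: FakeConsumer is a hypothetical stand-in for the real consumer, null log entries model transaction markers, and it assumes that a poll skips markers while still advancing the position. The names RestoreLoopSketch, restore, updateInsideLoop and maxPolls are illustrative as well.

```java
import java.util.ArrayList;
import java.util.List;

class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer on one partition; null entries model transaction markers. */
    static final class FakeConsumer {
        private final String[] log;
        private int position = 0;

        FakeConsumer(String... log) {
            this.log = log;
        }

        /** Returns at most one data record per call; markers are skipped but still advance the position. */
        List<String> poll() {
            List<String> batch = new ArrayList<>();
            while (position < log.length) {
                String entry = log[position++];
                if (entry != null) {
                    batch.add(entry);
                    break;
                }
            }
            return batch;
        }

        int position() {
            return position;
        }
    }

    /** Runs the restore loop; returns false if it is still short of highWatermark after maxPolls polls. */
    static boolean restore(FakeConsumer consumer, int highWatermark, boolean updateInsideLoop, int maxPolls) {
        int offset = consumer.position();
        int polls = 0;
        while (offset < highWatermark) {
            if (polls++ == maxPolls) {
                return false; // give up instead of spinning forever
            }
            for (String record : consumer.poll()) {
                // a real implementation would call stateRestoreCallback.restore(...) here
                if (updateInsideLoop) {
                    offset = consumer.position(); // first variant: skipped when the batch is empty
                }
            }
            if (!updateInsideLoop) {
                offset = consumer.position(); // proposed variant: runs after every poll
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] log = {"a", "b", null, null}; // two data records followed by two markers
        System.out.println("update inside for loop:  " + restore(new FakeConsumer(log), log.length, true, 100));
        System.out.println("update outside for loop: " + restore(new FakeConsumer(log), log.length, false, 100));
    }
}
```

With the position update inside the for loop, the sketch gives up after 100 polls because the trailing markers only ever produce empty batches. With the update after the for loop, the offset reaches the high watermark on the third poll.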



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
