kafka-jira mailing list archives

From "Bart Vercammen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6000) streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
Date Mon, 02 Oct 2017 19:13:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188651#comment-16188651 ]

Bart Vercammen commented on KAFKA-6000:
---------------------------------------

Ok, I'll try to collect all client- and server-logs from the platform and post them here.

Some partitions are indeed restored successfully. I have not found a clear pattern yet, but at
first glance it looks like partitions containing fewer than roughly 100,000 records recover
successfully, while partitions with (many) more records fail. As I said, this is only a first
impression; I still need to investigate and test further to confirm the pattern.
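One way to test the suspected pattern is to compare each changelog partition's offset span with the rough 100K-record threshold. The following is a minimal sketch, assuming the earliest and latest offsets per partition have already been collected (for example with {{KafkaConsumer#beginningOffsets}} and {{KafkaConsumer#endOffsets}}, available in the Java client since 0.10.1). The class {{RestoreSpanCheck}}, its methods, and the threshold constant are hypothetical helpers, not Kafka APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Kafka): compares each changelog partition's
// offset span against the ~100K records threshold suspected above.
class RestoreSpanCheck {

    // Threshold taken from the rough observation in this comment; adjust as needed.
    static final long THRESHOLD = 100_000L;

    // span = endOffset - beginningOffset per partition. If the changelog is compacted,
    // offsets can have gaps, so the span is only an upper bound on the records to restore.
    static Map<String, Long> restoreSpans(Map<String, Long> beginning, Map<String, Long> end) {
        Map<String, Long> spans = new TreeMap<>();
        for (Map.Entry<String, Long> e : end.entrySet()) {
            long begin = beginning.getOrDefault(e.getKey(), 0L);
            spans.put(e.getKey(), e.getValue() - begin);
        }
        return spans;
    }

    static boolean aboveThreshold(long span) {
        return span >= THRESHOLD;
    }

    public static void main(String[] args) {
        Map<String, Long> beginning = new LinkedHashMap<>();
        Map<String, Long> end = new LinkedHashMap<>();
        // Partition 2: earliest/latest offsets as reported in the log of this issue.
        beginning.put("scratch.lateststate.dsh-2", 0L);
        end.put("scratch.lateststate.dsh-2", 1_773_763L);
        // Partition 0: hypothetical values, only to show a partition below the threshold.
        beginning.put("scratch.lateststate.dsh-0", 0L);
        end.put("scratch.lateststate.dsh-0", 42_000L);

        for (Map.Entry<String, Long> e : restoreSpans(beginning, end).entrySet()) {
            System.out.printf("%s span=%d %s%n", e.getKey(), e.getValue(),
                    aboveThreshold(e.getValue()) ? "above threshold" : "below threshold");
        }
    }
}
```

Run as-is, it prints one line per partition sorted by name; partition 2 from the log (span 1773763) lands above the threshold. Cross-checking those lines against which partitions actually restored would confirm or rule out the pattern.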

I'm also trying to reproduce this in a fully controlled unit test, but that is still a work in progress.
Once I catch the issue in a unit test, I'll post it here as a reference.


> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> -----------------------------------------------------------
>
>                 Key: KAFKA-6000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6000
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Bart Vercammen
>            Priority: Blocker
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] Registering state store lateststate to its state manager
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] Restoring state store lateststate from changelog topic scratch.lateststate.dsh
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset for partition scratch.lateststate.dsh-2 to latest offset.
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Partition scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata refresh
> 11:24:16.474 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.476 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received ListOffsetResponse {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]} from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.476 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 1773763, timestamp -1
> 11:24:16.477 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset for partition scratch.lateststate.dsh-2 to earliest offset.
> 11:24:16.478 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.480 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received ListOffsetResponse {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]} from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.481 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, timestamp -1
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763
> 11:24:16.484 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 0 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.485 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.486 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.490 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 0 to buffered record list
> 11:24:16.492 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 3 records in fetch response for partition scratch.lateststate.dsh-2 with offset 0
> 11:24:16.493 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Returning fetched records at offset 0 for assigned partition scratch.lateststate.dsh-2 and update position to 1586527
> 11:24:16.494 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched records for scratch.lateststate.dsh-2 at offset 0 since the current position is 1586527
> 11:24:16.496 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.496 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.498 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.499 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.500 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.501 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.502 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.511 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.513 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.515 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.517 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.518 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.519 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.520 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.520 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.522 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> {noformat}
> In this setup, I have 5 Kafka brokers running 0.11.0.1 (with SSL) and a KafkaStreams application running 0.10.2.1. The streams application uses an underlying state store ({{lateststate}}, backed by the changelog topic {{scratch.lateststate.dsh}}). The problem is that when the Kafka Streams application (re)starts while a substantial amount of data is already present in the state stores, it does not restore the state. KafkaStreams remains in the {{REBALANCING}} state and never exits the {{restoreActiveState}} function in {{ProcessorStateManager}}.
> I also noticed that the state restore sometimes seems to work when the number of records in the changelog topic is below roughly 100K. I have seen a successful restore when the restore-consumer lag was below 100K records.
> When running the exact same application against a 0.10.2.1 Kafka cluster, the issue never occurs. It only happens when I run the 0.10.2.1 KafkaStreams application against a 0.11 Kafka cluster.
> The logs above are a snippet from restoring the changelog partition that 'hangs'.
> They also show FetchResponses returning 0 records every time, which looks suspicious to me.
> As far as I can tell, the KafkaStreams code is stuck in this loop in {{restoreActiveState}} because the restore consumer's position no longer advances:
> {code}
>             while (true) {
>                 long offset = 0L;
>                 for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
>                     offset = record.offset();
>                     if (offset >= limit) break;
>                     stateRestoreCallback.restore(record.key(), record.value());
>                 }
>                 if (offset >= limit) {
>                     break;
>                 } else if (restoreConsumer.position(storePartition) == endOffset) {
>                     break;
>                 } else if (restoreConsumer.position(storePartition) > endOffset) {
>                     // For a logging enabled changelog (no offset limit),
>                     // the log end offset should not change while restoring since it is only written by this thread.
>                     throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
>                             logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
>                 }
>             }
> {code}
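The diagnosis above can be checked without a cluster. Below is a minimal sketch that replays the exit conditions of the quoted loop against {{FakeRestoreConsumer}}, a hypothetical in-memory stand-in (not a Kafka class) that mimics the log: one poll that moves the position from 0 to 1586527, then only empty polls. It assumes no offset limit applies to this logging-enabled store, so {{limit}} is {{Long.MAX_VALUE}}; the iteration cap exists only so the demo terminates.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch: replays the control flow of the quoted restoreActiveState loop against a
// hypothetical in-memory stand-in for the restore consumer (not a Kafka class).
class RestoreLoopSimulation {

    // Hypothetical stand-in: poll() returns record offsets, position() the next fetch offset.
    static final class FakeRestoreConsumer {
        private final long positionAfterFirstPoll; // 1586527 in the log above
        private long position = 0L;
        private boolean firstPoll = true;

        FakeRestoreConsumer(long positionAfterFirstPoll) {
            this.positionAfterFirstPoll = positionAfterFirstPoll;
        }

        List<Long> poll() {
            if (firstPoll) {
                firstPoll = false;
                position = positionAfterFirstPoll; // "update position to 1586527"
                return Arrays.asList(0L, 1L, 2L);  // 3 records; these offsets are hypothetical
            }
            return Collections.emptyList();        // "Received 0 records in fetch response"
        }

        long position() {
            return position;
        }
    }

    // Same exit conditions as the quoted loop; maxIterations only exists so the demo terminates.
    // Returns true if the loop exited on its own, false if it was still spinning at the cap.
    static boolean runRestoreLoop(FakeRestoreConsumer consumer, long limit, long endOffset, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            long offset = 0L;
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset;
                if (offset >= limit) break;
                // stateRestoreCallback.restore(key, value) would run here
            }
            if (offset >= limit) {
                return true;
            } else if (consumer.position() == endOffset) {
                return true;
            } else if (consumer.position() > endOffset) {
                throw new IllegalStateException("log end offset changed while restoring");
            }
            // Empty poll plus stalled position: the next iteration sees exactly the same state.
        }
        return false;
    }

    public static void main(String[] args) {
        long endOffset = 1_773_763L; // latest offset from the ListOffsetResponse in the log
        // Assumes no offset limit applies (logging-enabled store), hence Long.MAX_VALUE.
        boolean stuck = runRestoreLoop(new FakeRestoreConsumer(1_586_527L), Long.MAX_VALUE, endOffset, 10_000);
        boolean healthy = runRestoreLoop(new FakeRestoreConsumer(endOffset), Long.MAX_VALUE, endOffset, 10_000);
        System.out.println("position stuck at 1586527 -> exited: " + stuck);    // exited: false
        System.out.println("position reaches endOffset -> exited: " + healthy); // exited: true
    }
}
```

With the position stuck at 1586527, below endOffset 1773763, neither {{break}} fires and the loop is still spinning at the cap; when the position reaches endOffset, it exits after the first poll. This only mirrors the control flow; it does not explain why the broker keeps returning empty fetches at offset 1586527.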



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
