kafka-dev mailing list archives

From "William Greer (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1
Date Thu, 20 Jun 2019 21:34:00 GMT
William Greer created KAFKA-8574:

             Summary: EOS race condition during task transition leads to LocalStateStore truncation
in Kafka Streams 2.0.1
                 Key: KAFKA-8574
                 URL: https://issues.apache.org/jira/browse/KAFKA-8574
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.0.1
            Reporter: William Greer

While using EOS in Kafka Streams there is a race condition where the checkpoint file is written
by the previous owning thread (Thread A) after the new owning thread (Thread B) reads the
checkpoint file. Thread B then starts a restoration, since no checkpoint file was found. A
re-balance occurs before Thread B completes the restoration, and a third thread (Thread C)
becomes the owning thread. Thread C reads the checkpoint file written by Thread A, which
does not correspond to the current state of the RocksDB state store. When this race condition
occurs, the state store will have the most recent records and some amount of the oldest records,
but will be missing some amount of records in between. If A->Z represents the entire changelog
to the present, then when this scenario occurs the state store would contain records [A->K
and Y->Z] and would be missing records K->Y.
This race condition is possible due to dirty writes and dirty reads of the checkpoint file.
"Thread" refers to a Kafka Streams StreamThread. [0]
Threads A, B and C are running in the same JVM in the same streams application.
Thread-A is in the RUNNING state and up to date on partition 1.
Thread-A is suspended on partition 1. This does not write a checkpoint file because EOS is
enabled. [1]
Thread-B is assigned partition 1.
Thread-B does not find a checkpoint in the StateManager. [2]
Thread-A is assigned a different partition. The TaskManager writes the suspended tasks'
checkpoints to disk; the checkpoint for partition 1 is written. [3]
Thread-B deletes the LocalStore and starts restoring. The deletion of the LocalStore does not
delete the checkpoint file. [4]
Thread-C is revoked.
Thread-A is revoked.
Thread-B is revoked from its assignment. It does not write a checkpoint file.
- Note: Thread-B never reaches the RUNNING state; it remains in the PARTITIONS_ASSIGNED state
until it transitions to the PARTITIONS_REVOKED state.
Thread-C is assigned partition 1.
Thread-C finds the checkpoint in the StateManager. This checkpoint corresponds to where Thread-A
left the state store for partition 1, not to where Thread-B left it.
Thread-C begins restoring from the checkpoint. The state store is missing an unknown number of
records at this point.
Thread-B receives its new assignment and does not write a checkpoint file for partition 1,
because it had not reached the RUNNING state before being revoked.
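To make the interleaving concrete, here is a minimal, self-contained sketch (plain Java, not
Kafka Streams internals) of the same ordering: a shared checkpoint file read and written with no
cross-thread coordination. All names and paths are illustrative assumptions.
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Minimal sketch of the interleaving above: one shared checkpoint file,
// no coordination, so a later reader can observe a stale offset.
// Nothing here is Kafka Streams API; all names are illustrative.
public class CheckpointRaceSketch {
    static final Path CHECKPOINT = Paths.get("/tmp/task-0_1/.checkpoint"); // hypothetical task dir

    static void write(long offset) throws IOException {  // "dirty" write: no lock held
        Files.createDirectories(CHECKPOINT.getParent());
        Files.write(CHECKPOINT, Long.toString(offset).getBytes());
    }

    static Long read() throws IOException {               // "dirty" read: no lock held
        return Files.exists(CHECKPOINT)
                ? Long.parseLong(new String(Files.readAllBytes(CHECKPOINT)))
                : null;
    }

    public static void main(String[] args) throws IOException {
        Files.deleteIfExists(CHECKPOINT);  // Thread-A suspends; EOS skips the checkpoint write [1]
        Long seenByB = read();             // Thread-B: no checkpoint found, begins restoration [2]
        write(5000L);                      // Thread-A: late checkpoint write for partition 1 [3]
        // Thread-B wipes the store but not the checkpoint [4], then is revoked mid-restore.
        Long seenByC = read();             // Thread-C: reads Thread-A's stale offset
        System.out.println("B saw " + seenByB + ", C saw " + seenByC
                + "; C resumes from a stale offset over a partially restored store");
    }
}
{code}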
[0] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
[1] https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553
[2] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98
[3] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105
& https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331
[4] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228
& https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123
Specifically https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119
is where the state store is deleted but the checkpoint file is not.
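If the wipe in [4] also invalidated the checkpoint before deleting the store, Thread-C could
never pair a stale offset with a partially restored store. A hypothetical fragment (continuing
the plain-file model in the sketch above, not a proposed patch):
{code:java}
// Hypothetical fix sketch: remove the checkpoint BEFORE wiping the store, so a
// later reader can only ever see "no checkpoint" rather than a stale offset.
// Continues the CheckpointRaceSketch model above (same imports).
static void wipeStoreForRestore(Path storeDir, Path checkpoint) throws IOException {
    Files.deleteIfExists(checkpoint);  // invalidate the checkpoint first
    if (Files.exists(storeDir)) {
        Files.walk(storeDir)
             .sorted(java.util.Comparator.reverseOrder())  // children before parents
             .forEach(p -> p.toFile().delete());
    }
}
{code}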
*How we recovered:*
1. Deleted the impacted state store. This triggered multiple exceptions and initiated a re-balance.
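For reference, the manual recovery amounts to removing the affected task's local state directory
so it is rebuilt from the changelog on the next re-balance. A hypothetical sketch; the
<state.dir>/<application.id>/<task.id> layout and the paths shown are assumptions based on
Streams defaults:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;

// Hypothetical recovery helper: wipe one task's local state so Streams
// restores it from the changelog. Paths and layout are assumptions.
public class WipeTaskState {
    public static void main(String[] args) throws IOException {
        Path taskDir = Paths.get("/tmp/kafka-streams", "my-app-id", "0_1");
        if (Files.exists(taskDir)) {
            Files.walk(taskDir)
                 .sorted(Comparator.reverseOrder())  // delete children before parents
                 .forEach(p -> p.toFile().delete());
        }
    }
}
{code}
When the instance's entire local state can be discarded, KafkaStreams#cleanUp() while the
instance is stopped is a coarser alternative.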
*Possible approaches to address this issue:*
1. Add a collection of global task locks for concurrency protection of the checkpoint file,
with the lock for a suspended task being released after closeNonAssignedSuspendedTasks and
the locks for the newly assigned tasks being acquired after that release (see the sketch
after this list).
2. Delete the checkpoint file in EOS when partitions are revoked. This doesn't address the race
condition, but it would ensure that the checkpoint file is never ahead of the LocalStore in
EOS. It would, however, increase the likelihood of triggering a full restoration of a LocalStore
on partition movement between threads on one host.
3. Configure task stickiness for StreamThreads. E.g., if a host with multiple StreamThreads
is assigned a task the host had before, prefer to assign the task to the thread on that host
that previously owned it.
4. Add a new state that splits the PARTITIONS_ASSIGNED state into a clean-up-previous-assignment
step and a bootstrap-new-assignment step. This would require all valid threads to complete the
clean-up step before any thread could progress into the bootstrap step.
5. Force a checkpoint of the current position during PARTITIONS_REVOKED. I don't think this
addresses the race condition, but I think it mitigates the truncation scenario.
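A rough sketch of option 1, assuming a process-wide lock registry keyed by task id; names are
illustrative and this is not a proposed patch:
{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Rough sketch of approach 1: one process-wide lock per task id guarding all
// checkpoint-file reads and writes. Names are illustrative, not Kafka APIs.
public final class CheckpointLocks {
    private static final ConcurrentHashMap<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    public static void withTaskLock(String taskId, Runnable checkpointIo) {
        ReentrantLock lock = LOCKS.computeIfAbsent(taskId, id -> new ReentrantLock());
        lock.lock();
        try {
            checkpointIo.run();  // a read or write of this task's checkpoint file
        } finally {
            lock.unlock();
        }
    }
}
{code}
The suspending thread would hold the task's lock until closeNonAssignedSuspendedTasks has
written the checkpoint; a newly assigned thread would acquire the same lock before reading the
checkpoint, so it could never observe the pre-write view described above.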
*Possibly mitigated by KAFKA-7672*
It seems the fix for https://issues.apache.org/jira/browse/KAFKA-7672 introduces a forced
checkpoint during EOS, so this truncation scenario may be mitigated for 2.2.0 but not for earlier
versions. The change-set for KAFKA-7672 doesn't address the race conditions around reading
and writing the checkpoint files.
