kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Maysam Yabandeh (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up
Date Wed, 11 May 2016 16:49:12 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280411#comment-15280411
] 

Maysam Yabandeh commented on KAFKA-3693:
----------------------------------------

[~junrao] based on controller logs the following seems to be the faulty scenario that breaks
the assumption that controller sends to a new broker LeadAndIsr requests that contains all
the partitions: (note: the manual investigation process is error-prone and subject to mistakes)

# KafkaController::shutdownBroker
{code}
INFO controller.KafkaController: [Controller 17]: Shutting down broker 20
{code}
{code}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,

 topicAndPartition.partition, id)), OfflineReplica)
{code} Note here that input set has only one member
# ReplicaStateMachine::handleStateChanges(Set\[PartitionAndReplica\], ReplicaState, Callbacks)
{code}
INFO controller.ReplicaStateMachine: [Replica state machine on controller 17]: Invoking state
change to OfflineReplica for replicas [Topic=topic-xyz,Partition=134,Replica=20]
{code}
{code}
brokerRequestBatch.newBatch()

partitions.foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic,
topicAndPartition.partition, targetState, leaderSelector, callbacks)
}

brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
{code}
# ReplicaStateMachine::handleStateChange(PartitionAndReplica, ReplicaState, Callbacks)
{code}
TRACE change.logger: Controller 17 epoch 269 changed state of replica 20 for partition [topic-xyz,134]
from OnlineReplica to OfflineReplica
{code}
{code}
// send the shrunk ISR state change request to all the remaining alive replicas of the partition
...
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ ==
replicaId),
    topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
{code}
# ControllerStateManager::sendRequestsToBrokers
{code}
TRACE change.logger: Controller 17 epoch 269 sending become-leader LeaderAndIsr request (Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269)
to broker 16 for partition [topic-xyz,134]
{code}
{code}
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava,
leaders.asJava)
{code}

This particular case aside, since the consequences of this assumption being violated (either
by an existing bug or a future bug) are catastrophic, I guess it is wise to have the broker
code defensive against the corner cases that this assumption could be violated.

> Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest
at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between highwatermark-checkpoint thread to write replication-offset-checkpoint
file and handleLeaderAndIsrRequest thread reading from it causes the highwatermark for some
partitions to be reset to 0. In the good case, this results the replica to truncate its entire
log to 0 and hence initiates fetching of terabytes of data from the lead broker, which sometimes
leads to hours of downtime. We observed the bad cases that the reset offset can propagate
to recovery-point-offset-checkpoint file, making a lead broker to truncate the file. This
seems to have the potential to lead to data loss if the truncation happens at both follower
and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receive LeaderAndIsr from the controller
> # LeaderAndIsr message however does not contain all the partitions (probably because
other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the allPartitions with
the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId,
time, this))
> {code}
> # replication-offset-checkpoint jumps in taking a snapshot of (the partial) allReplicas'
high watermark into replication-offset-checkpoint file {code}  def checkpointHighWatermarks()
{
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case
Some(replica) => replica}{code} hence rewriting the previous highwatermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read the (now
partial) file through Partition::getOrCreateReplica {code}
>           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>           val offsetMap = checkpoint.read
>           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>             info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic,
partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including a subset
of partitions is critical in making this race condition manifest or not. But it is an important
detail since it clarifies that a solution based on not letting the highwatermark-checkpoint
thread jumping in the middle of processing a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force initializing allPartitions by the partitions
listed in the replication-offset-checkpoint (and perhaps recovery-point-offset-checkpoint
file too) when a server starts.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message