kafka-dev mailing list archives

From "Swapnil Ghike (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-330) Add delete topic support
Date Wed, 03 Apr 2013 10:27:15 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13620816#comment-13620816 ]

Swapnil Ghike commented on KAFKA-330:
-------------------------------------

Patch v1 attached. 

How topics are deleted: 
1. The DeleteTopicsCommand writes to /admin/delete_topics in zk and exits.
2. The DeleteTopicsCommand complains if any topic being deleted is absent in zookeeper;
it won't run even if at least one of the specified topics is actually present in zookeeper.
3. A DeleteTopicsListener is triggered in controller. It moves the replicas and partitions
to Offline->NonExistent states, deletes the partitions from controller's memory, sends
StopReplicaRequests with deletePartition=true.
4. Brokers on receiving the StopReplicaRequest remove the partition from their own memory
and delete the logs.
5. If all the partitions were successfully deleted, the topic path is deleted from zookeeper.
6. The controller always deletes the /admin/delete_topics path at the end. In removeFromTopicsBeingDeleted()
it checks whether each topic has been deleted from zookeeper, at which point it declares
victory or logs a warning of shame.
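The six steps above can be sketched, very roughly, as a pure function over the controller's
memory. This is a hedged illustration only: the names (ControllerMemory, deleteTopic) and the
string-valued requests are mine, not the patch's actual API; the real flow goes through the
replica/partition state machines and sends real StopReplicaRequests.

```scala
// Illustrative sketch of the controller-side deletion flow, NOT the
// patch's actual code. Controller memory is simplified to a map of
// topic -> set of partition ids.
object DeleteTopicSketch {
  type ControllerMemory = Map[String, Set[Int]]

  // Returns the updated controller memory and the StopReplicaRequests
  // (deletePartition=true) that would be sent to brokers (step 3).
  def deleteTopic(memory: ControllerMemory, topic: String): (ControllerMemory, List[String]) =
    memory.get(topic) match {
      case None =>
        // Step 6's "warning of shame" case: nothing to delete.
        (memory, List(s"WARN: topic $topic not present"))
      case Some(partitions) =>
        val requests = partitions.toList.sorted
          .map(p => s"StopReplica($topic, $p, deletePartition=true)")
        // Partitions are dropped from controller memory (step 3).
        (memory - topic, requests)
    }
}
```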


How to validate that the topics have been deleted:
1. Rerun the DeleteTopicsCommand, it should complain that the topics are absent in zookeeper.


Special comments:
A. TopicChangeListener:
1. I think that we should not handle deleted topics here. We should rather modify the controller's
memory in the NonExistentPartition state change. This is because the controller releases its
lock between the DeleteTopics listener and the TopicChangeListener; we want the controller's
memory to be up-to-date when the lock is released at the completion of the DeleteTopics listener.
2. Probably there is no need to add the new topics' partition-replica assignment to controllerContext.partitionReplicaAssignment,
because onNewTopicCreation() will do that. I put a TODO there. Please correct if I am wrong.


Handling failures:

A. What happens when controller fails:
1. Before OfflineReplica state change: New controller context will be initialized and initializeAndMaybeTriggerTopicDeletion()
will delete the topics.
2. After OfflineReplica state change and before OfflinePartition state change: Initialization
of controller context will re-insert replicas into ISR, and initializeAndMaybeTriggerTopicDeletion()
will delete the topics.
3. After OfflinePartition state change and before NonExistentReplica state change: Ditto as
2.
4. After NonExistentReplica state change and before NonExistentPartition state change: The
replicas that were deleted will be restarted on individual brokers, then the topics will be
deleted.
5. After NonExistentPartition state change and before deleting topics from zk: Ditto as 3.
(The NonExistentPartition state change in partition state machine currently does not delete
the partitions from zk, it assumes that the controller will delete them, which is similar
to what we do for some other state changes as of now).
I think the deletion should proceed smoothly even if the controller fails over in the middle
of 1,2,3,4 or 5 above.
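The reason failover is safe at any of the five points can be sketched as an idempotence
argument: on startup the new controller re-reads the pending deletes (the /admin/delete_topics
path mentioned above) and simply re-runs the deletion. The names below are illustrative
stand-ins, not the actual initializeAndMaybeTriggerTopicDeletion() code.

```scala
// Hedged sketch: re-triggering deletion on controller startup is
// idempotent, so a crash mid-deletion is harmless. pendingDeletes models
// /admin/delete_topics; liveTopics models the topics registered in zk.
object FailoverSketch {
  def onControllerStartup(pendingDeletes: Set[String],
                          liveTopics: Set[String]): Set[String] =
    // Topics already gone stay gone; topics still present are deleted.
    liveTopics -- pendingDeletes
}
```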

B. What happens if a topic is deleted when a broker that has a replica of that topic's partition
is down? =>
i. When the broker comes back up and the topic has been deleted from zk, the controller can
only tell the broker which topics are currently alive. The broker should delete the dead logs
when it receives the first leaderAndIsr request. This can be done just before starting the
hw checkpointing thread. 
ii. This will also be useful in replica reassignment for a partition. When the replica reassignment
algorithm sends a StopReplica request with delete=true, the receiving broker could be down.
After the broker is back up, it will realize that it needs to delete the logs for certain
partitions that are no longer assigned to it.
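The cleanup in (i) and (ii) reduces to a set difference: on the first leaderAndIsr request
after restart, the broker deletes any local log whose partition is not in its current
assignment. A minimal sketch (the names are assumptions, not the patch's API):

```scala
// Illustrative only: on the first leaderAndIsr request after startup,
// a broker prunes logs for partitions it no longer hosts.
object LogPruneSketch {
  type TopicPartition = (String, Int)

  // localLogs: partitions with logs on this broker's disk;
  // assigned: partitions named in the first leaderAndIsr request.
  def logsToDelete(localLogs: Set[TopicPartition],
                   assigned: Set[TopicPartition]): Set[TopicPartition] =
    localLogs -- assigned
}
```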


Possible corner cases:
1. What happens to hw checkpointing for deleted partitions? => checkpointHighWatermarks()
reads the current allPartitions() on a broker and writes the hw. So the hw for deleted partitions
will disappear.

2. What happens to Produce/Fetch requests in purgatory? => 
i. After the topics have been deleted, produce requests in purgatory will expire because there
will be no fetchers; fetch requests will expire because produce requests would fail in appendToLocalLog()
and no more data will be appended.
ii. Expiration of producer requests is harmless. 
iii. Expiration of fetch requests will try to send whatever data is remaining, but it will
not be able to send any data because the replica would be dead. We could think of forcing
the delayed fetch requests to expire before the replica is deleted and remove the expired
requests from the delayed queue, but that would probably require synchronizing on the delayed
queue. Thoughts?
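The benign-expiration argument in (iii) can be sketched as follows: an expired delayed fetch
for a deleted partition finds no live replica and so returns no data. This is a toy model
under assumed names (DelayedFetch, dataOnExpire), not the purgatory implementation.

```scala
// Toy model of (iii): expiration of a delayed fetch is harmless once the
// replica is gone, because there is nothing left to read.
object PurgatorySketch {
  case class DelayedFetch(topic: String, partition: Int)

  // On expiration, return data only if the partition's replica still exists.
  def dataOnExpire(req: DelayedFetch,
                   livePartitions: Set[(String, Int)]): Option[String] =
    if (livePartitions((req.topic, req.partition)))
      Some(s"data for ${req.topic}-${req.partition}")
    else
      None
}
```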


Other unrelated changes: 
A. ReplicaStateMachine
1. Moved NonExistentReplica to the bottom of cases to maintain the same order as PartitionStateMachine.
2. Deleted a redundant replicaState.put(replica,OnlineReplica) statement.
3. Even if a replica is not in the ISR, it should always be moved to OfflineReplica state.

B. Utils.scala:
1. Bug fix in seqToJson().  

Testing done:
1. Bring up one broker, create topics, delete topics, verify zk, verify that logs are gone.

2. Bring up two brokers, create topics, delete topics, verify zk, verify that logs are gone
from both brokers.
3. Repeat the above 1 and 2 with more than one partition per topic.
4. Write to /admin/delete_topics, bring up the controller, watch the topic and logs get deleted.
5. Bring up two brokers, create two topics with replication factor of two, verify that the
logs get created. Now, shut down broker 1 and delete a topic. Verify that the topic disappears
from zk and logs of broker 0. Bring up broker 1, verify that the topic disappears from the
logs of broker 1 because controller (broker 0) will send leaderAndIsr request for the remaining
topic.
6. Validate error inputs.
7. Validate that the tool prints error when a non-existent topic is being deleted.

Is it ok if I write unit tests after this patch is checked in, in case there are modifications?
                
> Add delete topic support 
> -------------------------
>
>                 Key: KAFKA-330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-330
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Swapnil Ghike
>            Priority: Blocker
>              Labels: features, kafka-0.8, p2, project
>
> One proposal of this API is here - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Deletetopic

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
