kafka-dev mailing list archives

From "Neha Narkhede (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-330) Add delete topic support
Date Wed, 03 Apr 2013 19:01:17 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13621189#comment-13621189
] 

Neha Narkhede commented on KAFKA-330:
-------------------------------------

Thanks for the patch! Some suggestions -

1. In the controller, it is important not to let a long delete-topics operation block critical
state changes like leader election. To make this possible, relinquish the lock between the
deletes for individual topics.
2. If you do relinquish the lock as suggested above, you now need to take care to avoid
leader elections for partitions that are being deleted.
3. Since topic deletion will now be handled per topic, it might be worth changing the
zookeeper structure for delete topics so that the status of each individual topic delete gets
reported accordingly. One way to do this is to introduce a path to indicate that the admin
tool has initiated a delete operation for some topics (/admin/delete_topics_updated), and to
create child nodes under /admin/delete_topics, one per topic. As you complete each individual
topic deletion, you delete the /admin/delete_topics/<topic> path. The admin tool creates the
/admin/delete_topics/<topic> path and then updates /admin/delete_topics_updated. The
controller only registers a data change watcher on /admin/delete_topics_updated. When this
watcher fires, it reads the children of /admin/delete_topics and starts topic deletion.
4. On startup/failover, the controller registers a data change watch on
/admin/delete_topics_updated, and then reads the list of topics under /admin/delete_topics.
5. The admin tool never errors out, since it just adds to the list of topics to be deleted.
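To make the proposed zookeeper layout concrete, here is a minimal sketch of the admin-tool/controller handshake, with an in-memory map standing in for a live zookeeper ensemble (the path names come from the proposal above; everything else is illustrative, not actual Kafka code):

```scala
import scala.collection.mutable

object DeleteTopicsZkLayoutSketch {
  // stand-in for zookeeper: path -> data
  val zk = mutable.Map[String, String]()

  // Admin tool: create one child node per topic, then touch the trigger
  // path, which is the only path the controller watches
  def initiateDelete(topics: Seq[String]): Unit = {
    topics.foreach(t => zk(s"/admin/delete_topics/$t") = "")
    zk("/admin/delete_topics_updated") = System.nanoTime().toString // fires the watcher
  }

  // Controller: when the watcher fires, read the children of
  // /admin/delete_topics and delete one topic at a time
  def onDeleteTopicsUpdated(deleteTopic: String => Unit): Unit = {
    val pending = zk.keys.filter(_.startsWith("/admin/delete_topics/")).toSeq
    pending.foreach { path =>
      deleteTopic(path.stripPrefix("/admin/delete_topics/"))
      zk.remove(path) // per-topic status: node gone == delete complete
    }
  }
}
```

The point of the two paths is that the admin tool only ever adds child nodes and touches the trigger, so it never has to wait for or observe the deletes themselves.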

On the broker side, there are a few things to be done correctly -

1. KafkaApis
After receiving a stop replica request, the request handler should reject produce/fetch
requests for partitions that are being deleted by returning a PartitionBeingDeleted error
code. Once the delete is complete, the partition can be removed from this list; after that,
it will return an UnknownTopicOrPartition error code.
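The error-code decision above can be sketched as follows; note that PartitionBeingDeleted is a proposed code, so its value (and the field names) here are placeholders, not actual 0.8 identifiers:

```scala
object ErrorCodes {
  val NoError = 0.toShort
  val UnknownTopicOrPartition = 3.toShort // existing code
  val PartitionBeingDeleted = 100.toShort // placeholder value for the proposed code
}

// allPartitions: partitions this broker hosts; partitionsBeingDeleted:
// partitions for which a stop replica request has been received
class RequestValidatorSketch(allPartitions: Set[(String, Int)],
                             partitionsBeingDeleted: Set[(String, Int)]) {
  def errorFor(topic: String, partition: Int): Short = {
    val tp = (topic, partition)
    if (partitionsBeingDeleted.contains(tp)) ErrorCodes.PartitionBeingDeleted
    else if (!allPartitions.contains(tp)) ErrorCodes.UnknownTopicOrPartition
    else ErrorCodes.NoError
  }
}
```

The ordering matters: the being-deleted check comes first, and once the partition is dropped from both sets the same request naturally falls through to UnknownTopicOrPartition.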

2. ReplicaManager
2.1 Remove unused variable leaderBrokerId from makeFollower()
2.2 Fix the comment inside recordFollowerPosition to say "partition hasn't been created or
has been deleted"
2.3 Let the partition do the delete() operation. This ensures that leaderAndIsrUpdateLock
is held for the duration of the delete, which avoids interleaving leader/isr requests
with stop replica requests and simplifies reasoning about log truncate/high watermark update
operations.

3. Partition - Introduce a new delete() API that works like this -
1. Acquire leaderIsrUpdateLock so that creating the log does not interfere with deleting it,
and so that adding/removing the fetcher does not interfere with deleting the log.
2. Remove the fetcher for the partition.
3. Invoke delete() on the log. Be careful about how in-flight read/write requests will be
affected.
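A minimal sketch of the proposed Partition.delete(), assuming LogSketch and FetcherSketch as stand-ins for the real Log and fetcher manager: the whole sequence runs under leaderIsrUpdateLock so it cannot interleave with makeLeader/makeFollower or fetcher changes.

```scala
import java.util.concurrent.locks.ReentrantLock

trait LogSketch { def delete(): Unit }
trait FetcherSketch { def removeFetcher(): Unit }

class PartitionSketch(log: LogSketch, fetcher: FetcherSketch) {
  private val leaderIsrUpdateLock = new ReentrantLock()

  def delete(): Unit = {
    leaderIsrUpdateLock.lock() // step 1: exclude create-log and fetcher add/remove
    try {
      fetcher.removeFetcher()  // step 2: stop replication for this partition first
      log.delete()             // step 3: delete the underlying log
    } finally leaderIsrUpdateLock.unlock()
  }
}
```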

4. LogManager
1. When deleteLogs() is invoked, remove the logs from allLogs. This prevents flush from
being invoked on a log that is about to be deleted.
2. Invoke log.delete() on every individual log.
3. Note that log.markDeletedWhile(_ => true) will leave an extra rolled-over segment in the
in-memory segment list.
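The remove-from-allLogs-before-delete ordering can be sketched like this (names are illustrative; the real LogManager keys allLogs differently), showing why the background flusher never touches a log once deleteLogs() has started on it:

```scala
import scala.collection.mutable

trait LogLike { def delete(): Unit; def flush(): Unit }

class LogManagerSketch {
  val allLogs = mutable.Map[String, LogLike]()

  def deleteLogs(topics: Seq[String]): Unit =
    topics.foreach { t =>
      allLogs.remove(t) match {        // step 1: now invisible to the flusher
        case Some(log) => log.delete() // step 2: actually delete
        case None      => ()           // already gone; nothing to do
      }
    }

  // the flush thread iterates allLogs, so it can never see a removed log
  def flushAll(): Unit = allLogs.values.foreach(_.flush())
}
```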

5. Log
1. Log delete should acquire "lock" to prevent interleaving with append/truncate/roll/flush,
etc.
The following steps need to be taken during log.delete():
2. Invoke log.close().
3. Invoke segmentList.delete(), where SegmentList.delete() only does
contents.set(new Array[T](0)).
4. Invoke segment.delete() on each segment.
5. Set a flag deleted = true.
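The five steps above, sketched with stand-in types (SegmentLike and the no-op close() are placeholders; the AtomicReference mirrors how SegmentList holds its contents):

```scala
import java.util.concurrent.atomic.AtomicReference

trait SegmentLike { def delete(): Unit }

class LogDeleteSketch(val segments: AtomicReference[Array[SegmentLike]]) {
  private val lock = new Object
  @volatile var deleted = false

  private def close(): Unit = () // placeholder for closing the file channels

  def delete(): Unit = lock.synchronized { // step 1: exclude append/truncate/roll/flush
    close()                                                // step 2
    val old = segments.getAndSet(Array.empty[SegmentLike]) // step 3: SegmentList.delete()
    old.foreach(_.delete())                                // step 4: delete segment files
    deleted = true                                         // step 5
  }
}
```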

A few questions to be thought about -

- Are any changes required to roll()? If the deleted flag is true, then skip roll().
- Are any changes required to markDeletedWhile()? Same as roll: if the deleted flag is true,
skip it.
- Are any changes required to flush()? This can be invoked either during roll or by append.
It cannot be invoked by the flush thread, since that is disabled for logs to be deleted. This
needs to be handled by using lastOption.
- See what to do with truncateTo(). This is used during makeFollower in Partition. It won't
interfere with delete, since Partition's delete acquires the leaderIsrUpdateLock. Another
place that uses truncateTo() is handleOffsetOutOfRange on the follower. This won't interleave
either, since the replica fetcher was already removed before attempting to delete the log.
- See what to do with truncateAndStartWithNewOffset(). This won't interleave with delete log
since the replica fetcher was already removed before attempting to delete the log.
- What if the broker is writing to the log when stop replica is deleting it? Since
log.delete() acquires the "lock", the append starts either before or after the delete. If it
starts after, then the changes mentioned in #7 and #9 should be made.
- What if the broker is about to write to the log that is under deletion? Same as above.
- What if the broker is reading from the log that is being deleted? It will get a
ClosedChannelException, I think. This needs to be confirmed. The test can run a consumer that
is consuming data from the beginning of a log while you invoke delete topic.
- What if the broker is about to read from the log that is being deleted? It will try reading
from a file channel that is closed and run into a ClosedChannelException. Should we catch
ClosedChannelException, log an appropriate error, and send a PartitionBeingDeleted error code
when that happens?
- What happens to the partition entry in the high watermark file when the partition is being
deleted? When the partition is removed from allPartitions, the next high watermark checkpoint
removes the partition's entry from the high watermark file.
- What happens to requests in the purgatory when the partition has been deleted? When a
partition has been removed from allPartitions, the requests in the purgatory will send
UnknownTopicOrPartitionCode back to the client.
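If we do decide to catch ClosedChannelException on reads of a deleted log, the mapping could look like the sketch below; the error-code value is a placeholder, since PartitionBeingDeleted is only proposed here:

```scala
import java.nio.channels.ClosedChannelException

// assumed value for the proposed PartitionBeingDeleted error code
val partitionBeingDeletedCode: Short = 100

// wrap a read of the (possibly closed) file channel; a closed channel
// becomes an error code instead of an uncaught exception
def safeRead(read: () => Array[Byte]): Either[Short, Array[Byte]] =
  try Right(read())
  catch { case _: ClosedChannelException => Left(partitionBeingDeletedCode) }
```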

6. Log.read()
val first = view.head.start
This needs to change to headOption. Return empty message set when this returns None

7. Log.flush()
segments.view.last.flush()
Need to change the above to segments.view.lastOption. If that returns None, then return without
flushing. 

8. SegmentList.delete()
contents.set(new Array[T](0))

9. Log.append()
Fix this to use lastOption - val segment = maybeRoll(segments.view.last)
If None, then return (-2,-2) to signify that the log was deleted
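One sketch covering the headOption/lastOption guards proposed for Log.read() (#6), Log.flush() (#7), and Log.append() (#9): after SegmentList.delete() empties the view, each call takes the None branch instead of throwing NoSuchElementException on .head/.last. Seg is a stand-in for LogSegment.

```scala
case class Seg(start: Long) { def flush(): Unit = () }

// #6: val first = view.head.start becomes a headOption; None => empty message set
def readFirstStart(view: Vector[Seg]): Option[Long] =
  view.headOption.map(_.start)

// #7: segments.view.last.flush() becomes a lastOption; None => return without flushing
def flushLast(view: Vector[Seg]): Boolean =
  view.lastOption match {
    case Some(seg) => seg.flush(); true
    case None      => false
  }

// #9: maybeRoll(segments.view.last) becomes a lastOption; None => (-2, -2) sentinel
def appendOffsets(view: Vector[Seg]): (Long, Long) =
  view.lastOption match {
    case Some(seg) => (seg.start, seg.start) // placeholder for the real append result
    case None      => (-2L, -2L)             // signifies that the log was deleted
  }
```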


> Add delete topic support 
> -------------------------
>
>                 Key: KAFKA-330
>                 URL: https://issues.apache.org/jira/browse/KAFKA-330
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Swapnil Ghike
>            Priority: Blocker
>              Labels: features, kafka-0.8, p2, project
>             Fix For: 0.8
>
>         Attachments: kafka-330-v1.patch
>
>
> One proposal of this API is here - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Deletetopic

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
