kafka-jira mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-6302) Topic can not be recreated after it is deleted
Date Tue, 05 Dec 2017 19:23:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279074#comment-16279074 ]

Matthias J. Sax edited comment on KAFKA-6302 at 12/5/17 7:22 PM:
-----------------------------------------------------------------

Note the JavaDoc:

{quote}
It may take several seconds after AdminClient#deleteTopics returns success for all the brokers
to become aware that the topics are gone.
{quote}

Thus, if your create request goes to a different broker than your delete request, the broker
handling the create request might still have stale metadata and think the topic is still there.
Note that "returns success" in the JavaDoc actually means "after the returned future completes".
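
A minimal sketch of one way a test could cope with this propagation delay: retry the create call for a bounded number of attempts instead of assuming the topic is gone as soon as the delete future completes. The object name `RetryUntilVisible`, the `retry` helper, and its parameters are hypothetical and not part of Kafka. The helper itself has no Kafka dependency; the commented usage assumes kafka-clients is on the classpath.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Kafka: re-runs an action until it
// succeeds, the failure is not retryable, or the attempts run out.
object RetryUntilVisible {

  /** Runs `attempt` up to `attemptsLeft` times, sleeping `delayMs`
    * milliseconds between tries. Only failures accepted by `retryOn`
    * are retried. Returns the result of the last try. */
  @tailrec
  def retry[T](attemptsLeft: Int,
               delayMs: Long,
               retryOn: Throwable => Boolean = _ => true)(attempt: () => T): Try[T] =
    Try(attempt()) match {
      case Failure(e) if attemptsLeft > 1 && retryOn(e) =>
        Thread.sleep(delayMs)
        retry(attemptsLeft - 1, delayMs, retryOn)(attempt)
      case result => result
    }

  // Assumed usage against the AdminClient from the test in this issue.
  // The attempt count and delay are arbitrary illustrative values:
  //
  //   retry(attemptsLeft = 20, delayMs = 250L,
  //         retryOn = _.getCause.isInstanceOf[TopicExistsException]) { () =>
  //     admin.createTopics(Seq(new NewTopic(topic, 1, 1.toShort)).asJava).all().get()
  //   }
}
```

Restricting `retryOn` to the "topic already exists" failure keeps unrelated errors, such as an unreachable broker, from being retried silently.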


was (Author: mjsax):
Note the JavaDoc:

{quote}
It may take several seconds after AdminClient#deleteTopics returns success for all the brokers
to become aware that the topics are gone.
{quote}

Thus, if your create request goes to a different broker than your delete request, the broker
handling the create request might still have stale metadata and think the topic is still there.

> Topic can not be recreated after it is deleted
> ----------------------------------------------
>
>                 Key: KAFKA-6302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6302
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, clients
>    Affects Versions: 1.0.0
>            Reporter: kic
>
> I use an embedded Kafka for unit tests. My application relies on the ability to recreate topics programmatically. Currently it is not possible to re-create a topic after it has been deleted.
> {code}
> // needs compile-time dependencies 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>   "The admin client" should "be able to create, delete and re-create topics" in {
>     props.setProperty("bootstrap.servers", "localhost:10001")
>     props.setProperty("delete.topic.enable", "true")
>     props.setProperty("group.id", "test-client")
>     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
>     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>     props.setProperty("client.id", "test-client")
>     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
>     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     EmbeddedKafaJavaWrapper.start(10001, 10002, props)
>     try {
>       implicit val admin = AdminClient.create(props)
>       // create topic and confirm it exists
>       createTopic(testTopic)
>       val topics = listTopics()
>       info(s"topics: $topics")
>       topics should contain(testTopic)
>       // now we should be able to send something to this topic
>       // TODO create producer and send something
>       // delete topic
>       deleteTopic(testTopic)
>       listTopics() shouldNot contain(testTopic)
>       // recreate topic
>       createTopic(testTopic)
>       // listTopics() should contain(testTopic)
>       // and finally consume from the topic and expect to get 0 entries
>       // TODO create consumer and poll once
>     } finally {
>       EmbeddedKafaJavaWrapper.stop()
>     }
>   }
>   def listTopics()(implicit admin: AdminClient) =
>     admin.listTopics().names().get()
>   // block until the create future completes so that failures surface in the test
>   def createTopic(topic: String)(implicit admin: AdminClient) =
>     admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava).all().get()
>   def deleteTopic(topic: String)(implicit admin: AdminClient) =
>     admin.deleteTopics(Seq(topic).asJava).all().get()
> }
> {code}
> Btw, what happens to connected producers/consumers when I delete a topic? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
