kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "kic (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6302) Topic can not be recreated after it is deleted
Date Sun, 03 Dec 2017 17:06:01 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

kic updated KAFKA-6302:
-----------------------
    Description: 
I use an embedded kafka for unit test. My application relies on the ability to recreate topics
programmatically. Currently it is not possible to re-create a topic after it has been deleted.

{code}
// needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 'org.apache.kafka:kafka-clients:1.0.0'
package kic.kafka.embedded

import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.scalatest._

import scala.collection.JavaConverters._

class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
  val props = new Properties()
  val testTopic = "test-topic"

  "The admin client" should "be able to create, delete and re-create topics" in {
    props.setProperty("bootstrap.servers", "localhost:10001")
    props.setProperty("delete.enable.topic", "true")
    props.setProperty("group.id", "test-client")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("clinet.id", "test-client")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    EmbeddedKafaJavaWrapper.start(10001, 10002, props)

    try {
      implicit val admin = AdminClient.create(props)

      // create topic and confirm it exists
      createTopic(testTopic)
      val topics = listTopics()
      info(s"topics: $topics")
      topics should contain(testTopic)

      // now we should be able to send something to this topic
      // TODO create producer and send something

      // delete topic
      deleteTopic(testTopic)
      listTopics() shouldNot contain(testTopic)

      // recreate topic
      createTopic(testTopic)
      // listTopics() should contain(testTopic)

      // and finally consume from the topic and expect to get 0 entries
      // TODO create consumer and poll once
    } finally {
      EmbeddedKafaJavaWrapper.stop()
    }

  }

  def listTopics()(implicit admin: AdminClient) =
    admin.listTopics().names().get()

  def createTopic(topic: String)(implicit admin: AdminClient) =
    admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)

  def deleteTopic(topic: String)(implicit admin: AdminClient) =
    admin.deleteTopics(Seq("test-topic").asJava).all().get()

}
{code}

Btw, what happens to connected producers/consumers when I delete a topic? 

  was:
I use an embedded kafka for unit test. My application relies on the ability to recreate topics
programmatically. Currently it is not possible to re-create a topic after it has been deleted.

{code}
// needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
package kic.kafka.embedded

import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.scalatest._

import scala.collection.JavaConverters._

class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
  val props = new Properties()
  val testTopic = "test-topic"

  "The admin client" should "be able to create, delete and re-create topics" in {
    props.setProperty("bootstrap.servers", "localhost:10001")
    props.setProperty("delete.enable.topic", "true")
    props.setProperty("group.id", "test-client")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("clinet.id", "test-client")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    EmbeddedKafaJavaWrapper.start(10001, 10002, props)

    try {
      implicit val admin = AdminClient.create(props)

      // create topic and confirm it exists
      createTopic(testTopic)
      val topics = listTopics()
      info(s"topics: $topics")
      topics should contain(testTopic)

      // now we should be able to send something to this topic
      // TODO create producer and send something

      // delete topic
      deleteTopic(testTopic)
      listTopics() shouldNot contain(testTopic)

      // recreate topic
      createTopic(testTopic)
      // listTopics() should contain(testTopic)

      // and finally consume from the topic and expect to get 0 entries
      // TODO create consumer and poll once
    } finally {
      EmbeddedKafaJavaWrapper.stop()
    }

  }

  def listTopics()(implicit admin: AdminClient) =
    admin.listTopics().names().get()

  def createTopic(topic: String)(implicit admin: AdminClient) =
    admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)

  def deleteTopic(topic: String)(implicit admin: AdminClient) =
    admin.deleteTopics(Seq("test-topic").asJava).all().get()

}
{code}

Btw, what happens to connected producers/consumers when I delete a topic? 


> Topic can not be recreated after it is deleted
> ----------------------------------------------
>
>                 Key: KAFKA-6302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6302
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, clients
>    Affects Versions: 1.0.0
>            Reporter: kic
>
> I use an embedded kafka for unit test. My application relies on the ability to recreate
topics programmatically. Currently it is not possible to re-create a topic after it has been
deleted.
> {code}
> // needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0' and 'org.apache.kafka:kafka-clients:1.0.0'
> package kic.kafka.embedded
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
>   val props = new Properties()
>   val testTopic = "test-topic"
>   "The admin client" should "be able to create, delete and re-create topics" in {
>     props.setProperty("bootstrap.servers", "localhost:10001")
>     props.setProperty("delete.enable.topic", "true")
>     props.setProperty("group.id", "test-client")
>     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
>     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
>     props.setProperty("clinet.id", "test-client")
>     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
>     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     EmbeddedKafaJavaWrapper.start(10001, 10002, props)
>     try {
>       implicit val admin = AdminClient.create(props)
>       // create topic and confirm it exists
>       createTopic(testTopic)
>       val topics = listTopics()
>       info(s"topics: $topics")
>       topics should contain(testTopic)
>       // now we should be able to send something to this topic
>       // TODO create producer and send something
>       // delete topic
>       deleteTopic(testTopic)
>       listTopics() shouldNot contain(testTopic)
>       // recreate topic
>       createTopic(testTopic)
>       // listTopics() should contain(testTopic)
>       // and finally consume from the topic and expect to get 0 entries
>       // TODO create consumer and poll once
>     } finally {
>       EmbeddedKafaJavaWrapper.stop()
>     }
>   }
>   def listTopics()(implicit admin: AdminClient) =
>     admin.listTopics().names().get()
>   def createTopic(topic: String)(implicit admin: AdminClient) =
>     admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
>   def deleteTopic(topic: String)(implicit admin: AdminClient) =
>     admin.deleteTopics(Seq("test-topic").asJava).all().get()
> }
> {code}
> Btw, what happens to connected producers/consumers when I delete a topic? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message