pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #1656: Add admin api to delete topic forcefully
Date Tue, 01 May 2018 21:53:28 GMT
merlimat closed pull request #1656: Add admin api to delete topic forcefully
URL: https://github.com/apache/incubator-pulsar/pull/1656
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 106e8c4e7c..e1014fd0bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,6 +38,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -285,6 +286,17 @@ protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction>
acti
         }
     }
 
+    protected void internalDeleteTopicForcefully(boolean authoritative) {
+        validateAdminOperationOnTopic(true);
+        Topic topic = getTopicReference(topicName);
+        try {
+            topic.deleteForcefully().get();
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete topic forcefully {}", clientAppId(), topicName,
e);
+            throw new RestException(e);
+        }
+    }
+
     protected void internalRevokePermissionsOnTopic(String role) {
         // This operation should be reading from zookeeper and it should be allowed without
having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -394,7 +406,7 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean
author
         return metadata;
     }
 
-    protected void internalDeletePartitionedTopic(boolean authoritative) {
+    protected void internalDeletePartitionedTopic(boolean authoritative, boolean force) {
         validateAdminAccessForTenant(topicName.getTenant());
         PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName,
authoritative);
         int numPartitions = partitionMetadata.partitions;
@@ -404,7 +416,7 @@ protected void internalDeletePartitionedTopic(boolean authoritative) {
             try {
                 for (int i = 0; i < numPartitions; i++) {
                     TopicName topicNamePartition = topicName.getPartition(i);
-                    pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString())
+                    pulsar().getAdminClient().persistentTopics().deleteAsync(topicNamePartition.toString(),
force)
                             .whenComplete((r, ex) -> {
                                 if (ex != null) {
                                     if (ex instanceof NotFoundException) {
@@ -465,6 +477,14 @@ protected void internalUnloadTopic(boolean authoritative) {
         unloadTopic(topicName, authoritative);
     }
 
+    protected void internalDeleteTopic(boolean authoritative, boolean force) {
+        if (force) {
+            internalDeleteTopicForcefully(authoritative);
+        } else {
+            internalDeleteTopic(authoritative);
+        }
+    }
+    
     protected void internalDeleteTopic(boolean authoritative) {
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 470ee45214..33f5bf9ffd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -181,9 +181,10 @@ public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property")
St
             @ApiResponse(code = 404, message = "Partitioned topic does not exist") })
     public void deletePartitionedTopic(@PathParam("property") String property, @PathParam("cluster")
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String
encodedTopic,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeletePartitionedTopic(authoritative);
+        internalDeletePartitionedTopic(authoritative, force);
     }
 
     @PUT
@@ -200,15 +201,17 @@ public void unloadTopic(@PathParam("property") String property, @PathParam("clus
 
     @DELETE
     @Path("/{property}/{cluster}/{namespace}/{topic}")
-    @ApiOperation(hidden = true, value = "Delete a topic.", notes = "The topic cannot be
deleted if there's any active subscription or producer connected to the it.")
+    @ApiOperation(hidden = true, value = "Delete a topic.", notes = "The topic cannot be
deleted if delete is not forcefully and there's any active "
+            + "subscription or producer connected to the it. Force delete ignores connected
clients and deletes topic by explicitly closing them.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Topic has active producers/subscriptions")
})
     public void deleteTopic(@PathParam("property") String property, @PathParam("cluster")
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String
encodedTopic,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeleteTopic(authoritative);
+        internalDeleteTopic(authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5e1a98b1a7..2256d88e02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -176,9 +176,10 @@ public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("tenant")
Stri
             @ApiResponse(code = 404, message = "Partitioned topic does not exist") })
     public void deletePartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace")
String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeletePartitionedTopic(authoritative);
+        internalDeletePartitionedTopic(authoritative, force);
     }
 
     @PUT
@@ -195,15 +196,16 @@ public void unloadTopic(@PathParam("tenant") String tenant, @PathParam("namespac
 
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}")
-    @ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if there's
any active subscription or producer connected to the it.")
+    @ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if delete
is not forcefully and there's any active "
+            + "subscription or producer connected to the it. Force delete ignores connected
clients and deletes topic by explicitly closing them.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Topic has active producers/subscriptions")
})
     public void deleteTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
-            @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false")
boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeleteTopic(authoritative);
+        internalDeleteTopic(authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fbf3c65de3..c684520fe9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -132,4 +132,6 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
     CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
 
     CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema);
+
+    CompletableFuture<Void> deleteForcefully();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e45016c309..96cb5dee94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -410,10 +410,20 @@ void removeSubscription(String subscriptionName) {
 
     @Override
     public CompletableFuture<Void> delete() {
-        return delete(false);
+        return delete(false, false);
     }
 
-    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+    /**
+     * Forcefully close all producers/consumers/replicators and deletes the topic.
+     * 
+     * @return
+     */
+    @Override
+    public CompletableFuture<Void> deleteForcefully() {
+        return delete(false, true);
+    }
+    
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean
closeIfClientsConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -423,36 +433,62 @@ void removeSubscription(String subscriptionName) {
                 deleteFuture.completeExceptionally(new TopicFencedException("Topic is already
fenced"));
                 return deleteFuture;
             }
-            if (USAGE_COUNT_UPDATER.get(this) == 0) {
-                isFenced = true;
 
+            CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
+            if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
+                producers.forEach(producer -> futures.add(producer.disconnect()));
+                subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+                FutureUtil.waitForAll(futures).thenRun(() -> {
+                    closeClientFuture.complete(null);
+                }).exceptionally(ex -> {
+                    log.error("[{}] Error closing clients", topic, ex);
+                    isFenced = false;
+                    closeClientFuture.completeExceptionally(ex);
+                    return null;
+                });
+            } else {
+                closeClientFuture.complete(null);
+            }
 
-                if (failIfHasSubscriptions) {
-                    if (!subscriptions.isEmpty()) {
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(new TopicBusyException("Topic
has subscriptions"));
-                        return deleteFuture;
-                    }
-                } else {
-                    subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
-                }
+            closeClientFuture.thenAccept(delete -> {
+
+                if (USAGE_COUNT_UPDATER.get(this) == 0) {
+                    isFenced = true;
+
+                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-                FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Error deleting topic", topic, ex);
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(ex);
+                    if (failIfHasSubscriptions) {
+                        if (!subscriptions.isEmpty()) {
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(new TopicBusyException("Topic
has subscriptions"));
+                            return;
+                        }
                     } else {
-                        brokerService.removeTopicFromCache(topic);
-                        log.info("[{}] Topic deleted", topic);
-                        deleteFuture.complete(null);
+                        subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
                     }
-                });
-            } else {
-                deleteFuture.completeExceptionally(new TopicBusyException(
-                        "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
-            }
+
+                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            log.error("[{}] Error deleting topic", topic, ex);
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(ex);
+                        } else {
+                            brokerService.removeTopicFromCache(topic);
+                            log.info("[{}] Topic deleted", topic);
+                            deleteFuture.complete(null);
+                        }
+                    });
+                } else {
+                    deleteFuture.completeExceptionally(new TopicBusyException(
+                            "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
+                }
+            }).exceptionally(ex -> {
+                deleteFuture.completeExceptionally(
+                        new TopicBusyException("Failed to close clients before deleting topic."));
+                return null;
+            });
         } finally {
             lock.writeLock().unlock();
         }
@@ -890,7 +926,7 @@ public void checkGC(int gcIntervalInSeconds) {
                                 gcIntervalInSeconds);
                     }
 
-                    stopReplProducers().thenCompose(v -> delete(true))
+                    stopReplProducers().thenCompose(v -> delete(true, false))
                             .thenRun(() -> log.info("[{}] Topic deleted successfully due
to inactivity", topic))
                             .exceptionally(e -> {
                                 if (e.getCause() instanceof TopicBusyException) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c59bd9839..c3db95732e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -673,7 +673,8 @@ void removeSubscription(String subscriptionName) {
      * 
      * @return
      */
-    private CompletableFuture<Void> deleteForcefully() {
+    @Override
+    public CompletableFuture<Void> deleteForcefully() {
         return delete(false, true);
     }
     
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index adf5ce7297..4fb54dc689 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -58,11 +59,6 @@ void shutdown() throws Exception {
         super.shutdown();
     }
 
-    @DataProvider(name = "partitionedTopic")
-    public Object[][] partitionedTopicProvider() {
-        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
-    }
-
     /**
      * If local cluster is removed from the global namespace then all topics under that namespace
should be deleted from
      * the cluster.
@@ -105,6 +101,33 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception
{
         client2.close();
     }
 
+    @Test
+    public void testForcefullyTopicDeletion() throws Exception {
+        log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
+
+        final String namespace = "pulsar/removeClusterTest";
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+
+        final String topicName = "persistent://" + namespace + "/topic";
+
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
+                .build();
+
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
+                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        producer1.close();
+
+        admin1.persistentTopics().delete(topicName, true);
+
+        MockedPulsarServiceBaseTest
+                .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName),
5, 150);
+
+        Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+
+        client1.close();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
 
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
index b65ffd09e1..6fee2ff83d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
@@ -18,10 +18,5 @@
  */
 package org.apache.pulsar.client.admin;
 
-/**
- * @deprecated since 2.0. See {@link Topics}
- */
 @Deprecated
-public interface PersistentTopics extends Topics {
-
-}
+public interface PersistentTopics extends Topics {}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2d0d5e5dbc..2d629c3b4a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -283,6 +283,22 @@
      *
      * @param topic
      *            Topic name
+     * @param force
+     *            Delete topic forcefully
+     *            
+     * @throws PulsarAdminException
+     */
+    void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException;
+    
+    /**
+     * Delete a partitioned topic.
+     * <p>
+     * It will also delete all the partitions of the topic if it exists.
+     * <p>
+     *
+     * @param topic
+     *            Topic name
+     *            
      * @throws PulsarAdminException
      */
     void deletePartitionedTopic(String topic) throws PulsarAdminException;
@@ -295,10 +311,36 @@
      *
      * @param topic
      *            Topic name
+     * @param force
+     *            Delete topic forcefully
+     *            
      * @return a future that can be used to track when the partitioned topic is deleted
      */
-    CompletableFuture<Void> deletePartitionedTopicAsync(String topic);
+    CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean force);
+    
+    /**
+     * Delete a topic.
+     * <p>
+     * Delete a topic. The topic cannot be deleted if force flag is disable and there's any
active subscription or producer connected to the it. Force flag deletes topic forcefully by
closing all active producers and consumers.
+     * <p>
+     *
+     * @param topic
+     *            Topic name
+     * @param force
+     *            Delete topic forcefully
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PreconditionFailedException
+     *             Topic has active subscriptions or producers
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void delete(String topic, boolean force) throws PulsarAdminException;
 
+    
     /**
      * Delete a topic.
      * <p>
@@ -318,20 +360,23 @@
      *             Unexpected error
      */
     void delete(String topic) throws PulsarAdminException;
-
+    
     /**
      * Delete a topic asynchronously.
      * <p>
-     * Delete a topic asynchronously. The topic cannot be deleted if there's any active subscription
or producer
-     * connected to the it.
+     * Delete a topic asynchronously. The topic cannot be deleted if force flag is disable
and there's any active
+     * subscription or producer connected to the it. Force flag deletes topic forcefully
by closing all active producers
+     * and consumers.
      * <p>
      *
      * @param topic
      *            topic name
-     *
+     * @param force
+     *            Delete topic forcefully
+     * 
      * @return a future that can be used to track when the topic is deleted
      */
-    CompletableFuture<Void> deleteAsync(String topic);
+    CompletableFuture<Void> deleteAsync(String topic, boolean force);
 
     /**
      * Unload a topic.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 8749d6ae6f..4ef20d83d2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -75,6 +75,8 @@
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicStats;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -270,8 +272,13 @@ public void failed(Throwable throwable) {
 
     @Override
     public void deletePartitionedTopic(String topic) throws PulsarAdminException {
+        deletePartitionedTopic(topic, false);
+    }
+
+    @Override
+    public void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException
{
         try {
-            deletePartitionedTopicAsync(topic).get();
+            deletePartitionedTopicAsync(topic, force).get();
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -281,16 +288,22 @@ public void deletePartitionedTopic(String topic) throws PulsarAdminException
{
     }
 
     @Override
-    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
+    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean
force) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitions");
+        path = path.queryParam("force", force);
         return asyncDeleteRequest(path);
     }
 
     @Override
     public void delete(String topic) throws PulsarAdminException {
+        delete(topic, false);
+    }
+
+    @Override
+    public void delete(String topic, boolean force) throws PulsarAdminException {
         try {
-            deleteAsync(topic).get();
+            deleteAsync(topic, force).get();
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -300,9 +313,10 @@ public void delete(String topic) throws PulsarAdminException {
     }
 
     @Override
-    public CompletableFuture<Void> deleteAsync(String topic) {
+    public CompletableFuture<Void> deleteAsync(String topic, boolean force) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn);
+        path = path.queryParam("force", Boolean.toString(force));
         return asyncDeleteRequest(path);
     }
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9f10be3556..a5b1581164 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -505,7 +505,7 @@ void topics() throws Exception {
         CmdTopics cmdTopics = new CmdTopics(admin);
 
         cmdTopics.run(split("delete persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", false);
 
         cmdTopics.run(split("unload persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1");
@@ -556,7 +556,7 @@ void topics() throws Exception {
         verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", false);
 
         cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
         verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
@@ -588,7 +588,7 @@ void persistentTopics() throws Exception {
         CmdPersistentTopics topics = new CmdPersistentTopics(admin);
 
         topics.run(split("delete persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", false);
 
         topics.run(split("unload persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1");
@@ -639,7 +639,7 @@ void persistentTopics() throws Exception {
         verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
 
         topics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", false);
 
         topics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
         verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 5b652561a6..266ebc4fab 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -239,11 +239,14 @@ void run() throws Exception {
 
         @Parameter(description = "persistent://property/cluster/namespace/topic\n", required
= true)
         private java.util.List<String> params;
+        
+        @Parameter(names = "--force", description = "Close all producer/consumer/replicator
and delete topic forcefully")
+        private boolean force = false;
 
         @Override
         void run() throws Exception {
             String persistentTopic = validatePersistentTopic(params);
-            persistentTopics.deletePartitionedTopic(persistentTopic);
+            persistentTopics.deletePartitionedTopic(persistentTopic, force);
         }
     }
 
@@ -253,10 +256,13 @@ void run() throws Exception {
         @Parameter(description = "persistent://property/cluster/namespace/topic\n", required
= true)
         private java.util.List<String> params;
 
+        @Parameter(names = "--force", description = "Close all producer/consumer/replicator
and delete topic forcefully")
+        private boolean force = false;
+        
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            persistentTopics.delete(persistentTopic);
+            persistentTopics.delete(persistentTopic, force);
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 04d96902fa..2d486aa19d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -246,10 +246,14 @@ void run() throws Exception {
         @Parameter(description = "persistent://property/namespace/topic\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+                "--force" }, description = "Close all producer/consumer/replicator and delete
topic forcefully")
+        private boolean force = false;
+
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            topics.deletePartitionedTopic(topic);
+            topics.deletePartitionedTopic(topic, force);
         }
     }
 
@@ -259,10 +263,14 @@ void run() throws Exception {
         @Parameter(description = "persistent://property/namespace/topic\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+                "--force" }, description = "Close all producer/consumer/replicator and delete
topic forcefully")
+        private boolean force = false;
+
         @Override
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
-            topics.delete(topic);
+            topics.delete(topic, force);
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message