pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: [Issue #3436][pulsar-broker] Creating REST Endpoint for non-partitioned topic creation (#3625)
Date Mon, 04 Mar 2019 01:18:56 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 548c726  [Issue #3436][pulsar-broker] Creating REST Endpoint for non-partitioned topic creation (#3625)
548c726 is described below

commit 548c726b8e7f0e163b1132c9ada6ba83d6bec572
Author: Richard Yu <yohan.richard.yu@gmail.com>
AuthorDate: Sun Mar 3 17:18:50 2019 -0800

    [Issue #3436][pulsar-broker] Creating REST Endpoint for non-partitioned topic creation (#3625)
    
    We are adding a REST endpoint which allows the admin to create non-partitioned topics
    through PersistentTopics.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 12 ++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 16 +++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  5 +++-
 .../org/apache/pulsar/client/admin/Topics.java     | 19 +++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 19 +++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 14 +++++++++++
 site2/docs/admin-api-partitioned-topics.md         | 27 ++++++++++++++++++++++
 site2/docs/reference-pulsar-admin.md               | 10 +++++++-
 9 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index be5badb..2a117ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -398,6 +398,18 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void internalCreateNonPartitionedTopic(boolean authoritative) {
+    	validateAdminAccessForTenant(topicName.getTenant());
+    	
+    	try {
+    		getOrCreateTopic(topicName);
+    		log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
+    	} catch (Exception e) {
+    		log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName,
e);
+    		throw new RestException(e);
+    	}
+    }
+
     /**
      * It updates number of partitions of an existing non-global partitioned topic. It requires
partitioned-topic to
      * already exist and number of new partitions must be greater than existing number of
partitions. Decrementing
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 30cb4b3..a9c9369 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -140,6 +140,22 @@ public class PersistentTopics extends PersistentTopicsBase {
         internalCreatePartitionedTopic(numPartitions, authoritative);
     }
 
+    @PUT
+    @Path("/{tenant}/{namespace}/{topic}")
+    @ApiOperation(value="Create a non-partitioned topic.", notes = "This is the only REST
endpoint from which non-partitioned topics could be created.")
+    @ApiResponses(value = {
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 409, message = "Partitioned topic already exist"),
+        @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace
does not have any clusters configured"),
+        @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace")
String namespace,
+            @PathParam("topic") @Encoded String encodedTopic, 
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateGlobalNamespaceOwnership(tenant,namespace);
+        internalCreateNonPartitionedTopic(authoritative);
+    }
+
     /**
      * It updates number of partitions of an existing non-global partitioned topic. It requires
partitioned-topic to be
      * already exist and number of new partitions must be greater than existing number of
partitions. Decrementing
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 2d23f50..fdefd46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -117,7 +117,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest
{
     }
 
     @Test
-    public void testGetSubscriptionsWithAutoTopicCreationDisabled() {
+    public void testNonPartitionedTopics() {
     	pulsar.getConfiguration().setAllowAutoTopicCreation(false);
     	final String nonPartitionTopic = "non-partitioned-topic";
     	persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test",
true, (MessageIdImpl) MessageId.latest);
@@ -126,5 +126,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest
{
     	} catch (RestException exc) {
     		Assert.assertTrue(exc.getMessage().contains("zero partitions"));
     	}
+    	final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
+    	persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2,
true);
+    	Assert.assertEquals(persistentTopics.getPartitionedMetadata(testTenant, testNamespace,
nonPartitionTopic, true).partitions, 0);
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 9c442c7..691d0ab 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -203,6 +203,18 @@ public interface Topics {
     void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException;
 
     /**
+     * Create a non-partitioned topic.
+     * 
+     * <p>
+     * Create a non-partitioned topic. 
+     * <p>
+     * 
+     * @param topic Topic name
+     * @throws PulsarAdminException
+     */
+    void createNonPartitionedTopic(String topic) throws PulsarAdminException;
+
+    /**
      * Create a partitioned topic asynchronously.
      * <p>
      * Create a partitioned topic asynchronously. It needs to be called before creating a
producer for a partitioned
@@ -218,6 +230,13 @@ public interface Topics {
     CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);
 
     /**
+     * Create a non-partitioned topic asynchronously.
+     * 
+     * @param topic Topic name
+     */
+    CompletableFuture<Void> createNonPartitionedTopicAsync(String topic);
+
+    /**
      * Update number of partitions of a non-global partitioned topic.
      * <p>
      * It requires partitioned-topic to be already exist and number of new partitions must
be greater than existing
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 763d221..91991c6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -207,6 +207,25 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
     }
 
     @Override
+    public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
+    	try {
+    		createNonPartitionedTopicAsync(topic).get();
+    	} catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e.getCause());
+        }
+    }
+    
+    @Override
+    public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic){
+    	TopicName tn = validateTopic(topic);
+    	WebTarget path = topicPath(tn);
+    	return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+    
+    @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions)
{
         checkArgument(numPartitions > 1, "Number of partitions should be more than 1");
         TopicName tn = validateTopic(topic);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d4c2cd3..e852786 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -606,6 +606,9 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions
32"));
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32);
 
+        cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
         verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index ceaebf4..9fdce5e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -82,6 +82,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions());
 
         jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd());
+        jcommander.addCommand("create", new CreateNonPartitionedCmd());
         jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
         jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
 
@@ -213,6 +214,19 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Create a non-partitioned topic.")
+    private class CreateNonPartitionedCmd extends CliCommand {
+    	
+    	@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
+    	private java.util.List<String> params;
+    	
+    	@Override
+    	void run() throws Exception {
+    		String topic = validateTopicName(params);
+    		topics.createNonPartitionedTopic(topic);
+    	}
+    }
+    
     @Parameters(commandDescription = "Update existing non-global partitioned topic. \n"
             + "\t\tNew updating number of partitions must be greater than existing number
of partitions.")
     private class UpdatePartitionedCmd extends CliCommand {
diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md
index 34670bb..d86a256 100644
--- a/site2/docs/admin-api-partitioned-topics.md
+++ b/site2/docs/admin-api-partitioned-topics.md
@@ -44,6 +44,33 @@ int numPartitions = 4;
 admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
 ```
 
+## Non-partitioned topics resources
+
+### Create
+
+Non-partitioned topics in Pulsar must be explicitly created if `allowAutoTopicCreation` or `createIfMissing` is disabled.
+When creating a non-partitioned topic, you need to provide a topic name.
+
+#### pulsar-admin
+
+You can create non-partitioned topics using the [`create`](reference-pulsar-admin.md#create)
+command and specifying the topic name as an argument. This is an example command:
+
+```shell
+$ bin/pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic
+``` 
+
+#### REST API
+
+{@inject: endpoint|PUT|admin/v2/persistent/:tenant/:namespace/:topic|operation/createNonPartitionedTopic}
+
+#### Java
+
+```java
+String topicName = "persistent://my-tenant/my-namespace/my-topic";
+admin.topics().createNonPartitionedTopic(topicName);
+```
+
 ### Get metadata
 
 Partitioned topics have metadata associated with them that you can fetch as a JSON object.
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index 65356ae..62a8d38 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1684,6 +1684,7 @@ Subcommands
 * `offload-status`
 * `create-partitioned-topic`
 * `delete-partitioned-topic`
+* `create`
 * `get-partitioned-topic-metadata`
 * `update-partitioned-topic`
 * `list`
@@ -1773,7 +1774,6 @@ Options
 |---|---|---|
 |`-p`, `--partitions`|The number of partitions for the topic|0|
 
-
 ### `delete-partitioned-topic`
 Delete a partitioned topic. This will also delete all the partitions of the topic if they
exist.
 
@@ -1782,6 +1782,14 @@ Usage
 $ pulsar-admin topics delete-partitioned-topic {persistent|non-persistent}
 ```
 
+### `create`
+Creates a non-partitioned topic. A non-partitioned topic must be explicitly created by the user if `allowAutoTopicCreation` or `createIfMissing` is disabled.
+
+Usage
+```bash
+$ pulsar-admin topics create {persistent|non-persistent}://tenant/namespace/topic
+```
+
 ### `get-partitioned-topic-metadata`
 Get the partitioned topic metadata. If the topic is not created or is a non-partitioned topic,
this will return an empty topic with zero partitions.
 


Mime
View raw message