pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #2972: Introduce `Client.getPartitionsForTopic()`
Date Thu, 15 Nov 2018 01:51:02 GMT
merlimat closed pull request #2972: Introduce `Client.getPartitionsForTopic()`
URL: https://github.com/apache/pulsar/pull/2972
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 360f0bc550..02b46d263c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -22,6 +22,9 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -577,4 +580,24 @@ private void receiveAsync(Consumer<byte[]> consumer, int totalMessage,
int curre
         }
     }
 
+    @Test
+    public void testGetPartitionsForTopic() throws Exception {
+        int numPartitions = 4;
+        String topic = "persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis();
+
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+
+        List<String> expectedPartitions = new ArrayList<>();
+        for (int i =0; i < numPartitions; i++) {
+            expectedPartitions.add(topic + "-partition-" + i);
+        }
+
+        assertEquals(pulsarClient.getPartitionsForTopic(topic).join(), expectedPartitions);
+
+        String nonPartitionedTopic = "persistent://my-property/my-ns/my-non-partitionedtopic1";
+
+        assertEquals(pulsarClient.getPartitionsForTopic(nonPartitionedTopic).join(),
+                Collections.singletonList(nonPartitionedTopic));
+    }
+
 }
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index f30732853e..c29db9a6a6 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -42,6 +42,10 @@ if (NOT CXX_STANDARD)
     set(CXX_STANDARD "-std=c++11")
 endif(NOT CXX_STANDARD)
 
+if (NOT C_STANDARD)
+    set(C_STANDARD "-std=c11")
+endif(NOT C_STANDARD)
+
 set(CMAKE_CXX_FLAGS " -msse4.2 -mpclmul -Wno-deprecated-declarations ${CXX_STANDARD} ${CMAKE_CXX_FLAGS}")
 
 
diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index 1913851323..d77eb6d9f5 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -34,6 +34,7 @@ namespace pulsar {
 typedef boost::function<void(Result, Producer)> CreateProducerCallback;
 typedef boost::function<void(Result, Consumer)> SubscribeCallback;
 typedef boost::function<void(Result, Reader)> ReaderCallback;
+typedef boost::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
 typedef boost::function<void(Result)> CloseCallback;
 
 class ClientImpl;
@@ -159,6 +160,38 @@ class Client {
     void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
                            const ReaderConfiguration& conf, ReaderCallback callback);
 
+    /**
+     * Get the list of partitions for a given topic.
+     *
+     * If the topic is partitioned, this will return a list of partition names. If the topic
is not
+     * partitioned, the returned list will contain the topic name itself.
+     *
+     * This can be used to discover the partitions and create Reader, Consumer or Producer
+     * instances directly on a particular partition.
+     *
+     * @param topic
+     *            the topic name
+     * @since 2.3.0
+     */
+    Result getPartitionsForTopic(const std::string& topic, std::vector<std::string>&
partitions);
+
+    /**
+     * Get the list of partitions for a given topic in asynchronous mode.
+     *
+     * If the topic is partitioned, this will return a list of partition names. If the topic
is not
+     * partitioned, the returned list will contain the topic name itself.
+     *
+     * This can be used to discover the partitions and create Reader, Consumer or Producer
+     * instances directly on a particular partition.
+     *
+     * @param topic
+     *            the topic name
+     * @param callback
+     *            the callback that will be invoked when the list of partitions is available
+     * @since 2.3.0
+     */
+    void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
+
     /**
      *
      * @return
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 4b603bbff3..06a573ac77 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -29,6 +29,7 @@
 #include <pulsar/c/producer_configuration.h>
 #include <pulsar/c/reader_configuration.h>
 #include <pulsar/c/result.h>
+#include <pulsar/c/string_list.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -38,6 +39,7 @@ extern "C" {
 
 typedef struct _pulsar_client pulsar_client_t;
 typedef struct _pulsar_producer pulsar_producer_t;
+typedef struct _pulsar_string_list pulsar_string_list_t;
 
 typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
 typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
@@ -46,6 +48,8 @@ typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_pro
 
 typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer,
void *ctx);
 typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void
*ctx);
+typedef void (*pulsar_get_partitions_callback)(pulsar_result result, pulsar_string_list_t
*partitions,
+                                               void *ctx);
 
 typedef void (*pulsar_close_callback)(pulsar_result result, void *ctx);
 
@@ -134,6 +138,12 @@ void pulsar_client_create_reader_async(pulsar_client_t *client, const
char *topi
                                        pulsar_reader_configuration_t *conf, pulsar_reader_callback
callback,
                                        void *ctx);
 
+pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic,
+                                                 pulsar_string_list_t **partitions);
+
+void pulsar_client_get_topic_partitions_async(pulsar_client_t *client, const char *topic,
+                                              pulsar_get_partitions_callback callback, void
*ctx);
+
 pulsar_result pulsar_client_close(pulsar_client_t *client);
 
 void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void
*ctx);
diff --git a/pulsar-client-cpp/include/pulsar/c/string_list.h b/pulsar-client-cpp/include/pulsar/c/string_list.h
new file mode 100644
index 0000000000..41c3f20d87
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/string_list.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#pragma GCC visibility push(default)
+
+typedef struct _pulsar_string_list pulsar_string_list_t;
+
+pulsar_string_list_t *pulsar_string_list_create();
+void pulsar_string_list_free(pulsar_string_list_t *list);
+
+int pulsar_string_list_size(pulsar_string_list_t *list);
+
+void pulsar_string_list_append(pulsar_string_list_t *list, const char *item);
+
+const char *pulsar_string_list_get(pulsar_string_list_t *map, int index);
+
+#pragma GCC visibility pop
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index 5cfe01f18c..2353b4b6c4 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -152,6 +152,18 @@ void Client::createReaderAsync(const std::string& topic, const MessageId&
startM
     impl_->createReaderAsync(topic, startMessageId, conf, callback);
 }
 
+Result Client::getPartitionsForTopic(const std::string& topic, std::vector<std::string>&
partitions) {
+    Promise<Result, std::vector<std::string> > promise;
+    getPartitionsForTopicAsync(topic, WaitForCallbackValue<std::vector<std::string>
>(promise));
+    Future<Result, std::vector<std::string> > future = promise.getFuture();
+
+    return future.get(partitions);
+}
+
+void Client::getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback
callback) {
+    impl_->getPartitionsForTopicAsync(topic, callback);
+}
+
 Result Client::close() {
     Promise<bool, Result> promise;
     closeAsync(WaitForCallback(promise));
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index c31cf8e343..5478dd7eee 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -64,6 +64,8 @@ const std::string generateRandomName() {
 }
 typedef boost::unique_lock<boost::mutex> Lock;
 
+typedef std::vector<std::string> StringList;
+
 static const std::string https("https");
 static const std::string pulsarSsl("pulsar+ssl");
 
@@ -413,6 +415,45 @@ void ClientImpl::handleNewConnection(Result result, const ClientConnectionWeakPt
     }
 }
 
+void ClientImpl::handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
+                                     TopicNamePtr topicName, GetPartitionsCallback callback)
{
+    if (result != ResultOk) {
+        LOG_ERROR("Error getting topic partitions metadata: " << result);
+        callback(result, StringList());
+        return;
+    }
+
+    StringList partitions;
+
+    if (partitionMetadata->getPartitions() > 1) {
+        for (unsigned int i = 0; i < partitionMetadata->getPartitions(); i++) {
+            partitions.push_back(topicName->getTopicPartitionName(i));
+        }
+    } else {
+        partitions.push_back(topicName->toString());
+    }
+
+    callback(ResultOk, partitions);
+}
+
+void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback
callback) {
+    TopicNamePtr topicName;
+    {
+        Lock lock(mutex_);
+        if (state_ != Open) {
+            lock.unlock();
+            callback(ResultAlreadyClosed, StringList());
+            return;
+        } else if (!(topicName = TopicName::get(topic))) {
+            lock.unlock();
+            callback(ResultInvalidTopicName, StringList());
+            return;
+        }
+    }
+    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+        boost::bind(&ClientImpl::handleGetPartitions, shared_from_this(), _1, _2, topicName,
callback));
+}
+
 void ClientImpl::closeAsync(CloseCallback callback) {
     Lock lock(mutex_);
     ProducersList producers(producers_);
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 54d459d097..6a9c9d87cf 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -66,6 +66,8 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl>
{
     void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
                            const ReaderConfiguration& conf, ReaderCallback callback);
 
+    void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
+
     Future<Result, ClientConnectionWeakPtr> getConnection(const std::string& topic);
     void handleLookup(Result result, LookupDataResultPtr data,
                       Promise<Result, ClientConnectionWeakPtr> promise);
@@ -101,6 +103,9 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl>
{
                                     TopicNamePtr topicName, MessageId startMessageId,
                                     ReaderConfiguration conf, ReaderCallback callback);
 
+    void handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
+                             TopicNamePtr topicName, GetPartitionsCallback callback);
+
     void handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerWeakPtr,
                                CreateProducerCallback callback, ProducerImplBasePtr producer);
     void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerWeakPtr,
diff --git a/pulsar-client-cpp/lib/c/cStringList.cc b/pulsar-client-cpp/lib/c/cStringList.cc
new file mode 100644
index 0000000000..cfe2c26be2
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/cStringList.cc
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/string_list.h>
+
+#include "c_structs.h"
+
+pulsar_string_list_t *pulsar_string_list_create() { return new pulsar_string_list_t; }
+
+void pulsar_string_list_free(pulsar_string_list_t *list) { delete list; }
+
+int pulsar_string_list_size(pulsar_string_list_t *list) { return list->list.size(); }
+
+void pulsar_string_list_append(pulsar_string_list_t *list, const char *item) { list->list.push_back(item);
}
+
+const char *pulsar_string_list_get(pulsar_string_list_t *list, int index) {
+    return list->list[index].c_str();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index ecefe20c03..f8903050d9 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -152,6 +152,45 @@ void pulsar_client_create_reader_async(pulsar_client_t *client, const
char *topi
                                       boost::bind(&handle_reader_callback, _1, _2, callback,
ctx));
 }
 
+pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic,
+                                                 pulsar_string_list_t **partitions) {
+    std::vector<std::string> partitionsList;
+    pulsar::Result res = client->client->getPartitionsForTopic(topic, partitionsList);
+    if (res == pulsar::ResultOk) {
+        (*partitions) = pulsar_string_list_create();
+
+        for (int i = 0; i < partitionsList.size(); i++) {
+            pulsar_string_list_append(*partitions, partitionsList[i].c_str());
+        }
+
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_get_partitions_callback(pulsar::Result result,
+                                           const std::vector<std::string> &partitionsList,
+                                           pulsar_get_partitions_callback callback, void
*ctx) {
+    if (result == pulsar::ResultOk) {
+        pulsar_string_list_t *partitions = pulsar_string_list_create();
+
+        for (int i = 0; i < partitionsList.size(); i++) {
+            pulsar_string_list_append(partitions, partitionsList[i].c_str());
+        }
+
+        callback((pulsar_result)result, partitions, ctx);
+    } else {
+        callback((pulsar_result)result, NULL, ctx);
+    }
+}
+
+void pulsar_client_get_topic_partitions_async(pulsar_client_t *client, const char *topic,
+                                              pulsar_get_partitions_callback callback, void
*ctx) {
+    client->client->getPartitionsForTopicAsync(
+        topic, boost::bind(&handle_get_partitions_callback, _1, _2, callback, ctx));
+}
+
 pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close();
}
 
 static void handle_client_close(pulsar::Result result, pulsar_close_callback callback, void
*ctx) {
diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h
index 41e9fba668..9a82eb8d9f 100644
--- a/pulsar-client-cpp/lib/c/c_structs.h
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -84,3 +84,7 @@ static void handle_result_callback(pulsar::Result result, pulsar_result_callback
 struct _pulsar_string_map {
     std::map<std::string, std::string> map;
 };
+
+struct _pulsar_string_list {
+    std::vector<std::string> list;
+};
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index ae9731aa69..54fee68f98 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -596,6 +596,21 @@ def my_listener(reader, message):
         self._consumers.append(c)
         return c
 
+    def get_topic_partitions(self, topic):
+        """
+        Get the list of partitions for a given topic.
+
+        If the topic is partitioned, this will return a list of partition names. If the topic
is not
+        partitioned, the returned list will contain the topic name itself.
+
+        This can be used to discover the partitions and create Reader, Consumer or Producer
+        instances directly on a particular partition.
+        :param topic: the topic name to lookup
+        :return: a list of partition name
+        """
+        _check_type(str, topic, 'topic')
+        return self._client.get_topic_partitions(topic)
+
     def close(self):
         """
         Close the client and all the associated producers and consumers
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 7fb8f41156..86bad9a86d 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -760,6 +760,23 @@ def test_message_id(self):
         s = MessageId.latest.serialize()
         self.assertEqual(MessageId.deserialize(s), MessageId.latest)
 
+    def test_get_topics_partitions(self):
+        client = Client(self.serviceUrl)
+        topic_partitioned = 'persistent://public/default/test_get_topics_partitions'
+        topic_non_partitioned = 'persistent://public/default/test_get_topics_partitions'
+
+        url1 = self.adminUrl + '/admin/v2/persistent/public/default/test_get_topics_partitions/partitions'
+        doHttpPut(url1, '3')
+
+        self.assertEqual(client.get_topic_partitions(topic_partitioned),
+                         ['persistent://public/default/test_get_topics_partitions-partition-0',
+                          'persistent://public/default/test_get_topics_partitions-partition-1',
+                          'persistent://public/default/test_get_topics_partitions-partition-2'])
+
+        self.assertEqual(client.get_topic_partitions(topic_non_partitioned),
+                         [topic_non_partitioned])
+        client.close()
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc
index 4b6055a038..3dcbf7fc83 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -90,6 +90,24 @@ Reader Client_createReader(Client& client, const std::string& topic,
     return reader;
 }
 
+boost::python::list Client_getTopicPartitions(Client& client, const std::string&
topic) {
+    std::vector<std::string> partitions;
+    Result res;
+
+    Py_BEGIN_ALLOW_THREADS
+    res = client.getPartitionsForTopic(topic, partitions);
+    Py_END_ALLOW_THREADS
+
+    CHECK_RESULT(res);
+
+    boost::python::list pyList;
+    for (int i = 0; i < partitions.size(); i++) {
+        pyList.append(boost::python::object(partitions[i]));
+    }
+
+    return pyList;
+}
+
 void Client_close(Client& client) {
     Result res;
 
@@ -109,6 +127,7 @@ void export_client() {
             .def("subscribe_topics", &Client_subscribe_topics)
             .def("subscribe_pattern", &Client_subscribe_pattern)
             .def("create_reader", &Client_createReader)
+            .def("get_topic_partitions", &Client_getTopicPartitions)
             .def("close", &Client_close)
             .def("shutdown", &Client::shutdown)
             ;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 396eba7298..d264ab129c 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2162,3 +2162,31 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic)
{
     producer.close();
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testGetTopicPartitions) {
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/testGetPartitions";
+
+    // call admin api to make it partitioned
+    std::string url = adminUrl + "admin/v2/persistent/public/default/testGetPartitions/partitions";
+    int res = makePutRequest(url, "3");
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);
+    std::vector<std::string> partitionsList;
+    Result result = client.getPartitionsForTopic(topicName, partitionsList);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(3, partitionsList.size());
+    ASSERT_EQ(topicName + "-partition-0", partitionsList[0]);
+    ASSERT_EQ(topicName + "-partition-1", partitionsList[1]);
+    ASSERT_EQ(topicName + "-partition-2", partitionsList[2]);
+
+    std::vector<std::string> partitionsList2;
+    result = client.getPartitionsForTopic("persistent://public/default/testGetPartitions-non-partitioned",
+                                          partitionsList2);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(1, partitionsList2.size());
+    ASSERT_EQ(partitionsList2[0], "persistent://public/default/testGetPartitions-non-partitioned");
+
+    client.shutdown();
+}
diff --git a/pulsar-client-go/pulsar/c_client.go b/pulsar-client-go/pulsar/c_client.go
index 54dcd309a7..e513b081f3 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -25,10 +25,10 @@ package pulsar
 */
 import "C"
 import (
-	"runtime"
-	"unsafe"
 	"log"
+	"runtime"
 	"strings"
+	"unsafe"
 )
 
 //export pulsarClientLoggerProxy
@@ -144,7 +144,7 @@ func authenticationFinalizer(authentication *authentication) {
 }
 
 type client struct {
-	ptr *C.pulsar_client_t
+	ptr  *C.pulsar_client_t
 	auth *authentication
 }
 
@@ -216,10 +216,64 @@ func (client *client) CreateReader(options ReaderOptions) (Reader, error)
{
 	return res.Reader, res.error
 }
 
+//export pulsarGetTopicPartitionsCallbackProxy
+func pulsarGetTopicPartitionsCallbackProxy(res C.pulsar_result, cPartitions *C.pulsar_string_list_t,
ctx unsafe.Pointer) {
+	callback := restorePointer(ctx).(func([]string, error))
+
+	if res != C.pulsar_result_Ok {
+		callback(nil, newError(res, "Failed to get partitions for topic"))
+	} else {
+		numPartitions := int(C.pulsar_string_list_size(cPartitions))
+		partitions := make([]string, numPartitions)
+		for i := 0; i < numPartitions; i++ {
+			partitions[i] = C.GoString(C.pulsar_string_list_get(cPartitions, C.int(i)))
+		}
+
+		C.pulsar_string_list_free(cPartitions)
+
+		callback(partitions, nil)
+	}
+}
+
 func (client *client) CreateReaderAsync(options ReaderOptions, callback func(Reader, error))
{
 	createReaderAsync(client, options, callback)
 }
 
+func (client *client) TopicPartitions(topic string) ([]string, error) {
+	c := make(chan struct {
+		partitions []string
+		err        error
+	})
+
+	topicPartitionsAsync(client, topic, func(partitions []string, err error) {
+		c <- struct {
+			partitions []string
+			err        error
+		}{partitions, err}
+		close(c)
+	})
+
+	res := <-c
+	return res.partitions, res.err
+}
+
+type getPartitionsCallback struct {
+	partitions []string
+	channel    chan ReaderMessage
+}
+
+func topicPartitionsAsync(client *client, topic string, callback func([]string, error)) {
+	if topic == "" {
+		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
+		return
+	}
+
+	cTopic := C.CString(topic)
+	defer C.free(unsafe.Pointer(cTopic))
+
+	C._pulsar_client_get_topic_partitions(client.ptr, cTopic, savePointer(callback))
+}
+
 func (client *client) Close() error {
 	res := C.pulsar_client_close(client.ptr)
 	if res != C.pulsar_result_Ok {
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h b/pulsar-client-go/pulsar/c_go_pulsar.h
index 881427610e..cdbebf8771 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -130,6 +130,14 @@ static inline void _pulsar_reader_close_async(pulsar_reader_t *reader,
void *ctx
     pulsar_reader_close_async(reader, pulsarReaderCloseCallbackProxy, ctx);
 }
 
+void pulsarGetTopicPartitionsCallbackProxy(pulsar_result result, pulsar_string_list_t* partitions,
void *ctx);
+
+static inline void _pulsar_client_get_topic_partitions(pulsar_client_t *client, const char
*topic,
+                                                       void *ctx) {
+    pulsar_client_get_topic_partitions_async(client, topic, pulsarGetTopicPartitionsCallbackProxy,
ctx);
+}
+
+
 
 //// String array manipulation
 
diff --git a/pulsar-client-go/pulsar/client.go b/pulsar-client-go/pulsar/client.go
index 777b7cd8cb..405c8a396f 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -96,6 +96,16 @@ type Client interface {
 	// This method will block until the reader is created successfully.
 	CreateReader(ReaderOptions) (Reader, error)
 
+	// Fetch the list of partitions for a given topic
+	//
+	// If the topic is partitioned, this will return a list of partition names.
+	// If the topic is not partitioned, the returned list will contain the topic
+	// name itself.
+	//
+	// This can be used to discover the partitions and create {@link Reader},
+	// {@link Consumer} or {@link Producer} instances directly on a particular partition.
+	TopicPartitions(topic string) ([]string, error)
+
 	// Close the Client and free associated resources
 	Close() error
 }
diff --git a/pulsar-client-go/pulsar/client_test.go b/pulsar-client-go/pulsar/client_test.go
new file mode 100644
index 0000000000..7c200ac01a
--- /dev/null
+++ b/pulsar-client-go/pulsar/client_test.go
@@ -0,0 +1,56 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestGetTopicPartitions(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL:                      "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	// Create topic with 5 partitions
+	httpPut("http://localhost:8080/admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
+		5)
+
+	partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
+
+	partitions, err := client.TopicPartitions(partitionedTopic)
+	assertNil(t, err)
+	assertEqual(t, len(partitions), 5)
+	for i := 0; i < 5; i++ {
+		assertEqual(t, partitions[i],
+			fmt.Sprintf("%s-partition-%d", partitionedTopic, i))
+	}
+
+	// Non-Partitioned topic
+	topic := "persistent://public/default/TestGetTopicPartitions-nopartitions"
+
+	partitions, err = client.TopicPartitions(topic)
+	assertNil(t, err)
+	assertEqual(t, len(partitions), 1)
+	assertEqual(t, partitions[0], topic)
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 74e09e735a..3f2271a5f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -349,6 +350,22 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration
conf) t
      */
     void updateServiceUrl(String serviceUrl) throws PulsarClientException;
 
+    /**
+     * Get the list of partitions for a given topic.
+     *
+     * If the topic is partitioned, this will return a list of partition names. If the topic
is not partitioned, the
+     * returned list will contain the topic name itself.
+     *
+     * This can be used to discover the partitions and create {@link Reader}, {@link Consumer}
or {@link Producer}
+     * instances directly on a particular partition.
+     *
+     * @param topic
+     *            the topic name
+     * @return a future that will yield a list of the topic partitions
+     * @since 2.3.0
+     */
+    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+
     /**
      * Close the PulsarClient and release all the resources.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 8a8de09d0a..5f5aa4b6ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -30,6 +30,8 @@
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Optional;
@@ -805,6 +807,22 @@ public void reloadLookUp() throws PulsarClientException {
         return metadataFuture;
     }
 
+    @Override
+    public CompletableFuture<List<String>> getPartitionsForTopic(String topic)
{
+        return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
+            if (metadata.partitions > 1) {
+                TopicName topicName = TopicName.get(topic);
+                List<String> partitions = new ArrayList<>(metadata.partitions);
+                for (int i = 0; i < metadata.partitions; i++) {
+                    partitions.add(topicName.getPartition(i).toString());
+                }
+                return partitions;
+            } else {
+                return Collections.singletonList(topic);
+            }
+        });
+    }
+
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
         ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
         return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message