pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] sijie closed pull request #2423: [client] add properties to consumer for cpp & python client
Date Wed, 22 Aug 2018 09:41:02 GMT
sijie closed pull request #2423: [client] add properties to consumer for cpp & python client
URL: https://github.com/apache/incubator-pulsar/pull/2423
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 36e580897b..0687166d74 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -162,6 +162,40 @@ class ConsumerConfiguration {
     void setPatternAutoDiscoveryPeriod(int periodInSeconds);
     int getPatternAutoDiscoveryPeriod() const;
 
+    /**
+     * Check whether the consumer has a specific property attached.
+     *
+     * @param name the name of the property to check
+     * @return true if the consumer has the specified property
+     * @return false if the property is not defined
+     */
+    bool hasProperty(const std::string& name) const;
+
+    /**
+     * Get the value of a specific property
+     *
+     * @param name the name of the property
+     * @return the value of the property or an empty string if the property was not defined
+     */
+    const std::string& getProperty(const std::string& name) const;
+
+    /**
+     * Get all the properties attached to this consumer.
+     */
+    std::map<std::string, std::string>& getProperties() const;
+
+    /**
+     * Sets a new property on the consumer.
+     * @param name   the name of the property
+     * @param value  the associated value
+     */
+    ConsumerConfiguration& setProperty(const std::string& name, const std::string&
value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    ConsumerConfiguration& setProperties(const std::map<std::string, std::string>&
properties);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 8a1933bde7..8bd0128e6d 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -20,6 +20,7 @@
 #include "MessageImpl.h"
 #include "Version.h"
 #include "pulsar/MessageBuilder.h"
+#include "PulsarApi.pb.h"
 #include "LogUtils.h"
 #include "Utils.h"
 #include "Url.h"
@@ -27,6 +28,7 @@
 #include <algorithm>
 #include <boost/thread/mutex.hpp>
 
+using namespace pulsar;
 namespace pulsar {
 
 using namespace pulsar::proto;
@@ -185,7 +187,8 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication,
const
 SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscription,
                                     uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType
subType,
                                     const std::string& consumerName, SubscriptionMode
subscriptionMode,
-                                    Optional<MessageId> startMessageId, bool readCompacted)
{
+                                    Optional<MessageId> startMessageId, bool readCompacted,
+                                    const std::map<std::string, std::string>& metadata)
{
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -206,6 +209,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const
std::string&
             messageIdData.set_batch_index(startMessageId.value().batchIndex());
         }
     }
+    for (std::map<std::string, std::string>::const_iterator it = metadata.begin();
it != metadata.end();
+         it++) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(it->first);
+        keyValue->set_value(it->second);
+        subscribe->mutable_metadata()->AddAllocated(keyValue);
+    }
 
     return writeMessageWithSize(cmd);
 }
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index d9b8589fe2..ef1e28026e 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -78,7 +78,7 @@ class Commands {
                                      uint64_t consumerId, uint64_t requestId,
                                      proto::CommandSubscribe_SubType subType, const std::string&
consumerName,
                                      SubscriptionMode subscriptionMode, Optional<MessageId>
startMessageId,
-                                     bool readCompacted);
+                                     bool readCompacted, const std::map<std::string, std::string>&
metadata);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 058ca57e1d..4014ad2e55 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -20,6 +20,8 @@
 
 namespace pulsar {
 
+const static std::string emptyString;
+
 ConsumerConfiguration::ConsumerConfiguration() : impl_(boost::make_shared<ConsumerConfigurationImpl>())
{}
 
 ConsumerConfiguration::~ConsumerConfiguration() {}
@@ -111,4 +113,34 @@ void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds)
{
 
 int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod;
}
 
+bool ConsumerConfiguration::hasProperty(const std::string& name) const {
+    const std::map<std::string, std::string>& m = impl_->properties;
+    return m.find(name) != m.end();
+}
+
+const std::string& ConsumerConfiguration::getProperty(const std::string& name) const
{
+    if (hasProperty(name)) {
+        const std::map<std::string, std::string>& m = impl_->properties;
+        return m.at(name);
+    } else {
+        return emptyString;
+    }
+}
+
+std::map<std::string, std::string>& ConsumerConfiguration::getProperties() const
{ return impl_->properties; }
+
+ConsumerConfiguration& ConsumerConfiguration::setProperty(const std::string& name,
const std::string& value) {
+    impl_->properties.insert(std::make_pair(name, value));
+    return *this;
+}
+
+ConsumerConfiguration& ConsumerConfiguration::setProperties(
+    const std::map<std::string, std::string>& properties) {
+    for (std::map<std::string, std::string>::const_iterator it = properties.begin();
it != properties.end();
+         it++) {
+        setProperty(it->first, it->second);
+    }
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 0cc0c72838..16e91c8e75 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -36,6 +36,7 @@ struct ConsumerConfigurationImpl {
     ConsumerCryptoFailureAction cryptoFailureAction;
     bool readCompacted;
     int patternAutoDiscoveryPeriod;
+    std::map<std::string, std::string> properties;
     ConsumerConfigurationImpl()
         : unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
@@ -47,7 +48,8 @@ struct ConsumerConfigurationImpl {
           cryptoKeyReader(),
           cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
           readCompacted(false),
-          patternAutoDiscoveryPeriod(60) {}
+          patternAutoDiscoveryPeriod(60),
+          properties() {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index f0a9cca721..06eb43fef7 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -139,7 +139,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx)
{
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd =
         Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_,
-                               subscriptionMode_, startMessageId_, readCompacted_);
+                               subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(),
cnx, _1));
 }
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index f3b560b747..222c29fef0 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -402,7 +402,8 @@ def subscribe(self, topic, subscription_name,
                   consumer_name=None,
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
-                  is_read_compacted=False
+                  is_read_compacted=False,
+                  properties=None
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -455,6 +456,9 @@ def my_listener(consumer, message):
         * `broker_consumer_stats_cache_time_ms`:
           Sets the time duration for which the broker-side consumer stats will
           be cached in the client.
+        * `properties`:
+          Sets the properties for the consumer. The properties associated with a consumer
+          can be used to identify a consumer on the broker side.
         """
         _check_type(str, topic, 'topic')
         _check_type(str, subscription_name, 'subscription_name')
@@ -466,6 +470,7 @@ def my_listener(consumer, message):
         _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
         _check_type(bool, is_read_compacted, 'is_read_compacted')
+        _check_type_or_none(dict, properties, 'properties')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -479,6 +484,10 @@ def my_listener(consumer, message):
         if unacked_messages_timeout_ms:
             conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
         conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
         c = Consumer()
         c._consumer = self._client.subscribe(topic, subscription_name, conf)
         c._client = self
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 9deee9af78..7b4459a5d1 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -140,6 +140,7 @@ void export_config() {
             .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
             .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
             .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
+            .def("property", &ConsumerConfiguration::setProperty, return_self<>())
             ;
 
     class_<ReaderConfiguration>("ReaderConfiguration")
diff --git a/pulsar-client-cpp/python/test_consumer.py b/pulsar-client-cpp/python/test_consumer.py
index dd0f937ab0..495dfc0188 100755
--- a/pulsar-client-cpp/python/test_consumer.py
+++ b/pulsar-client-cpp/python/test_consumer.py
@@ -22,7 +22,11 @@
 import pulsar
 
 client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', "my-subscription")
+consumer = client.subscribe('my-topic', "my-subscription",
+                            properties={
+                                "consumer-name": "test-consumer-name",
+                                "consumer-id": "test-consumer-id"
+                            })
 
 while True:
     msg = consumer.receive()
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d4c1df80c0..b1b05ef778 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -134,8 +134,12 @@ TEST(BasicEndToEndTest, testBatchMessages) {
     ASSERT_EQ(ResultOk, result);
 
     Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
     Promise<Result, Consumer> consumerPromise;
-    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
     Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
     result = consumerFuture.get(consumer);
     ASSERT_EQ(ResultOk, result);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message