This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ffde1bc [client] add properties to consumer for cpp & python client (#2423)
ffde1bc is described below
commit ffde1bcd45c73e94d5f41a05fc6fb2c6f31d3764
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Wed Aug 22 02:41:00 2018 -0700
[client] add properties to consumer for cpp & python client (#2423)
* [client] add properties to consumer for cpp & python client
### Motivation
This is a catch-up change that enables properties on consumers, matching the Java client.
### Changes
Enable properties on consumer for both cpp & python client
### Results
Properties are added as metadata for CommandSubscribe. However, there is no way
to verify the consumer properties, so I didn't add any specific tests; I only
added properties for both the cpp and python clients in the existing tests, which
should exercise the corresponding code path.
* remove "make format"
---
.../include/pulsar/ConsumerConfiguration.h | 34 ++++++++++++++++++++++
pulsar-client-cpp/lib/Commands.cc | 12 +++++++-
pulsar-client-cpp/lib/Commands.h | 2 +-
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 32 ++++++++++++++++++++
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 ++-
pulsar-client-cpp/lib/ConsumerImpl.cc | 2 +-
pulsar-client-cpp/python/pulsar/__init__.py | 11 ++++++-
pulsar-client-cpp/python/src/config.cc | 1 +
pulsar-client-cpp/python/test_consumer.py | 6 +++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 6 +++-
10 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 36e5808..0687166 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -162,6 +162,40 @@ class ConsumerConfiguration {
void setPatternAutoDiscoveryPeriod(int periodInSeconds);
int getPatternAutoDiscoveryPeriod() const;
+ /**
+ * Check whether the consumer has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the consumer has the specified property
+ * @return false if the property is not defined
+ */
+ bool hasProperty(const std::string& name) const;
+
+ /**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property, or an empty string if the property was not defined
+ */
+ const std::string& getProperty(const std::string& name) const;
+
+ /**
+ * Get all the properties attached to this consumer.
+ */
+ std::map<std::string, std::string>& getProperties() const;
+
+ /**
+ * Sets a new property on the consumer (a property that is already defined keeps its current value).
+ * @param name the name of the property
+ * @param value the associated value
+ */
+ ConsumerConfiguration& setProperty(const std::string& name, const std::string&
value);
+
+ /**
+ * Add all the properties in the provided map
+ */
+ ConsumerConfiguration& setProperties(const std::map<std::string, std::string>&
properties);
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 8a1933b..8bd0128 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -20,6 +20,7 @@
#include "MessageImpl.h"
#include "Version.h"
#include "pulsar/MessageBuilder.h"
+#include "PulsarApi.pb.h"
#include "LogUtils.h"
#include "Utils.h"
#include "Url.h"
@@ -27,6 +28,7 @@
#include <algorithm>
#include <boost/thread/mutex.hpp>
+using namespace pulsar;
namespace pulsar {
using namespace pulsar::proto;
@@ -185,7 +187,8 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication,
const
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType
subType,
const std::string& consumerName, SubscriptionMode
subscriptionMode,
- Optional<MessageId> startMessageId, bool readCompacted)
{
+ Optional<MessageId> startMessageId, bool readCompacted,
+ const std::map<std::string, std::string>& metadata)
{
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -206,6 +209,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const
std::string&
messageIdData.set_batch_index(startMessageId.value().batchIndex());
}
}
+ for (std::map<std::string, std::string>::const_iterator it = metadata.begin();
it != metadata.end();
+ it++) {
+ proto::KeyValue* keyValue = proto::KeyValue().New();
+ keyValue->set_key(it->first);
+ keyValue->set_value(it->second);
+ subscribe->mutable_metadata()->AddAllocated(keyValue);
+ }
return writeMessageWithSize(cmd);
}
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index d9b8589..ef1e280 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -78,7 +78,7 @@ class Commands {
uint64_t consumerId, uint64_t requestId,
proto::CommandSubscribe_SubType subType, const std::string&
consumerName,
SubscriptionMode subscriptionMode, Optional<MessageId>
startMessageId,
- bool readCompacted);
+ bool readCompacted, const std::map<std::string, std::string>&
metadata);
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 058ca57..4014ad2 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -20,6 +20,8 @@
namespace pulsar {
+const static std::string emptyString;
+
ConsumerConfiguration::ConsumerConfiguration() : impl_(boost::make_shared<ConsumerConfigurationImpl>())
{}
ConsumerConfiguration::~ConsumerConfiguration() {}
@@ -111,4 +113,34 @@ void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds)
{
int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod;
}
+bool ConsumerConfiguration::hasProperty(const std::string& name) const {  // true when `name` is a key in the consumer's property map
+ const std::map<std::string, std::string>& m = impl_->properties;
+ return m.find(name) != m.end();  // find() yields end() when the key is absent
+}
+
+const std::string& ConsumerConfiguration::getProperty(const std::string& name) const  // looks up `name` in impl_->properties
{
+ if (hasProperty(name)) {
+ const std::map<std::string, std::string>& m = impl_->properties;
+ return m.at(name);  // key was just confirmed present by hasProperty()
+ } else {
+ return emptyString;  // absent key: return the file-level empty string, since a reference must outlive this call
+ }
+}
+
+std::map<std::string, std::string>& ConsumerConfiguration::getProperties() const  // NOTE(review): const method, yet the returned reference to impl_->properties is non-const
{ return impl_->properties; }
+
+ConsumerConfiguration& ConsumerConfiguration::setProperty(const std::string& name,
const std::string& value) {
+ impl_->properties.insert(std::make_pair(name, value));  // std::map::insert does not replace the value of a key that is already present
+ return *this;  // returning *this allows calls to be chained
+}
+
+ConsumerConfiguration& ConsumerConfiguration::setProperties(
+ const std::map<std::string, std::string>& properties) {
+ for (std::map<std::string, std::string>::const_iterator it = properties.begin();
it != properties.end();
+ it++) {
+ setProperty(it->first, it->second);  // each entry is forwarded to setProperty()
+ }
+ return *this;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 0cc0c72..16e91c8 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -36,6 +36,7 @@ struct ConsumerConfigurationImpl {
ConsumerCryptoFailureAction cryptoFailureAction;
bool readCompacted;
int patternAutoDiscoveryPeriod;
+ std::map<std::string, std::string> properties;
ConsumerConfigurationImpl()
: unAckedMessagesTimeoutMs(0),
consumerType(ConsumerExclusive),
@@ -47,7 +48,8 @@ struct ConsumerConfigurationImpl {
cryptoKeyReader(),
cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
readCompacted(false),
- patternAutoDiscoveryPeriod(60) {}
+ patternAutoDiscoveryPeriod(60),
+ properties() {}
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index f0a9cca..06eb43f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -139,7 +139,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx)
{
uint64_t requestId = client->newRequestId();
SharedBuffer cmd =
Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_,
- subscriptionMode_, startMessageId_, readCompacted_);
+ subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties());
cnx->sendRequestWithId(cmd, requestId)
.addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(),
cnx, _1));
}
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index f3b560b..222c29f 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -402,7 +402,8 @@ class Client:
consumer_name=None,
unacked_messages_timeout_ms=None,
broker_consumer_stats_cache_time_ms=30000,
- is_read_compacted=False
+ is_read_compacted=False,
+ properties=None
):
"""
Subscribe to the given topic and subscription combination.
@@ -455,6 +456,9 @@ class Client:
* `broker_consumer_stats_cache_time_ms`:
Sets the time duration for which the broker-side consumer stats will
be cached in the client.
+ * `properties`:
+ Sets the properties for the consumer. The properties associated with a consumer
+ can be used to identify a consumer on the broker side.
"""
_check_type(str, topic, 'topic')
_check_type(str, subscription_name, 'subscription_name')
@@ -466,6 +470,7 @@ class Client:
_check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
_check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
_check_type(bool, is_read_compacted, 'is_read_compacted')
+ _check_type_or_none(dict, properties, 'properties')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
@@ -479,6 +484,10 @@ class Client:
if unacked_messages_timeout_ms:
conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
c = Consumer()
c._consumer = self._client.subscribe(topic, subscription_name, conf)
c._client = self
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 9deee9a..7b4459a 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -140,6 +140,7 @@ void export_config() {
.def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
.def("read_compacted", &ConsumerConfiguration::isReadCompacted)
.def("read_compacted", &ConsumerConfiguration::setReadCompacted)
+ .def("property", &ConsumerConfiguration::setProperty, return_self<>())
;
class_<ReaderConfiguration>("ReaderConfiguration")
diff --git a/pulsar-client-cpp/python/test_consumer.py b/pulsar-client-cpp/python/test_consumer.py
index dd0f937..495dfc0 100755
--- a/pulsar-client-cpp/python/test_consumer.py
+++ b/pulsar-client-cpp/python/test_consumer.py
@@ -22,7 +22,11 @@
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', "my-subscription")
+consumer = client.subscribe('my-topic', "my-subscription",
+ properties={
+ "consumer-name": "test-consumer-name",
+ "consumer-id": "test-consumer-id"
+ })
while True:
msg = consumer.receive()
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d4c1df8..b1b05ef 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -134,8 +134,12 @@ TEST(BasicEndToEndTest, testBatchMessages) {
ASSERT_EQ(ResultOk, result);
Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setProperty("consumer-name", "test-consumer-name");
+ consumerConfig.setProperty("consumer-id", "test-consumer-id");
Promise<Result, Consumer> consumerPromise;
- client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+ client.subscribeAsync(topicName, subName, consumerConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
|