This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d7ec1c5 [client] add properties to producer for cpp & python client (#2420)
d7ec1c5 is described below
commit d7ec1c5051ee3f7066cd1ba1572606760a3c371e
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Wed Aug 22 09:32:46 2018 -0700
[client] add properties to producer for cpp & python client (#2420)
* [client] add properties to producer for cpp & python client
### Motivation
This is a catch-up change to enable properties on the producer, as the Java client already does.
### Changes
Enable properties on producer for both cpp & python client
### Results
Properties are added as metadata for CommandProducer. However, there is no way
to verify the producer properties, so I didn't add any specific tests; I just
added properties for both the cpp and python clients in the tests, which should
exercise the corresponding code path.
* Add `properties` to pydoc
---
.../include/pulsar/ProducerConfiguration.h | 34 ++++++++++++++++++++++
pulsar-client-cpp/lib/Commands.cc | 11 ++++++-
pulsar-client-cpp/lib/Commands.h | 3 +-
pulsar-client-cpp/lib/ProducerConfiguration.cc | 34 +++++++++++++++++++++-
pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ProducerImpl.cc | 3 +-
pulsar-client-cpp/python/pulsar/__init__.py | 11 ++++++-
pulsar-client-cpp/python/src/config.cc | 1 +
pulsar-client-cpp/python/test_producer.py | 6 +++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 ++
10 files changed, 100 insertions(+), 6 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 9f7bf1f..45154c5 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -127,6 +127,40 @@ class ProducerConfiguration {
bool isEncryptionEnabled() const;
ProducerConfiguration& addEncryptionKey(std::string key);
+ /**
+ * Check whether the producer has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the producer has the specified property
+ * @return false if the property is not defined
+ */
+ bool hasProperty(const std::string& name) const;
+
+ /**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or an empty string if the property was not defined
+ */
+ const std::string& getProperty(const std::string& name) const;
+
+ /**
+ * Get all the properties attached to this producer.
+ */
+ std::map<std::string, std::string>& getProperties() const;
+
+ /**
+ * Sets a new property on the producer.
+ * @param name the name of the property
+ * @param value the associated value
+ */
+ ProducerConfiguration& setProperty(const std::string& name, const std::string&
value);
+
+ /**
+ * Add all the properties in the provided map
+ */
+ ProducerConfiguration& setProperties(const std::map<std::string, std::string>&
properties);
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 8bd0128..4d9f1be 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -22,6 +22,7 @@
#include "pulsar/MessageBuilder.h"
#include "PulsarApi.pb.h"
#include "LogUtils.h"
+#include "PulsarApi.pb.h"
#include "Utils.h"
#include "Url.h"
#include "checksum/ChecksumProvider.h"
@@ -231,13 +232,21 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t
requestId) {
}
SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
- const std::string& producerName, uint64_t requestId)
{
+ const std::string& producerName, uint64_t requestId,
+ const std::map<std::string, std::string>& metadata)
{
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
producer->set_topic(topic);
producer->set_producer_id(producerId);
producer->set_request_id(requestId);
+ for (std::map<std::string, std::string>::const_iterator it = metadata.begin();
it != metadata.end();
+ it++) {
+ proto::KeyValue* keyValue = proto::KeyValue().New();
+ keyValue->set_key(it->first);
+ keyValue->set_value(it->second);
+ producer->mutable_metadata()->AddAllocated(keyValue);
+ }
if (!producerName.empty()) {
producer->set_producer_name(producerName);
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index ef1e280..e669953 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -83,7 +83,8 @@ class Commands {
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
static SharedBuffer newProducer(const std::string& topic, uint64_t producerId,
- const std::string& producerName, uint64_t requestId);
+ const std::string& producerName, uint64_t requestId,
+ const std::map<std::string, std::string>& metadata);
static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index ab70db2..9ad2cf9 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -19,6 +19,9 @@
#include <lib/ProducerConfigurationImpl.h>
namespace pulsar {
+
+const static std::string emptyString;
+
ProducerConfiguration::ProducerConfiguration() : impl_(boost::make_shared<ProducerConfigurationImpl>())
{}
ProducerConfiguration::~ProducerConfiguration() {}
@@ -36,7 +39,6 @@ ProducerConfiguration& ProducerConfiguration::setProducerName(const
std::string&
}
const std::string& ProducerConfiguration::getProducerName() const {
- static const std::string emptyString;
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
}
@@ -185,4 +187,34 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string
key)
return *this;
}
+bool ProducerConfiguration::hasProperty(const std::string& name) const {
+ const std::map<std::string, std::string>& m = impl_->properties;
+ return m.find(name) != m.end();
+}
+
+const std::string& ProducerConfiguration::getProperty(const std::string& name) const
{
+ if (hasProperty(name)) {
+ const std::map<std::string, std::string>& m = impl_->properties;
+ return m.at(name);
+ } else {
+ return emptyString;
+ }
+}
+
+std::map<std::string, std::string>& ProducerConfiguration::getProperties() const
{ return impl_->properties; }
+
+ProducerConfiguration& ProducerConfiguration::setProperty(const std::string& name,
const std::string& value) {
+ impl_->properties.insert(std::make_pair(name, value));
+ return *this;
+}
+
+ProducerConfiguration& ProducerConfiguration::setProperties(
+ const std::map<std::string, std::string>& properties) {
+ for (std::map<std::string, std::string>::const_iterator it = properties.begin();
it != properties.end();
+ it++) {
+ setProperty(it->first, it->second);
+ }
+ return *this;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 3f788a9..6dfaeed 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -44,6 +44,7 @@ struct ProducerConfigurationImpl {
CryptoKeyReaderPtr cryptoKeyReader;
std::set<std::string> encryptionKeys;
ProducerCryptoFailureAction cryptoFailureAction;
+ std::map<std::string, std::string> properties;
ProducerConfigurationImpl()
: sendTimeoutMs(30000),
compressionType(CompressionNone),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 7f09873..881b189 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -127,7 +127,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx)
{
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId);
+ SharedBuffer cmd =
+ Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties());
cnx->sendRequestWithId(cmd, requestId)
.addListener(boost::bind(&ProducerImpl::handleCreateProducer, shared_from_this(),
cnx, _1, _2));
}
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 222c29f..1185812 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -321,7 +321,8 @@ class Client:
batching_max_messages=1000,
batching_max_allowed_size_in_bytes=128*1024,
batching_max_publish_delay_ms=10,
- message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution
+ message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+ properties=None,
):
"""
Create a new producer on a given topic.
@@ -361,6 +362,9 @@ class Client:
* `message_routing_mode`:
Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
other option is `PartitionsRoutingMode.UseSinglePartition`
+ * `properties`:
+ Sets the properties for the producer. The properties associated with a producer
+ can be used to identify a producer at the broker side.
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
@@ -374,6 +378,7 @@ class Client:
_check_type(int, batching_max_messages, 'batching_max_messages')
_check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
_check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
+ _check_type_or_none(dict, properties, 'properties')
conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
@@ -390,6 +395,10 @@ class Client:
conf.producer_name(producer_name)
if initial_sequence_id:
conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
p = Producer()
p._producer = self._client.create_producer(topic, conf)
return p
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 7b4459a..9626049 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -122,6 +122,7 @@ void export_config() {
.def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes,
return_self<>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs,
return_value_policy<copy_const_reference>())
.def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs,
return_self<>())
+ .def("property", &ProducerConfiguration::setProperty, return_self<>())
;
class_<ConsumerConfiguration>("ConsumerConfiguration")
diff --git a/pulsar-client-cpp/python/test_producer.py b/pulsar-client-cpp/python/test_producer.py
index a3dd1f8..3bd1537 100755
--- a/pulsar-client-cpp/python/test_producer.py
+++ b/pulsar-client-cpp/python/test_producer.py
@@ -27,7 +27,11 @@ producer = client.create_producer(
'my-topic',
block_if_queue_full=True,
batching_enabled=True,
- batching_max_publish_delay_ms=10
+ batching_max_publish_delay_ms=10,
+ properties={
+ "producer-name": "test-producer-name",
+ "producer-id": "test-producer-id"
+ }
)
for i in range(10):
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index b1b05ef..e87850c 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -126,6 +126,8 @@ TEST(BasicEndToEndTest, testBatchMessages) {
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBlockIfQueueFull(true);
+ conf.setProperty("producer-name", "test-producer-name");
+ conf.setProperty("producer-id", "test-producer-id");
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
|