pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Export Message.Topic() in Go wrapper (#3346)
Date Thu, 10 Jan 2019 04:26:35 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fa45573  Export Message.Topic() in Go wrapper (#3346)
fa45573 is described below

commit fa455730f81570ccc38a22807da416fa463a57c8
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Wed Jan 9 20:26:30 2019 -0800

    Export Message.Topic() in Go wrapper (#3346)
---
 pulsar-client-cpp/include/pulsar/Message.h   | 2 +-
 pulsar-client-cpp/include/pulsar/c/message.h | 2 ++
 pulsar-client-cpp/lib/Commands.cc            | 3 ++-
 pulsar-client-cpp/lib/ConsumerImpl.cc        | 2 +-
 pulsar-client-cpp/lib/Message.cc             | 3 ++-
 pulsar-client-cpp/lib/c/c_Message.cc         | 4 ++++
 pulsar-client-go/pulsar/c_message.go         | 4 ++++
 pulsar-client-go/pulsar/consumer_test.go     | 1 +
 pulsar-client-go/pulsar/message.go           | 3 +++
 9 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index 08d064a..804a44c 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -136,7 +136,7 @@ class Message {
             int32_t partition);
     /// Used for Batch Messages
     Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer&
payload,
-            proto::SingleMessageMetadata& singleMetadata);
+            proto::SingleMessageMetadata& singleMetadata, const std::string& topicName);
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
     friend class MultiTopicsConsumerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
index 107fe6c..3955a6d 100644
--- a/pulsar-client-cpp/include/pulsar/c/message.h
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -167,6 +167,8 @@ uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message);
  */
 uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message);
 
+const char *pulsar_message_get_topic_name(pulsar_message_t *message);
+
 #pragma GCC visibility pop
 
 #ifdef __cplusplus
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index c40c899..b048dce 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -540,7 +540,8 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage,
int32
 
     const MessageId& m = batchedMessage.impl_->messageId;
     MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
-    Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata);
+    Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata,
+                          batchedMessage.impl_->getTopicName());
     singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
 
     return singleMessage;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index d2954a1..62446d9 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -279,7 +279,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx,
const proto::
 
     Message m(msg, metadata, payload, partitionIndex_);
     m.impl_->cnx_ = cnx.get();
-    m.impl_->topicName_ = &topic_;
+    m.impl_->setTopicName(topic_);
 
     LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
     LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index f78bcb3..cad2d91 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -73,12 +73,13 @@ Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata&
metad
 }
 
 Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer&
payload,
-                 proto::SingleMessageMetadata& singleMetadata)
+                 proto::SingleMessageMetadata& singleMetadata, const std::string&
topicName)
     : impl_(boost::make_shared<MessageImpl>()) {
     impl_->messageId = messageID;
     impl_->metadata = metadata;
     impl_->payload = payload;
     impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
+    impl_->topicName_ = &topicName;
 
     if (singleMetadata.has_partition_key()) {
         impl_->metadata.set_partition_key(singleMetadata.partition_key());
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc b/pulsar-client-cpp/lib/c/c_Message.cc
index f9288ac..34740d3 100644
--- a/pulsar-client-cpp/lib/c/c_Message.cc
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -100,3 +100,7 @@ pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message)
{
     map->map = message->message.getProperties();
     return map;
 }
+
+const char *pulsar_message_get_topic_name(pulsar_message_t *message) {
+    return message->message.getTopicName().c_str();
+}
diff --git a/pulsar-client-go/pulsar/c_message.go b/pulsar-client-go/pulsar/c_message.go
index 3db1e0b..a7ea4e3 100644
--- a/pulsar-client-go/pulsar/c_message.go
+++ b/pulsar-client-go/pulsar/c_message.go
@@ -151,6 +151,10 @@ func (m *message) Key() string {
 	return C.GoString(C.pulsar_message_get_partitionKey(m.ptr))
 }
 
+func (m *message) Topic() string {
+	return C.GoString(C.pulsar_message_get_topic_name(m.ptr))
+}
+
 //////// MessageID
 
 func newMessageId(msg *C.pulsar_message_t) MessageID {
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index f81ce56..6e9d2c1 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -95,6 +95,7 @@ func TestConsumer(t *testing.T) {
 		assertNotNil(t, msg)
 
 		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+		assertEqual(t, string(msg.Topic()), "persistent://public/default/my-topic")
 
 		consumer.Ack(msg)
 	}
diff --git a/pulsar-client-go/pulsar/message.go b/pulsar-client-go/pulsar/message.go
index 9b05e37..6b6a3cd 100644
--- a/pulsar-client-go/pulsar/message.go
+++ b/pulsar-client-go/pulsar/message.go
@@ -39,6 +39,9 @@ type ProducerMessage struct {
 }
 
 type Message interface {
+	// Get the topic from which this message originated from
+	Topic() string
+
 	// Return the properties attached to the message.
 	// Properties are application defined key/value pairs that will be attached to the message
 	Properties() map[string]string


Mime
View raw message