pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #3354: Pass schema info to C++ client
Date Fri, 11 Jan 2019 18:25:26 GMT
merlimat closed pull request #3354: Pass schema info to C++ client
URL: https://github.com/apache/pulsar/pull/3354
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index d77eb6d9f5..e04b1b88b7 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -26,6 +26,7 @@
 #include <pulsar/Message.h>
 #include <pulsar/MessageBuilder.h>
 #include <pulsar/ClientConfiguration.h>
+#include <pulsar/Schema.h>
 #include <string>
 
 #pragma GCC visibility push(default)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 0687166d74..60ffef1f85 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -24,6 +24,7 @@
 #include <pulsar/Result.h>
 #include <pulsar/ConsumerType.h>
 #include <pulsar/Message.h>
+#include <pulsar/Schema.h>
 #include <pulsar/ConsumerCryptoFailureAction.h>
 #include <pulsar/CryptoKeyReader.h>
 
@@ -51,6 +52,21 @@ class ConsumerConfiguration {
     ConsumerConfiguration(const ConsumerConfiguration&);
     ConsumerConfiguration& operator=(const ConsumerConfiguration&);
 
+    /**
+     * Declare the schema of the data that this consumer will be accepting.
+     *
+     * The schema will be checked against the schema of the topic, and the
+     * consumer creation will fail if it's not compatible.
+     *
+     * @param schemaInfo the schema definition object
+     */
+    ConsumerConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+    /**
+     * @return the schema information declared for this consumer
+     */
+    const SchemaInfo& getSchema() const;
+
     /**
      * Specify the consumer type. The consumer type enables
      * specifying the type of subscription. In Exclusive subscription,
diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index 407d937043..1941e15923 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -109,6 +109,16 @@ class Producer {
      */
     int64_t getLastSequenceId() const;
 
+    /**
+     * Return an identifier for the schema version that this producer was created with.
+     *
+     * When the producer is created, if a schema info was passed, the broker will
+     * determine the version of the passed schema. This identifier should be treated
+     * as an opaque identifier. In particular, even though this is represented as a string, the
+     * version might not be ASCII printable.
+     */
+    const std::string& getSchemaVersion() const;
+
     /**
      * Close the producer and release resources allocated.
      *
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 565a6ab0ea..6e3d0b4c01 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -25,6 +25,7 @@
 #include <boost/function.hpp>
 #include <pulsar/ProducerCryptoFailureAction.h>
 #include <pulsar/CryptoKeyReader.h>
+#include <pulsar/Schema.h>
 
 #include <set>
 
@@ -55,6 +56,7 @@ class ProducerConfiguration {
         BoostHash,
         JavaStringHash
     };
+
     ProducerConfiguration();
     ~ProducerConfiguration();
     ProducerConfiguration(const ProducerConfiguration&);
@@ -63,6 +65,25 @@ class ProducerConfiguration {
     ProducerConfiguration& setProducerName(const std::string& producerName);
     const std::string& getProducerName() const;
 
+    /**
+     * Declare the schema of the data that will be published by this producer.
+     *
+     * The schema will be checked against the schema of the topic, and it
+     * will fail if it's not compatible, though the client library will
+     * not perform any validation that the actual message payload are
+     * conforming to the specified schema.
+     *
+     * @param schemaInfo the schema definition object
+     * @return a reference to this ProducerConfiguration, so that calls
+     *         can be chained
+     */
+    ProducerConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+    /**
+     * @return the schema information declared for this producer
+     */
+    const SchemaInfo& getSchema() const;
+
     ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
     int getSendTimeout() const;
 
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 8d365ab813..69776e32ef 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -23,6 +23,7 @@
 #include <boost/shared_ptr.hpp>
 #include <pulsar/Result.h>
 #include <pulsar/Message.h>
+#include <pulsar/Schema.h>
 
 #pragma GCC visibility push(default)
 namespace pulsar {
@@ -48,6 +49,21 @@ class ReaderConfiguration {
     ReaderConfiguration(const ReaderConfiguration&);
     ReaderConfiguration& operator=(const ReaderConfiguration&);
 
+    /**
+     * Declare the schema of the data that this reader will be accepting.
+     *
+     * The schema will be checked against the schema of the topic, and the
+     * reader creation will fail if it's not compatible.
+     *
+     * @param schemaInfo the schema definition object
+     */
+    ReaderConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+    /**
+     * @return the schema information declared for this reader
+     */
+    const SchemaInfo& getSchema() const;
+
     /**
      * A message listener enables your application to configure how to process
      * messages. A listener will be called in order for every message received.
diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
new file mode 100644
index 0000000000..eb2ebbbf6e
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+
+#include <iosfwd>
+#include <boost/shared_ptr.hpp>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+enum SchemaType
+{
+    /**
+     * No schema defined
+     */
+    NONE = 0,
+
+    /**
+     * Simple String encoding with UTF-8
+     */
+    STRING = 1,
+
+    /**
+     * An 8-bit integer.
+     */
+    INT8 = 2,
+
+    /**
+     * A 16-bit integer.
+     */
+    INT16 = 3,
+
+    /**
+     * A 32-bit integer.
+     */
+    INT32 = 4,
+
+    /**
+     * A 64-bit integer.
+     */
+    INT64 = 5,
+
+    /**
+     * A float number.
+     */
+    FLOAT = 6,
+
+    /**
+     * A double number
+     */
+    DOUBLE = 7,
+
+    /**
+     * A bytes array.
+     */
+    BYTES = 8,
+
+    /**
+     * JSON object encoding and validation
+     */
+    JSON = 9,
+
+    /**
+     * Protobuf message encoding and decoding
+     */
+    PROTOBUF = 10,
+
+    /**
+     * Serialize and deserialize via Avro
+     */
+    AVRO = 11,
+
+    /**
+     * Auto Consume Type.
+     */
+    AUTO_CONSUME = 13,
+
+    /**
+     * Auto Publish Type.
+     */
+    AUTO_PUBLISH = 14,
+
+    /**
+     * A Schema that contains Key Schema and Value Schema.
+     */
+    KEY_VALUE = 15,
+};
+
+// Return string representation of the schema type
+const char *strSchemaType(SchemaType schemaType);
+
+class SchemaInfoImpl;
+
+typedef std::map<std::string, std::string> StringMap;
+
+/**
+ * Encapsulates data around the schema definition
+ */
+class SchemaInfo {
+   public:
+    SchemaInfo();  // NOTE(review): presumably yields a SchemaType NONE schema - confirm in Schema.cc
+
+    /**
+     * @param schemaType the schema type
+     * @param name the name of the schema definition
+     * @param schema the schema definition as a JSON string
+     * @param properties a map of custom defined properties attached to the schema
+     */
+    SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
+               const StringMap &properties = StringMap());
+
+    /**
+     * @return the schema type
+     */
+    SchemaType getSchemaType() const;
+
+    /**
+     * @return the name of the schema definition
+     */
+    const std::string &getName() const;
+
+    /**
+     * @return the schema definition as a JSON string
+     */
+    const std::string &getSchema() const;
+
+    /**
+     * @return a map of custom defined properties attached to the schema
+     */
+    const StringMap &getProperties() const;
+
+   private:
+    typedef boost::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
+    SchemaInfoImplPtr impl_;  // shared pointer and no copy operations declared: copies point at the same SchemaInfoImpl
+};
+
+}  // namespace pulsar
+
+std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType);
+
+#pragma GCC visibility pop
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fbda62b180..0d8106fb81 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -688,7 +688,7 @@ void ClientConnection::handleIncomingCommand() {
                         pendingRequests_.erase(it);
                         lock.unlock();
 
-                        requestData.promise.setValue({"", -1});
+                        requestData.promise.setValue({});
                         requestData.timer->cancel();
                     }
                     break;
@@ -853,8 +853,13 @@ void ClientConnection::handleIncomingCommand() {
                         pendingRequests_.erase(it);
                         lock.unlock();
 
-                        requestData.promise.setValue(
-                            {producerSuccess.producer_name(), producerSuccess.last_sequence_id()});
+                        ResponseData data;
+                        data.producerName = producerSuccess.producer_name();
+                        data.lastSequenceId = producerSuccess.last_sequence_id();
+                        if (producerSuccess.has_schema_version()) {
+                            data.schemaVersion = producerSuccess.schema_version();
+                        }
+                        requestData.promise.setValue(data);
                         requestData.timer->cancel();
                     }
                     break;
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index ac8d6c50b8..5b47fb5dc9 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -68,7 +68,12 @@ class LookupDataResult;
 
 struct OpSendMsg;
 
-typedef std::pair<std::string, int64_t> ResponseData;
+// Data returned on the request operation. Mostly used on create-producer command
+struct ResponseData {
+    std::string producerName;   // producer name reported in the producer-success response
+    int64_t lastSequenceId;     // no default initializer here; set from the producer-success response
+    std::string schemaVersion;  // stays empty unless the response carries a schema version
+};
 
 typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
 
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index b048dce587..4c8015435a 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -25,6 +25,7 @@
 #include "PulsarApi.pb.h"
 #include "Utils.h"
 #include "Url.h"
+#include <pulsar/Schema.h>
 #include "checksum/ChecksumProvider.h"
 #include <algorithm>
 #include <boost/thread/mutex.hpp>
@@ -36,6 +37,51 @@ using namespace pulsar::proto;
 
 DECLARE_LOG_OBJECT();
 
+static inline bool isBuiltInSchema(SchemaType schemaType) {  // true only for the four types listed below
+    switch (schemaType) {
+        case STRING:
+        case JSON:
+        case AVRO:
+        case PROTOBUF:
+            return true;  // callers in this file attach the schema to the command only in this case
+
+        default:  // NONE and every other SchemaType value
+            return false;
+    }
+}
+
+static inline proto::Schema_Type getSchemaType(SchemaType type) {  // client-side enum -> protobuf enum
+    switch (type) {
+        case SchemaType::NONE:
+            return Schema_Type_None;
+        case STRING:
+            return Schema_Type_String;
+        case JSON:
+            return Schema_Type_Json;
+        case PROTOBUF:
+            return Schema_Type_Protobuf;
+        case AVRO:
+            return Schema_Type_Avro;
+        default:  // any SchemaType without a case above is reported as None
+            return Schema_Type_None;
+    }
+}
+
+static proto::Schema* getSchema(const SchemaInfo& schemaInfo) {  // copies schemaInfo into a new protobuf Schema
+    proto::Schema* schema = proto::Schema().New();  // New() on a temporary: the result is a separate heap object
+    schema->set_name(schemaInfo.getName());
+    schema->set_schema_data(schemaInfo.getSchema());
+    schema->set_type(getSchemaType(schemaInfo.getSchemaType()));
+    for (const auto& kv : schemaInfo.getProperties()) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(kv.first);
+        keyValue->set_value(kv.second);
+        schema->mutable_properties()->AddAllocated(keyValue);  // the repeated field takes ownership of keyValue
+    }
+
+    return schema;  // raw owning pointer; callers in this file pass it to set_allocated_schema()
+}
+
 SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
     size_t cmdSize = cmd.ByteSize();
     size_t frameSize = 4 + cmdSize;
@@ -189,7 +235,8 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
                                     uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                     const std::string& consumerName, SubscriptionMode subscriptionMode,
                                     Optional<MessageId> startMessageId, bool readCompacted,
-                                    const std::map<std::string, std::string>& metadata) {
+                                    const std::map<std::string, std::string>& metadata,
+                                    const SchemaInfo& schemaInfo) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -201,6 +248,11 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
     subscribe->set_consumer_name(consumerName);
     subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
     subscribe->set_read_compacted(readCompacted);
+
+    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
+        subscribe->set_allocated_schema(getSchema(schemaInfo));
+    }
+
     if (startMessageId.is_present()) {
         MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
         messageIdData.set_ledgerid(startMessageId.value().ledgerId());
@@ -233,7 +285,8 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
 
 SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
                                    const std::string& producerName, uint64_t requestId,
-                                   const std::map<std::string, std::string>& metadata) {
+                                   const std::map<std::string, std::string>& metadata,
+                                   const SchemaInfo& schemaInfo) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
@@ -248,6 +301,10 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
         producer->mutable_metadata()->AddAllocated(keyValue);
     }
 
+    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
+        producer->set_allocated_schema(getSchema(schemaInfo));
+    }
+
     if (!producerName.empty()) {
         producer->set_producer_name(producerName);
     }
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index e66995334b..15611a8097 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/Authentication.h>
 #include <pulsar/Message.h>
+#include <pulsar/Schema.h>
 
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
@@ -78,13 +79,15 @@ class Commands {
                                      uint64_t consumerId, uint64_t requestId,
                                      proto::CommandSubscribe_SubType subType, const std::string& consumerName,
                                      SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
-                                     bool readCompacted, const std::map<std::string, std::string>& metadata);
+                                     bool readCompacted, const std::map<std::string, std::string>& metadata,
+                                     const SchemaInfo& schemaInfo);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
 
     static SharedBuffer newProducer(const std::string& topic, uint64_t producerId,
                                     const std::string& producerName, uint64_t requestId,
-                                    const std::map<std::string, std::string>& metadata);
+                                    const std::map<std::string, std::string>& metadata,
+                                    const SchemaInfo& schemaInfo);
 
     static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
                                proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 4014ad2e55..4a7e73a84d 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -33,6 +33,13 @@ ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfigurat
     return *this;
 }
 
+ConsumerConfiguration& ConsumerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;
+    return *this;
+}
+
+const SchemaInfo& ConsumerConfiguration::getSchema() const { return impl_->schemaInfo; }
+
 long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const {
     return impl_->brokerConsumerStatsCacheTimeInMs;
 }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 16e91c8e75..bf9ceb5005 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -24,6 +24,7 @@
 
 namespace pulsar {
 struct ConsumerConfigurationImpl {
+    SchemaInfo schemaInfo;
     long unAckedMessagesTimeoutMs;
     ConsumerType consumerType;
     MessageListener messageListener;
@@ -38,7 +39,8 @@ struct ConsumerConfigurationImpl {
     int patternAutoDiscoveryPeriod;
     std::map<std::string, std::string> properties;
     ConsumerConfigurationImpl()
-        : unAckedMessagesTimeoutMs(0),
+        : schemaInfo(),
+          unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
           messageListener(),
           hasMessageListener(false),
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 62446d90c3..35a18db45a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -137,9 +137,9 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
 
     ClientImplPtr client = client_.lock();
     uint64_t requestId = client->newRequestId();
-    SharedBuffer cmd =
-        Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_,
-                               subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties());
+    SharedBuffer cmd = Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(),
+                                              consumerName_, subscriptionMode_, startMessageId_,
+                                              readCompacted_, config_.getProperties(), config_.getSchema());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, _1));
 }
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0f2780e20d..bd27b0e142 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -147,6 +147,12 @@ const std::string& PartitionedProducerImpl::getProducerName() const {
     return producers_[0]->getProducerName();
 }
 
+const std::string& PartitionedProducerImpl::getSchemaVersion() const {
+    // Since the schema is atomically assigned on the partitioned-topic,
+    // it's guaranteed that all the partitions will have the same schema version.
+    return producers_[0]->getSchemaVersion();
+}
+
 int64_t PartitionedProducerImpl::getLastSequenceId() const {
     int64_t currentMax = -1L;
     for (int i = 0; i < producers_.size(); i++) {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 0a3d949fba..283d273541 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -59,6 +59,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     virtual int64_t getLastSequenceId() const;
 
+    virtual const std::string& getSchemaVersion() const;
+
     virtual void start();
 
     virtual void shutdown();
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index bee213e6f9..1659a21e78 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -60,6 +60,8 @@ const std::string& Producer::getProducerName() const { return impl_->getProducer
 
 int64_t Producer::getLastSequenceId() const { return impl_->getLastSequenceId(); }
 
+const std::string& Producer::getSchemaVersion() const { return impl_->getSchemaVersion(); }
+
 Result Producer::close() {
     Promise<bool, Result> promise;
     closeAsync(WaitForCallback(promise));
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 9ad2cf90a3..a216fc150f 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -187,6 +187,13 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
     return *this;
 }
 
+ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;
+    return *this;
+}
+
+const SchemaInfo& ProducerConfiguration::getSchema() const { return impl_->schemaInfo; }
+
 bool ProducerConfiguration::hasProperty(const std::string& name) const {
     const std::map<std::string, std::string>& m = impl_->properties;
     return m.find(name) != m.end();
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index cc5b9b7fe6..7ec09c83f6 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -27,6 +27,7 @@
 namespace pulsar {
 
 struct ProducerConfigurationImpl {
+    SchemaInfo schemaInfo;
     Optional<std::string> producerName;
     Optional<int64_t> initialSequenceId;
     int sendTimeoutMs;
@@ -46,7 +47,8 @@ struct ProducerConfigurationImpl {
     ProducerCryptoFailureAction cryptoFailureAction;
     std::map<std::string, std::string> properties;
     ProducerConfigurationImpl()
-        : sendTimeoutMs(30000),
+        : schemaInfo(),
+          sendTimeoutMs(30000),
           compressionType(CompressionNone),
           maxPendingMessages(1000),
           maxPendingMessagesAcrossPartitions(50000),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 29d081b340..4c5c56d428 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -102,6 +102,8 @@ const std::string& ProducerImpl::getProducerName() const { return producerName_;
 
 int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; }
 
+const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
+
 void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
     if (ec) {
         LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
@@ -127,8 +129,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd =
-        Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties());
+    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
+                                             conf_.getProperties(), conf_.getSchema());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, _1, _2));
 }
@@ -150,20 +152,19 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
     if (result == ResultOk) {
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
-        const std::string& producerName = responseData.first;
-        int64_t lastSequenceId = responseData.second;
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
 
         Lock lock(mutex_);
         cnx->registerProducer(producerId_, shared_from_this());
-        producerName_ = producerName;
+        producerName_ = responseData.producerName;
+        schemaVersion_ = responseData.schemaVersion;
         producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
         if (batchMessageContainer) {
             batchMessageContainer->producerName_ = producerName_;
         }
 
         if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
-            lastSequenceIdPublished_ = lastSequenceId;
+            lastSequenceIdPublished_ = responseData.lastSequenceId;
             msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
         }
         resendMessages(cnx);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index aa22abc668..00f7f31d6f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -81,6 +81,8 @@ class ProducerImpl : public HandlerBase,
 
     int64_t getLastSequenceId() const;
 
+    const std::string& getSchemaVersion() const;
+
     uint64_t getProducerId() const;
 
     virtual void start();
@@ -147,6 +149,7 @@ class ProducerImpl : public HandlerBase,
     BatchMessageContainerPtr batchMessageContainer;
 
     volatile int64_t lastSequenceIdPublished_;
+    std::string schemaVersion_;
 
     typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
     TimerPtr sendTimer_;
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h
index 3dd92a4618..25b4db3a01 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -34,6 +34,7 @@ class ProducerImplBase {
     virtual const std::string& getProducerName() const = 0;
 
     virtual int64_t getLastSequenceId() const = 0;
+    virtual const std::string& getSchemaVersion() const = 0;
 
     virtual void sendAsync(const Message& msg, SendCallback callback) = 0;
     virtual void closeAsync(CloseCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index a649b33e50..1f13c02758 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -31,6 +31,13 @@ ReaderConfiguration& ReaderConfiguration::operator=(const ReaderConfiguration& x
     return *this;
 }
 
+ReaderConfiguration& ReaderConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;
+    return *this;
+}
+
+const SchemaInfo& ReaderConfiguration::getSchema() const { return impl_->schemaInfo; }
+
 ReaderConfiguration& ReaderConfiguration::setReaderListener(ReaderListener readerListener) {
     impl_->readerListener = readerListener;
     impl_->hasReaderListener = true;
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index 5dca8e3cdd..73082ea2b2 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -24,6 +24,7 @@
 
 namespace pulsar {
 struct ReaderConfigurationImpl {
+    SchemaInfo schemaInfo;
     ReaderListener readerListener;
     bool hasReaderListener;
     int receiverQueueSize;
@@ -31,7 +32,8 @@ struct ReaderConfigurationImpl {
     std::string subscriptionRolePrefix;
     bool readCompacted;
     ReaderConfigurationImpl()
-        : hasReaderListener(false),
+        : schemaInfo(),
+          hasReaderListener(false),
           receiverQueueSize(1000),
           readerName(),
           subscriptionRolePrefix(),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 9507a0b25a..ce2ef215b3 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -33,6 +33,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
     consumerConf.setConsumerType(ConsumerExclusive);
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
     consumerConf.setReadCompacted(readerConf_.isReadCompacted());
+    consumerConf.setSchema(readerConf_.getSchema());
 
     if (readerConf_.getReaderName().length() > 0) {
         consumerConf.setConsumerName(readerConf_.getReaderName());
diff --git a/pulsar-client-cpp/lib/Schema.cc b/pulsar-client-cpp/lib/Schema.cc
new file mode 100644
index 0000000000..487736908e
--- /dev/null
+++ b/pulsar-client-cpp/lib/Schema.cc
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Schema.h>
+
+#include <iostream>
+#include <map>
+#include <boost/make_shared.hpp>
+#include <include/pulsar/Schema.h>
+
+#pragma GCC visibility push(default)
+
+std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType) {
+    return s << strSchemaType(schemaType);
+}
+
+namespace pulsar {
+
+const char *strSchemaType(SchemaType schemaType) {
+    switch (schemaType) {
+        case NONE:
+            return "NONE";
+        case STRING:
+            return "STRING";
+        case INT8:
+            return "INT8";
+        case INT16:
+            return "INT16";
+        case INT32:
+            return "INT32";
+        case INT64:
+            return "INT64";
+        case FLOAT:
+            return "FLOAT";
+        case DOUBLE:
+            return "DOUBLE";
+        case BYTES:
+            return "BYTES";
+        case JSON:
+            return "JSON";
+        case PROTOBUF:
+            return "PROTOBUF";
+        case AVRO:
+            return "AVRO";
+        case AUTO_CONSUME:
+            return "AUTO_CONSUME";
+        case AUTO_PUBLISH:
+            return "AUTO_PUBLISH";
+        case KEY_VALUE:
+            return "KEY_VALUE";
+    };
+    // NOTE : Do not add default case in the switch above. In future if we get new cases for
+    // Schema and miss them in the switch above we would like to get notified. Adding
+    // return here to make the compiler happy.
+    return "UnknownSchemaType";
+}
+
+class SchemaInfoImpl {  // All-const state object held by SchemaInfo::impl_.
+   public:
+    const std::string name_;
+    const std::string schema_;
+    const SchemaType type_;
+    const std::map<std::string, std::string> properties_;
+
+    SchemaInfoImpl() : name_("BYTES"), schema_(), type_(BYTES), properties_() {}  // default: BYTES
+
+    SchemaInfoImpl(SchemaType schemaType, const std::string &name, const std::string &schema,
+                   const StringMap &properties)  // initializers below follow declaration order
+        : name_(name), schema_(schema), type_(schemaType), properties_(properties) {}
+};
+
+SchemaInfo::SchemaInfo() : impl_(boost::make_shared<SchemaInfoImpl>()) {}
+
+SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
+                       const StringMap &properties)
+    : impl_(boost::make_shared<SchemaInfoImpl>(schemaType, name, schema, properties)) {}
+
+SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; }
+
+const std::string &SchemaInfo::getName() const { return impl_->name_; }
+
+const std::string &SchemaInfo::getSchema() const { return impl_->schema_; }
+
+const std::map<std::string, std::string> &SchemaInfo::getProperties() const { return impl_->properties_; }
+
+}  // namespace pulsar
+
+#pragma GCC visibility pop
diff --git a/pulsar-client-cpp/tests/SchemaTest.cc b/pulsar-client-cpp/tests/SchemaTest.cc
new file mode 100644
index 0000000000..7ad7b9e892
--- /dev/null
+++ b/pulsar-client-cpp/tests/SchemaTest.cc
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+static std::string lookupUrl = "pulsar://localhost:6650";
+
+static const std::string exampleSchema =
+    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
+TEST(SchemaTest, testSchema) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    Result res;
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+    res = client.createProducer("topic-avro", producerConf, producer);
+    producer.close();
+
+    ASSERT_EQ(ResultOk, res);
+
+    // Creating producer with a different schema type (JSON) on same topic should fail
+    producerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
+    res = client.createProducer("topic-avro", producerConf, producer);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
+
+    // Creating producer with no schema on same topic should succeed
+    // because standalone broker is configured by default to not
+    // require the schema to be set
+    res = client.createProducer("topic-avro", producer);
+    ASSERT_EQ(ResultOk, res);
+
+    ConsumerConfiguration consumerConf;
+    Consumer consumer;
+    // Subscribing with no schema will still succeed
+    res = client.subscribe("topic-avro", "sub-1", consumerConf, consumer);
+    ASSERT_EQ(ResultOk, res);
+
+    // Subscribing with same Avro schema will succeed
+    consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
+    ASSERT_EQ(ResultOk, res);
+
+    // Subscribing with different schema type will fail
+    consumerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
+    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message