pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar] branch branch-2.1 updated: Cpp client: add multiTopicsConsumer (#1996)
Date Mon, 27 Aug 2018 08:51:57 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 500e393  Cpp client: add multiTopicsConsumer (#1996)
500e393 is described below

commit 500e3936d520b1854a096e4ef71be7cb7edfc502
Author: Jia Zhai <zhaijia03@gmail.com>
AuthorDate: Tue Jul 24 05:25:45 2018 +0800

    Cpp client: add multiTopicsConsumer (#1996)
    
    In PR #1103, we add Multi-Topics-Consumer in java client.  This is a catch up work to add it in cpp client.
---
 pulsar-client-cpp/include/pulsar/Client.h          |   9 +
 pulsar-client-cpp/include/pulsar/Consumer.h        |   1 +
 pulsar-client-cpp/include/pulsar/Message.h         |   1 +
 pulsar-client-cpp/include/pulsar/MessageId.h       |  12 +
 pulsar-client-cpp/lib/Client.cc                    |  24 +
 pulsar-client-cpp/lib/ClientImpl.cc                |  36 ++
 pulsar-client-cpp/lib/ClientImpl.h                 |   3 +
 pulsar-client-cpp/lib/MessageId.cc                 |   4 +
 pulsar-client-cpp/lib/MessageIdImpl.h              |  17 +-
 pulsar-client-cpp/lib/MessageImpl.cc               |  10 +-
 pulsar-client-cpp/lib/MessageImpl.h                |  11 +
 .../lib/MultiTopicsBrokerConsumerStatsImpl.cc      | 158 +++++
 .../lib/MultiTopicsBrokerConsumerStatsImpl.h       |  92 +++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 644 +++++++++++++++++++++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    | 133 +++++
 pulsar-client-cpp/lib/NamespaceName.cc             |   5 +
 pulsar-client-cpp/lib/NamespaceName.h              |   5 +
 pulsar-client-cpp/lib/TopicName.cc                 |   3 +
 pulsar-client-cpp/lib/TopicName.h                  |   1 +
 .../lib/UnAckedMessageTrackerDisabled.h            |   1 +
 .../lib/UnAckedMessageTrackerEnabled.cc            |  20 +
 .../lib/UnAckedMessageTrackerEnabled.h             |   1 +
 .../lib/UnAckedMessageTrackerInterface.h           |   3 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 173 +++++-
 pulsar-client-cpp/tests/NamespaceNameTest.cc       |   1 +
 25 files changed, 1364 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index 07b4355..6a9e487 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,15 @@ class Client {
     void subscribeAsync(const std::string& topic, const std::string& consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback callback);
 
+    Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                     Consumer& consumer);
+    Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                     const ConsumerConfiguration& conf, Consumer& consumer);
+    void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                        SubscribeCallback callback);
+    void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                        const ConsumerConfiguration& conf, SubscribeCallback callback);
+
     /**
      * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
      * topic.
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index 4272166..4486515 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -243,6 +243,7 @@ class Consumer {
     friend class PulsarFriend;
     friend class PulsarWrapper;
     friend class PartitionedConsumerImpl;
+    friend class MultiTopicsConsumerImpl;
     friend class ConsumerImpl;
     friend class ClientImpl;
     friend class ConsumerTest;
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index aff0d94..a3b9af0 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -134,6 +134,7 @@ class Message {
             proto::SingleMessageMetadata& singleMetadata);
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
+    friend class MultiTopicsConsumerImpl;
     friend class MessageBuilder;
     friend class ConsumerImpl;
     friend class ProducerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index dfe3a51..e9ff133 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -22,6 +22,7 @@
 #include <iosfwd>
 #include <stdint.h>
 #include <boost/shared_ptr.hpp>
+//#include <lib/MessageIdImpl.h>
 
 #pragma GCC visibility push(default)
 
@@ -51,6 +52,16 @@ class MessageId {
     void serialize(std::string& result) const;
 
     /**
+     * Get the topic Name
+     */
+    const std::string& getTopicName() const;
+
+    /**
+     * Set the topicName
+     */
+    void setTopicName(const std::string& topicName);
+
+    /**
      * Deserialize a message id from a binary string
      */
     static MessageId deserialize(const std::string& serializedMessageId);
@@ -71,6 +82,7 @@ class MessageId {
     friend class Commands;
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
+    friend class MultiTopicsConsumerImpl;
     friend class UnAckedMessageTrackerEnabled;
     friend class BatchAcknowledgementTracker;
     friend class PulsarWrapper;
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index 5936e48..bba3520 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -90,6 +90,30 @@ void Client::subscribeAsync(const std::string& topic, const std::string& consume
     impl_->subscribeAsync(topic, consumerName, conf, callback);
 }
 
+Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                         Consumer& consumer) {
+    return subscribe(topics, subscriptionName, ConsumerConfiguration(), consumer);
+}
+
+Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                         const ConsumerConfiguration& conf, Consumer& consumer) {
+    Promise<Result, Consumer> promise;
+    subscribeAsync(topics, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise));
+    Future<Result, Consumer> future = promise.getFuture();
+
+    return future.get(consumer);
+}
+
+void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                            SubscribeCallback callback) {
+    subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback);
+}
+
+void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
+                            const ConsumerConfiguration& conf, SubscribeCallback callback) {
+    impl_->subscribeAsync(topics, subscriptionName, conf, callback);
+}
+
 Result Client::createReader(const std::string& topic, const MessageId& startMessageId,
                             const ReaderConfiguration& conf, Reader& reader) {
     Promise<Result, Reader> promise;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 1d46cd9..3768926 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -24,6 +24,7 @@
 #include "ReaderImpl.h"
 #include "PartitionedProducerImpl.h"
 #include "PartitionedConsumerImpl.h"
+#include "MultiTopicsConsumerImpl.h"
 #include "SimpleLoggerImpl.h"
 #include "Log4CxxLogger.h"
 #include <boost/bind.hpp>
@@ -33,6 +34,7 @@
 #include "boost/date_time/posix_time/posix_time.hpp"
 #include <lib/HTTPLookupService.h>
 #include <lib/TopicName.h>
+#include <algorithm>
 
 DECLARE_LOG_OBJECT()
 
@@ -210,6 +212,40 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
     consumers_.push_back(reader->getConsumer());
 }
 
+void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName,
+                                const ConsumerConfiguration& conf, SubscribeCallback callback) {
+    TopicNamePtr topicNamePtr;
+
+    Lock lock(mutex_);
+    if (state_ != Open) {
+        lock.unlock();
+        callback(ResultAlreadyClosed, Consumer());
+        return;
+    } else {
+        if (!topics.empty() && !(topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics))) {
+            lock.unlock();
+            callback(ResultInvalidTopicName, Consumer());
+            return;
+        }
+    }
+
+    if (topicNamePtr) {
+        std::string randomName = generateRandomName();
+        std::stringstream consumerTopicNameStream;
+        consumerTopicNameStream << topicNamePtr->toString() << "-TopicsConsumerFakeName-" << randomName;
+        topicNamePtr = TopicName::get(consumerTopicNameStream.str());
+    }
+
+    ConsumerImplBasePtr consumer = boost::make_shared<MultiTopicsConsumerImpl>(
+        shared_from_this(), topics, consumerName, topicNamePtr, conf, lookupServicePtr_);
+
+    consumer->getConsumerCreatedFuture().addListener(
+        boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, consumer));
+    consumers_.push_back(consumer);
+    lock.unlock();
+    consumer->start();
+}
+
 void ClientImpl::subscribeAsync(const std::string& topic, const std::string& consumerName,
                                 const ConsumerConfiguration& conf, SubscribeCallback callback) {
     TopicNamePtr topicName;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 5283b58..550298b 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -57,6 +57,9 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
     void subscribeAsync(const std::string& topic, const std::string& consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback callback);
 
+    void subscribeAsync(const std::vector<std::string>& topics, const std::string& consumerName,
+                        const ConsumerConfiguration& conf, SubscribeCallback callback);
+
     void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
                            const ReaderConfiguration& conf, ReaderCallback callback);
 
diff --git a/pulsar-client-cpp/lib/MessageId.cc b/pulsar-client-cpp/lib/MessageId.cc
index c5314d8..53946f8 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -130,5 +130,9 @@ bool MessageId::operator==(const MessageId& other) const {
 
 bool MessageId::operator!=(const MessageId& other) const { return !(*this == other); }
 
+const std::string& MessageId::getTopicName() const { return impl_->getTopicName(); }
+
+void MessageId::setTopicName(const std::string& topicName) { return impl_->setTopicName(topicName); }
+
 #pragma GCC visibility pop
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageIdImpl.h b/pulsar-client-cpp/lib/MessageIdImpl.h
index a3fc171..ae33da4 100644
--- a/pulsar-client-cpp/lib/MessageIdImpl.h
+++ b/pulsar-client-cpp/lib/MessageIdImpl.h
@@ -25,12 +25,25 @@ namespace pulsar {
 
 class MessageIdImpl {
    public:
-    MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1) {}
+    MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1), topicName_() {}
     MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
-        : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), batchIndex_(batchIndex) {}
+        : ledgerId_(ledgerId),
+          entryId_(entryId),
+          partition_(partition),
+          batchIndex_(batchIndex),
+          topicName_() {}
     const int64_t ledgerId_;
     const int64_t entryId_;
     const int32_t partition_;
     const int32_t batchIndex_;
+
+    const std::string& getTopicName() { return *topicName_; }
+    void setTopicName(const std::string& topicName) { topicName_ = &topicName; }
+
+   private:
+    const std::string* topicName_;
+    friend class MessageImpl;
+    friend class MultiTopicsConsumerImpl;
+    friend class UnAckedMessageTrackerEnabled;
 };
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc
index 569a30a..9b59eff 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -20,7 +20,7 @@
 
 namespace pulsar {
 
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0) {}
+MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_() {}
 
 const Message::StringMap& MessageImpl::properties() {
     if (properties_.size() == 0) {
@@ -78,4 +78,12 @@ void MessageImpl::setPartitionKey(const std::string& partitionKey) {
 }
 
 void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) { metadata.set_event_time(eventTimestamp); }
+
+void MessageImpl::setTopicName(const std::string& topicName) {
+    topicName_ = &topicName;
+    messageId.setTopicName(topicName);
+}
+
+const std::string& MessageImpl::getTopicName() { return *topicName_; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index f3753b1..0ef63e8 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -43,6 +43,7 @@ class MessageImpl {
     SharedBuffer payload;
     MessageId messageId;
     ClientConnection* cnx_;
+    const std::string* topicName_;
 
     const std::string& getPartitionKey() const;
     bool hasPartitionKey() const;
@@ -50,6 +51,16 @@ class MessageImpl {
     uint64_t getPublishTimestamp() const;
     uint64_t getEventTimestamp() const;
 
+    /**
+     * Get a valid topicName
+     */
+    const std::string& getTopicName();
+
+    /**
+     * Set a valid topicName
+     */
+    void setTopicName(const std::string& topicName);
+
     friend class PulsarWrapper;
     friend class MessageBuilder;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
new file mode 100644
index 0000000..5220307
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.cc
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
+#include <boost/date_time/local_time/local_time.hpp>
+#include <algorithm>
+#include <numeric>
+
+using namespace pulsar;
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::DELIMITER = ";";
+
+MultiTopicsBrokerConsumerStatsImpl::MultiTopicsBrokerConsumerStatsImpl(size_t size) {
+    statsList_.resize(size);
+}
+
+bool MultiTopicsBrokerConsumerStatsImpl::isValid() const {
+    bool isValid = true;
+    for (int i = 0; i < statsList_.size(); i++) {
+        isValid = isValid && statsList_[i].isValid();
+    }
+    return isValid;
+}
+
+std::ostream& operator<<(std::ostream& os, const MultiTopicsBrokerConsumerStatsImpl& obj) {
+    os << "\nMultiTopicsBrokerConsumerStatsImpl ["
+       << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << obj.getMsgRateOut()
+       << ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
+       << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
+       << ", consumerName_ = " << obj.getConsumerName()
+       << ", availablePermits_ = " << obj.getAvailablePermits()
+       << ", unackedMessages_ = " << obj.getUnackedMessages()
+       << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
+       << ", address_ = " << obj.getAddress() << ", connectedSince_ = " << obj.getConnectedSince()
+       << ", type_ = " << obj.getType() << ", msgRateExpired_ = " << obj.getMsgRateExpired()
+       << ", msgBacklog_ = " << obj.getMsgBacklog() << "]";
+    return os;
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateOut() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgRateOut();
+    }
+    return sum;
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgThroughputOut() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgThroughputOut();
+    }
+    return sum;
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateRedeliver() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgRateRedeliver();
+    }
+    return sum;
+}
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::getConsumerName() const {
+    std::string str;
+    for (int i = 0; i < statsList_.size(); i++) {
+        str += statsList_[i].getConsumerName() + DELIMITER;
+    }
+    return str;
+}
+
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getAvailablePermits() const {
+    uint64_t sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getAvailablePermits();
+    }
+    return sum;
+}
+
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getUnackedMessages() const {
+    uint64_t sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getUnackedMessages();
+    }
+    return sum;
+}
+
+bool MultiTopicsBrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const {
+    if (statsList_.size() == 0) {
+        return false;
+    }
+
+    return isValid();
+}
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::getAddress() const {
+    std::stringstream str;
+    for (int i = 0; i < statsList_.size(); i++) {
+        str << statsList_[i].getAddress() << DELIMITER;
+    }
+    return str.str();
+}
+
+const std::string MultiTopicsBrokerConsumerStatsImpl::getConnectedSince() const {
+    std::stringstream str;
+    for (int i = 0; i < statsList_.size(); i++) {
+        str << statsList_[i].getConnectedSince() << DELIMITER;
+    }
+    return str.str();
+}
+
+const ConsumerType MultiTopicsBrokerConsumerStatsImpl::getType() const {
+    if (!statsList_.size()) {
+        return ConsumerExclusive;
+    }
+    return statsList_[0].getType();
+}
+
+double MultiTopicsBrokerConsumerStatsImpl::getMsgRateExpired() const {
+    double sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgRateExpired();
+    }
+    return sum;
+}
+
+uint64_t MultiTopicsBrokerConsumerStatsImpl::getMsgBacklog() const {
+    uint64_t sum = 0;
+    for (int i = 0; i < statsList_.size(); i++) {
+        sum += statsList_[i].getMsgBacklog();
+    }
+    return sum;
+}
+
+BrokerConsumerStats MultiTopicsBrokerConsumerStatsImpl::getBrokerConsumerStats(int index) {
+    return statsList_[index];
+}
+
+void MultiTopicsBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int index) {
+    statsList_[index] = stats;
+}
+
+void MultiTopicsBrokerConsumerStatsImpl::clear() { statsList_.clear(); }
diff --git a/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
new file mode 100644
index 0000000..568cda1
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsBrokerConsumerStatsImpl.h
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
+#define PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
+
+#include <string.h>
+#include <iostream>
+#include <vector>
+#include <pulsar/Result.h>
+#include <boost/function.hpp>
+#include <boost/date_time/microsec_time_clock.hpp>
+#include <lib/BrokerConsumerStatsImpl.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
+#pragma GCC visibility push(default)
+namespace pulsar {
+class MultiTopicsBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
+   private:
+    std::vector<BrokerConsumerStats> statsList_;
+    static const std::string DELIMITER;
+
+   public:
+    MultiTopicsBrokerConsumerStatsImpl(size_t size);
+
+    /** Returns true if the Stats are still valid **/
+    virtual bool isValid() const;
+
+    /** Returns the rate of messages delivered to the consumer. msg/s */
+    virtual double getMsgRateOut() const;
+
+    /** Returns the throughput delivered to the consumer. bytes/s */
+    virtual double getMsgThroughputOut() const;
+
+    /** Returns the rate of messages redelivered by this consumer. msg/s */
+    virtual double getMsgRateRedeliver() const;
+
+    /** Returns the Name of the consumer */
+    virtual const std::string getConsumerName() const;
+
+    /** Returns the Number of available message permits for the consumer */
+    virtual uint64_t getAvailablePermits() const;
+
+    /** Returns the Number of unacknowledged messages for the consumer */
+    virtual uint64_t getUnackedMessages() const;
+
+    /** Returns true if the consumer is blocked due to unacked messages.  */
+    virtual bool isBlockedConsumerOnUnackedMsgs() const;
+
+    /** Returns the Address of this consumer */
+    virtual const std::string getAddress() const;
+
+    /** Returns the Timestamp of connection */
+    virtual const std::string getConnectedSince() const;
+
+    /** Returns Whether this subscription is Exclusive or Shared or Failover */
+    virtual const ConsumerType getType() const;
+
+    /** Returns the rate of messages expired on this subscription. msg/s */
+    virtual double getMsgRateExpired() const;
+
+    /** Returns the Number of messages in the subscription backlog */
+    virtual uint64_t getMsgBacklog() const;
+
+    /** Returns the BrokerConsumerStatsImpl at of ith partition */
+    BrokerConsumerStats getBrokerConsumerStats(int index);
+
+    void add(BrokerConsumerStats stats, int index);
+
+    void clear();
+
+    friend std::ostream &operator<<(std::ostream &os, const MultiTopicsBrokerConsumerStatsImpl &obj);
+};
+typedef boost::shared_ptr<MultiTopicsBrokerConsumerStatsImpl> MultiTopicsBrokerConsumerStatsPtr;
+}  // namespace pulsar
+#pragma GCC visibility pop
+#endif  // PULSAR_CPP_MULTITOPICSBROKERCONSUMERSTATSIMPL_H
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
new file mode 100644
index 0000000..7be197c
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MultiTopicsConsumerImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
+                                                 const std::string& subscriptionName, TopicNamePtr topicName,
+                                                 const ConsumerConfiguration& conf,
+                                                 const LookupServicePtr lookupServicePtr)
+    : client_(client),
+      subscriptionName_(subscriptionName),
+      topic_(topicName ? topicName->toString() : "EmptyTopics"),
+      conf_(conf),
+      state_(Pending),
+      messages_(1000),
+      listenerExecutor_(client->getListenerExecutorProvider()->get()),
+      messageListener_(conf.getMessageListener()),
+      namespaceName_(topicName ? topicName->getNamespaceName() : boost::shared_ptr<NamespaceName>()),
+      lookupServicePtr_(lookupServicePtr),
+      numberTopicPartitions_(boost::make_shared<std::atomic<int>>(0)),
+      topics_(topics) {
+    std::stringstream consumerStrStream;
+    consumerStrStream << "[Muti Topics Consumer: "
+                      << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
+    consumerStr_ = consumerStrStream.str();
+
+    if (conf.getUnAckedMessagesTimeoutMs() != 0) {
+        unAckedMessageTrackerPtr_.reset(
+            new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+    } else {
+        unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
+    }
+}
+
+void MultiTopicsConsumerImpl::start() {
+    if (topics_.empty()) {
+        if (compareAndSetState(Pending, Ready)) {
+            LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
+            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+            return;
+        } else {
+            LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_);
+            multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+            return;
+        }
+    }
+
+    // start call subscribeOneTopicAsync for each single topic
+    int topicsNumber = topics_.size();
+    boost::shared_ptr<std::atomic<int>> topicsNeedCreate = boost::make_shared<std::atomic<int>>(topicsNumber);
+    // subscribe for each passed in topic
+    for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
+        subscribeOneTopicAsync(*itr).addListener(
+            boost::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed, shared_from_this(), _1, _2, *itr,
+                        topicsNeedCreate));
+    }
+}
+
+void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
+                                                       const std::string& topic,
+                                                       boost::shared_ptr<std::atomic<int>> topicsNeedCreate) {
+    int previous = topicsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
+    }
+
+    LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
+
+    if (topicsNeedCreate->load() == 0) {
+        if (compareAndSetState(Pending, Ready)) {
+            LOG_INFO("Successfully Subscribed to Topics");
+            if (!namespaceName_) {
+                namespaceName_ = TopicName::get(topic)->getNamespaceName();
+            }
+            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+        } else {
+            LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
+            // unsubscribed all of the successfully subscribed partitioned consumers
+            ResultCallback nullCallbackForCleanup = NULL;
+            closeAsync(nullCallbackForCleanup);
+            multiTopicsConsumerCreatedPromise_.setFailed(result);
+            return;
+        }
+        return;
+    }
+}
+
+// subscribe for passed in topic
+Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const std::string& topic) {
+    TopicNamePtr topicName;
+    ConsumerSubResultPromisePtr topicPromise = boost::make_shared<Promise<Result, Consumer>>();
+    if (!(topicName = TopicName::get(topic))) {
+        LOG_ERROR("TopicName invalid: " << topic);
+        topicPromise->setFailed(ResultInvalidTopicName);
+        return topicPromise->getFuture();
+    }
+
+    if (namespaceName_ && !(*namespaceName_ == *(topicName->getNamespaceName()))) {
+        LOG_ERROR("TopicName namespace not the same with topicsConsumer. wanted namespace: "
+                  << namespaceName_->toString() << " this topic: " << topic);
+        topicPromise->setFailed(ResultInvalidTopicName);
+        return topicPromise->getFuture();
+    }
+
+    if (state_ == Closed || state_ == Closing) {
+        LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
+        topicPromise->setFailed(ResultAlreadyClosed);
+        return topicPromise->getFuture();
+    }
+
+    // subscribe for each partition, when all partitions completed, complete promise
+    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+        boost::bind(&MultiTopicsConsumerImpl::subscribeTopicPartitions, shared_from_this(), _1, _2, topicName,
+                    subscriptionName_, conf_, topicPromise));
+    return topicPromise->getFuture();
+}
+
+void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
+                                                       const LookupDataResultPtr partitionMetadata,
+                                                       TopicNamePtr topicName,
+                                                       const std::string& consumerName,
+                                                       ConsumerConfiguration conf,
+                                                       ConsumerSubResultPromisePtr topicSubResultPromise) {
+    if (result != ResultOk) {
+        LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
+                  << consumerStr_ << " result: " << result)
+        topicSubResultPromise->setFailed(result);
+        return;
+    }
+
+    boost::shared_ptr<ConsumerImpl> consumer;
+    ConsumerConfiguration config;
+    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
+
+    // all the consumers should have same name.
+    config.setConsumerName(conf_.getConsumerName());
+    config.setConsumerType(conf_.getConsumerType());
+    config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
+    config.setMessageListener(
+        boost::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), _1, _2));
+    config.setReceiverQueueSize(conf_.getReceiverQueueSize());
+
+    int numPartitions = partitionMetadata->getPartitions() >= 1 ? partitionMetadata->getPartitions() : 1;
+
+    Lock lock(mutex_);
+    topicsPartitions_.insert(std::make_pair(topicName->toString(), numPartitions));
+    lock.unlock();
+    numberTopicPartitions_->fetch_add(numPartitions);
+
+    boost::shared_ptr<std::atomic<int>> partitionsNeedCreate =
+        boost::make_shared<std::atomic<int>>(numPartitions);
+
+    for (int i = 0; i < numPartitions; i++) {
+        std::string topicPartitionName = topicName->getTopicPartitionName(i);
+        consumer = boost::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+                                                    internalListenerExecutor, Partitioned);
+        consumer->getConsumerCreatedFuture().addListener(
+            boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), _1, _2,
+                        partitionsNeedCreate, topicSubResultPromise));
+        consumer->setPartitionIndex(i);
+        consumers_.insert(std::make_pair(topicPartitionName, consumer));
+        LOG_DEBUG("Create Consumer for - " << topicPartitionName << " - " << consumerStr_);
+        consumer->start();
+    }
+}
+
+void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
+    Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+    boost::shared_ptr<std::atomic<int>> partitionsNeedCreate,
+    ConsumerSubResultPromisePtr topicSubResultPromise) {
+    if (state_ == Failed) {
+        // one of the consumer creation failed, and we are cleaning up
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        LOG_ERROR("Unable to create Consumer " << consumerStr_ << " state == Failed, result: " << result);
+        return;
+    }
+
+    int previous = partitionsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        topicSubResultPromise->setFailed(result);
+        LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
+        return;
+    }
+
+    LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
+              << "Partitions need to create - " << previous - 1);
+
+    if (partitionsNeedCreate->load() == 0) {
+        topicSubResultPromise->setValue(Consumer(shared_from_this()));
+    }
+}
+
+void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+    LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
+
+    Lock lock(mutex_);
+    if (state_ == Closing || state_ == Closed) {
+        LOG_INFO(consumerStr_ << " already closed");
+        lock.unlock();
+        callback(ResultAlreadyClosed);
+        return;
+    }
+    state_ = Closing;
+    lock.unlock();
+
+    boost::shared_ptr<std::atomic<int>> consumerUnsubed = boost::make_shared<std::atomic<int>>(0);
+
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        LOG_DEBUG("Unsubcribing Consumer - " << consumer->first);
+        (consumer->second)
+            ->unsubscribeAsync(boost::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync,
+                                           shared_from_this(), _1, consumerUnsubed, callback));
+    }
+}
+
+void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
+                                                      boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                                      ResultCallback callback) {
+    int previous = consumerUnsubed->fetch_add(1);
+    assert(previous < numberTopicPartitions_->load());
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                  << result << " subscription - " << subscriptionName_);
+    }
+
+    if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
+        LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer.  - " << consumerStr_);
+        consumers_.clear();
+        topicsPartitions_.clear();
+        unAckedMessageTrackerPtr_->clear();
+
+        Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
+        setState(Closed);
+        callback(result1);
+        return;
+    }
+}
+
+void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
+    std::map<std::string, int>::iterator it = topicsPartitions_.find(topic);
+    if (it == topicsPartitions_.end()) {
+        LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - "
+                                                               << subscriptionName_);
+        callback(ResultTopicNotFound);
+        return;
+    }
+
+    if (state_ == Closing || state_ == Closed) {
+        LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
+                                                                           << subscriptionName_);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    TopicNamePtr topicName;
+    if (!(topicName = TopicName::get(topic))) {
+        LOG_ERROR("TopicName invalid: " << topic);
+        callback(ResultUnknownError);
+    }
+    int numberPartitions = it->second;
+    boost::shared_ptr<std::atomic<int>> consumerUnsubed = boost::make_shared<std::atomic<int>>(0);
+
+    for (int i = 0; i < numberPartitions; i++) {
+        std::string topicPartitionName = topicName->getTopicPartitionName(i);
+        std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+
+        if (consumers_.end() == iterator) {
+            LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
+            callback(ResultUnknownError);
+        }
+
+        (iterator->second)
+            ->unsubscribeAsync(boost::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync,
+                                           shared_from_this(), _1, consumerUnsubed, numberPartitions,
+                                           topicName, topicPartitionName, callback));
+    }
+}
+
+void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
+    Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions,
+    TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) {
+    int previous = consumerUnsubed->fetch_add(1);
+    assert(previous < numberPartitions);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
+                  << result << " topicPartitionName - " << topicPartitionName);
+    }
+
+    LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
+
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    if (consumers_.end() != iterator) {
+        iterator->second->pauseMessageListener();
+        consumers_.erase(iterator);
+    }
+
+    if (consumerUnsubed->load() == numberPartitions) {
+        LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer.  - " << consumerStr_);
+        std::map<std::string, int>::iterator it = topicsPartitions_.find(topicNamePtr->toString());
+        if (it != topicsPartitions_.end()) {
+            numberTopicPartitions_->fetch_sub(numberPartitions);
+            Lock lock(mutex_);
+            topicsPartitions_.erase(it);
+            lock.unlock();
+        }
+        if (state_ != Failed) {
+            callback(ResultOk);
+        } else {
+            callback(ResultUnknownError);
+        }
+        unAckedMessageTrackerPtr_->removeTopicMessage(topicNamePtr->toString());
+        return;
+    }
+}
+
+void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    if (state_ == Closing || state_ == Closed) {
+        LOG_ERROR("TopicsConsumer already closed "
+                  << " topic" << topic_ << " consumer - " << consumerStr_);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    setState(Closing);
+
+    if (consumers_.empty()) {
+        LOG_ERROR("TopicsConsumer have no consumers to close "
+                  << " topic" << topic_ << " subscription - " << subscriptionName_);
+        setState(Closed);
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    // close successfully subscribed consumers
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        std::string topicPartitionName = consumer->first;
+        ConsumerImplPtr consumerPtr = consumer->second;
+
+        consumerPtr->closeAsync(boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose,
+                                            shared_from_this(), _1, topicPartitionName, callback));
+    }
+}
+
+void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName,
+                                                        CloseCallback callback) {
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    if (consumers_.end() != iterator) {
+        consumers_.erase(iterator);
+    }
+
+    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
+                                                      << numberTopicPartitions_->load());
+
+    assert(numberTopicPartitions_->load() > 0);
+    numberTopicPartitions_->fetch_sub(1);
+
+    if (result != ResultOk) {
+        setState(Failed);
+        LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
+                                                                 << result);
+    }
+
+    // closed all consumers
+    if (numberTopicPartitions_->load() == 0) {
+        consumers_.clear();
+        topicsPartitions_.clear();
+        unAckedMessageTrackerPtr_->clear();
+
+        if (state_ != Failed) {
+            state_ = Closed;
+        }
+
+        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+        if (!callback.empty()) {
+            callback(result);
+        }
+        return;
+    }
+}
+
+void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
+    LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
+                                                          << " message:" << msg.getDataAsString());
+    const std::string& topicPartitionName = consumer.getTopic();
+    msg.impl_->setTopicName(topicPartitionName);
+    messages_.push(msg);
+
+    if (messageListener_) {
+        listenerExecutor_->postWork(
+            boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+    }
+}
+
+void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
+    Message m;
+    messages_.pop(m);
+
+    try {
+        messageListener_(Consumer(shared_from_this()), m);
+    } catch (const std::exception& e) {
+        LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
+    }
+}
+
+Result MultiTopicsConsumerImpl::receive(Message& msg) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        return ResultAlreadyClosed;
+    }
+
+    if (messageListener_) {
+        lock.unlock();
+        LOG_ERROR("Can not receive when a listener has been set");
+        return ResultInvalidConfiguration;
+    }
+    messages_.pop(msg);
+    lock.unlock();
+
+    unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    return ResultOk;
+}
+
+Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        return ResultAlreadyClosed;
+    }
+
+    if (messageListener_) {
+        lock.unlock();
+        LOG_ERROR("Can not receive when a listener has been set");
+        return ResultInvalidConfiguration;
+    }
+
+    if (messages_.pop(msg, milliseconds(timeout))) {
+        lock.unlock();
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        return ResultOk;
+    } else {
+        return ResultTimeout;
+    }
+}
+
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    const std::string& topicPartitionName = msgId.getTopicName();
+    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+
+    if (consumers_.end() != iterator) {
+        unAckedMessageTrackerPtr_->remove(msgId);
+        iterator->second->acknowledgeAsync(msgId, callback);
+    } else {
+        LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
+        callback(ResultUnknownError);
+        return;
+    }
+}
+
+void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
+    callback(ResultOperationNotSupported);
+}
+
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
+
+Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
+    return multiTopicsConsumerCreatedPromise_.getFuture();
+}
+const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }
+
+const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
+
+const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
+
+void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
+    Lock lock(mutex_);
+    state_ = state;
+}
+
+bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
+                                                 MultiTopicsConsumerState update) {
+    Lock lock(mutex_);
+    if (state_ == expect) {
+        state_ = update;
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void MultiTopicsConsumerImpl::shutdown() {}
+
+bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
+
+bool MultiTopicsConsumerImpl::isOpen() {
+    Lock lock(mutex_);
+    return state_ == Ready;
+}
+
+void MultiTopicsConsumerImpl::receiveMessages() {
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        ConsumerImplPtr consumerPtr = consumer->second;
+        consumerPtr->receiveMessages(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize());
+        LOG_DEBUG("Sending FLOW command for consumer - " << consumerPtr->getConsumerId());
+    }
+}
+
+Result MultiTopicsConsumerImpl::pauseMessageListener() {
+    if (!messageListener_) {
+        return ResultInvalidConfiguration;
+    }
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        (consumer->second)->pauseMessageListener();
+    }
+    return ResultOk;
+}
+
+Result MultiTopicsConsumerImpl::resumeMessageListener() {
+    if (!messageListener_) {
+        return ResultInvalidConfiguration;
+    }
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        (consumer->second)->resumeMessageListener();
+    }
+    return ResultOk;
+}
+
+void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
+    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        (consumer->second)->redeliverUnacknowledgedMessages();
+    }
+}
+
+int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
+
+void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        callback(ResultConsumerNotInitialized, BrokerConsumerStats());
+        return;
+    }
+    MultiTopicsBrokerConsumerStatsPtr statsPtr =
+        boost::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
+    LatchPtr latchPtr = boost::make_shared<Latch>(numberTopicPartitions_->load());
+    int size = consumers_.size();
+    lock.unlock();
+
+    ConsumerMap::const_iterator consumer = consumers_.begin();
+    for (int i = 0; i < size; i++, consumer++) {
+        consumer->second->getBrokerConsumerStatsAsync(
+            boost::bind(&MultiTopicsConsumerImpl::handleGetConsumerStats, shared_from_this(), _1, _2,
+                        latchPtr, statsPtr, i, callback));
+    }
+}
+
+void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
+                                                     LatchPtr latchPtr,
+                                                     MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
+                                                     BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
+    if (res == ResultOk) {
+        latchPtr->countdown();
+        statsPtr->add(brokerConsumerStats, index);
+    } else {
+        lock.unlock();
+        callback(res, BrokerConsumerStats());
+        return;
+    }
+    if (latchPtr->getCount() == 0) {
+        lock.unlock();
+        callback(ResultOk, BrokerConsumerStats(statsPtr));
+    }
+}
+
+boost::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(
+    const std::vector<std::string>& topics) {
+    TopicNamePtr topicNamePtr = boost::shared_ptr<TopicName>();
+    NamespaceNamePtr namespaceNamePtr = boost::shared_ptr<NamespaceName>();
+
+    // all topics name valid, and all topics have same namespace
+    for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) {
+        // topic name valid
+        if (!(topicNamePtr = TopicName::get(*itr))) {
+            LOG_ERROR("Topic name invalid when init " << *itr);
+            return boost::shared_ptr<TopicName>();
+        }
+
+        // all contains same namespace part
+        if (!namespaceNamePtr) {
+            namespaceNamePtr = topicNamePtr->getNamespaceName();
+        } else if (!(*namespaceNamePtr == *(topicNamePtr->getNamespaceName()))) {
+            LOG_ERROR("Different namespace name. expected: " << namespaceNamePtr->toString() << " now:"
+                                                             << topicNamePtr->getNamespaceName()->toString());
+            return boost::shared_ptr<TopicName>();
+        }
+    }
+
+    return topicNamePtr;
+}
+
+void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
+    callback(ResultOperationNotSupported);
+}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
new file mode 100644
index 0000000..6425687
--- /dev/null
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#include "ConsumerImpl.h"
+#include "ClientImpl.h"
+#include "BlockingQueue.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+#include "boost/enable_shared_from_this.hpp"
+#include "ConsumerImplBase.h"
+#include "lib/UnAckedMessageTrackerDisabled.h"
+#include <lib/Latch.h>
+#include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
+#include <lib/TopicName.h>
+#include <lib/NamespaceName.h>
+
+namespace pulsar {
+typedef boost::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
+typedef boost::function<void(Result result)> ResultCallback;
+
+class MultiTopicsConsumerImpl;
+class MultiTopicsConsumerImpl : public ConsumerImplBase,
+                                public boost::enable_shared_from_this<MultiTopicsConsumerImpl> {
+   public:
+    enum MultiTopicsConsumerState
+    {
+        Pending,
+        Ready,
+        Closing,
+        Closed,
+        Failed
+    };
+    MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
+                            const std::string& subscriptionName, TopicNamePtr topicName,
+                            const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_);
+    virtual ~MultiTopicsConsumerImpl();
+    virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture();
+    virtual const std::string& getSubscriptionName() const;
+    virtual const std::string& getTopic() const;
+    virtual const std::string& getName() const;
+    virtual Result receive(Message& msg);
+    virtual Result receive(Message& msg, int timeout);
+    virtual void unsubscribeAsync(ResultCallback callback);
+    virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
+    virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
+    virtual void closeAsync(ResultCallback callback);
+    virtual void start();
+    virtual void shutdown();
+    virtual bool isClosed();
+    virtual bool isOpen();
+    virtual Result pauseMessageListener();
+    virtual Result resumeMessageListener();
+    virtual void redeliverUnacknowledgedMessages();
+    virtual int getNumOfPrefetchedMessages() const;
+    virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
+    void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
+                                size_t, BrokerConsumerStatsCallback);
+    // return first topic name when all topics name valid, or return null pointer
+    static boost::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);
+    void unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback);
+    Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);
+    // not supported
+    virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
+
+   private:
+    const ClientImplPtr client_;
+    const std::string subscriptionName_;
+    std::string consumerStr_;
+    std::string topic_;
+    NamespaceNamePtr namespaceName_;
+    const ConsumerConfiguration conf_;
+    typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
+    ConsumerMap consumers_;
+    std::map<std::string, int> topicsPartitions_;
+    boost::mutex mutex_;
+    MultiTopicsConsumerState state_;
+    boost::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+    LookupServicePtr lookupServicePtr_;
+    BlockingQueue<Message> messages_;
+    ExecutorServicePtr listenerExecutor_;
+    MessageListener messageListener_;
+    Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
+    UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    const std::vector<std::string>& topics_;
+
+    /* methods */
+    void setState(MultiTopicsConsumerState state);
+    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
+
+    void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+                                              unsigned int partitionIndex);
+    void handleSingleConsumerClose(Result result, std::string& topicPartitionName, CloseCallback callback);
+    void notifyResult(CloseCallback closeCallback);
+    void messageReceived(Consumer consumer, const Message& msg);
+    void internalListener(Consumer consumer);
+    void receiveMessages();
+
+    void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
+                                  boost::shared_ptr<std::atomic<int>> topicsNeedCreate);
+    void subscribeTopicPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
+                                  TopicNamePtr topicName, const std::string& consumerName,
+                                  ConsumerConfiguration conf,
+                                  ConsumerSubResultPromisePtr topicSubResultPromise);
+    void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
+                                     boost::shared_ptr<std::atomic<int>> partitionsNeedCreate,
+                                     ConsumerSubResultPromisePtr topicSubResultPromise);
+    void handleUnsubscribedAsync(Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                 ResultCallback callback);
+    void handleOneTopicUnsubscribedAsync(Result result, boost::shared_ptr<std::atomic<int>> consumerUnsubed,
+                                         int numberPartitions, TopicNamePtr topicNamePtr,
+                                         std::string& topicPartitionName, ResultCallback callback);
+};
+
+}  // namespace pulsar
+#endif  // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/lib/NamespaceName.cc b/pulsar-client-cpp/lib/NamespaceName.cc
index caa5b79..273fc22 100644
--- a/pulsar-client-cpp/lib/NamespaceName.cc
+++ b/pulsar-client-cpp/lib/NamespaceName.cc
@@ -27,6 +27,7 @@
 #include <sstream>
 
 DECLARE_LOG_OBJECT()
+namespace pulsar {
 
 boost::shared_ptr<NamespaceName> NamespaceName::get(const std::string& property, const std::string& cluster,
                                                     const std::string& namespaceName) {
@@ -103,3 +104,7 @@ std::string NamespaceName::getCluster() { return this->cluster_; }
 std::string NamespaceName::getLocalName() { return this->localName_; }
 
 bool NamespaceName::isV2() { return this->cluster_.empty(); }
+
+std::string NamespaceName::toString() { return this->namespace_; }
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NamespaceName.h b/pulsar-client-cpp/lib/NamespaceName.h
index 1f121bb..d5bdf99 100644
--- a/pulsar-client-cpp/lib/NamespaceName.h
+++ b/pulsar-client-cpp/lib/NamespaceName.h
@@ -25,6 +25,7 @@
 #include <boost/shared_ptr.hpp>
 
 #pragma GCC visibility push(default)
+namespace pulsar {
 
 class NamespaceName : public ServiceUnitId {
    public:
@@ -38,6 +39,7 @@ class NamespaceName : public ServiceUnitId {
                                                 const std::string& namespaceName);
     bool operator==(const NamespaceName& namespaceName);
     bool isV2();
+    std::string toString();
 
    private:
     std::string namespace_;
@@ -51,6 +53,9 @@ class NamespaceName : public ServiceUnitId {
     NamespaceName(const std::string& property, const std::string& namespace_);
 };
 
+typedef boost::shared_ptr<NamespaceName> NamespaceNamePtr;
+
+}  // namespace pulsar
 #pragma GCC visibility pop
 
 #endif
diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc
index 533f91e..9186752 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -228,4 +228,7 @@ const std::string TopicName::getTopicPartitionName(unsigned int partition) {
     topicPartitionName << toString() << PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition;
     return topicPartitionName.str();
 }
+
+NamespaceNamePtr TopicName::getNamespaceName() { return namespaceName_; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h
index 91b7994..1949d94 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -51,6 +51,7 @@ class TopicName : public ServiceUnitId {
     std::string getLocalName();
     std::string getEncodedLocalName();
     std::string toString();
+    NamespaceNamePtr getNamespaceName();
     static boost::shared_ptr<TopicName> get(const std::string& topicName);
     bool operator==(const TopicName& other);
     static std::string getEncodedName(const std::string& nameBeforeEncoding);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
index 62cf86e..c25c1a5 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerDisabled.h
@@ -26,6 +26,7 @@ class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface {
     bool add(const MessageId& m) { return false; }
     bool remove(const MessageId& m) { return false; }
     void removeMessagesTill(const MessageId& msgId) {}
+    void removeTopicMessage(const std::string& topic) {}
 
     void clear() {}
 };
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 90006b6..ba9fc97 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -100,6 +100,26 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     }
 }
 
+// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
+void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
+    for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
+        const std::string& topicPartitionName = it->getTopicName();
+        if (topicPartitionName.find(topic) != std::string::npos) {
+            oldSet_.erase(it++);
+        } else {
+            it++;
+        }
+    }
+    for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
+        const std::string& topicPartitionName = it->getTopicName();
+        if (topicPartitionName.find(topic) != std::string::npos) {
+            currentSet_.erase(it++);
+        } else {
+            it++;
+        }
+    }
+}
+
 void UnAckedMessageTrackerEnabled::clear() {
     currentSet_.clear();
     oldSet_.clear();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 217ee0b..7bea00d 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     bool add(const MessageId& m);
     bool remove(const MessageId& m);
     void removeMessagesTill(const MessageId& msgId);
+    void removeTopicMessage(const std::string& topic);
     void timeoutHandler();
 
     void clear();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index d010dd0..798ccb4 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -44,6 +44,9 @@ class UnAckedMessageTrackerInterface {
     virtual bool remove(const MessageId& m) = 0;
     virtual void removeMessagesTill(const MessageId& msgId) = 0;
     virtual void clear() = 0;
+    // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's
+    // message.
+    virtual void removeTopicMessage(const std::string& topic) = 0;
 };
 
 typedef boost::scoped_ptr<UnAckedMessageTrackerInterface> UnAckedMessageTrackerScopedPtr;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 023a587..cf28b34 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -33,6 +33,7 @@
 #include "HttpHelper.h"
 #include <set>
 #include <vector>
+#include <lib/MultiTopicsConsumerImpl.h>
 #include "lib/Future.h"
 #include "lib/Utils.h"
 DECLARE_LOG_OBJECT()
@@ -45,7 +46,6 @@ static int globalCount = 0;
 static long globalResendMessageCount = 0;
 static std::string lookupUrl = "pulsar://localhost:8885";
 static std::string adminUrl = "http://localhost:8765/";
-
 static void messageListenerFunction(Consumer consumer, const Message& msg) {
     globalCount++;
     consumer.acknowledge(msg);
@@ -1508,3 +1508,174 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
     producer.close();
     client.close();
 }
+
+TEST(BasicEndToEndTest, testMultiTopicsConsumerTopicNameInvalid) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsTopicNameInvalid";
+    // cluster empty
+    std::string topicName1 = "persistent://prop/testMultiTopicsTopicNameInvalid";
+
+    // empty topics
+    ASSERT_EQ(0, topicNames.size());
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    LOG_INFO("subscribe on empty topics");
+    consumer.close();
+
+    // Invalid topic names
+    Consumer consumer1;
+    std::string subName1 = "testMultiTopicsTopicNameInvalid";
+    topicNames.push_back(topicName1);
+    Promise<Result, Consumer> consumerPromise1;
+    client.subscribeAsync(topicNames, subName1, consConfig, WaitForCallbackValue<Consumer>(consumerPromise1));
+    Future<Result, Consumer> consumerFuture1 = consumerPromise1.getFuture();
+    result = consumerFuture1.get(consumer1);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+    LOG_INFO("subscribe on TopicName1 failed");
+    consumer1.close();
+
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testMultiTopicsConsumerDifferentNamespace) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsDifferentNamespace";
+    std::string topicName1 = "persistent://prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1";
+    std::string topicName2 = "persistent://prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2";
+    std::string topicName3 = "persistent://prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3";
+
+    topicNames.push_back(topicName1);
+    topicNames.push_back(topicName2);
+    topicNames.push_back(topicName3);
+
+    // call admin api to make topics partitioned
+    std::string url1 =
+        adminUrl + "admin/persistent/prop/unit/ns1/testMultiTopicsConsumerDifferentNamespace1/partitions";
+    std::string url2 =
+        adminUrl + "admin/persistent/prop/unit/ns2/testMultiTopicsConsumerDifferentNamespace2/partitions";
+    std::string url3 =
+        adminUrl + "admin/persistent/prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3/partitions";
+
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    // empty topics
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+    LOG_INFO("subscribe on topics with different names should fail");
+    consumer.close();
+
+    client.shutdown();
+}
+
+// Test subscribe 3 topics using MultiTopicsConsumer
+TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
+    Client client(lookupUrl);
+    std::vector<std::string> topicNames;
+    topicNames.reserve(3);
+    std::string subName = "testMultiTopicsConsumer";
+    std::string topicName1 = "persistent://prop/unit/ns/testMultiTopicsConsumer1";
+    std::string topicName2 = "persistent://prop/unit/ns/testMultiTopicsConsumer2";
+    std::string topicName3 = "persistent://prop/unit/ns/testMultiTopicsConsumer3";
+
+    topicNames.push_back(topicName1);
+    topicNames.push_back(topicName2);
+    topicNames.push_back(topicName3);
+
+    // call admin api to make topics partitioned
+    std::string url1 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer1/partitions";
+    std::string url2 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer2/partitions";
+    std::string url3 = adminUrl + "admin/persistent/prop/unit/ns/testMultiTopicsConsumer3/partitions";
+
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer1;
+    Result result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+
+    LOG_INFO("created 3 producers");
+
+    int messageNumber = 100;
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);  // size for each sub-consumer
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicNames, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created topics consumer on 3 topics");
+
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 100 messages by producer 1 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer1.send(msg));
+    }
+
+    msgContent = "msg-content2";
+    LOG_INFO("Publishing 100 messages by producer 2 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer2.send(msg));
+    }
+
+    msgContent = "msg-content3";
+    LOG_INFO("Publishing 100 messages by producer 3 synchronously");
+    for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer3.send(msg));
+    }
+
+    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+    for (int i = 0; i < 3 * messageNumber; i++) {
+        Message m;
+        ASSERT_EQ(ResultOk, consumer.receive(m, 10000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+    }
+
+    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
diff --git a/pulsar-client-cpp/tests/NamespaceNameTest.cc b/pulsar-client-cpp/tests/NamespaceNameTest.cc
index 84ccb13..5c214e2 100644
--- a/pulsar-client-cpp/tests/NamespaceNameTest.cc
+++ b/pulsar-client-cpp/tests/NamespaceNameTest.cc
@@ -19,6 +19,7 @@
 #include <NamespaceName.h>
 
 #include <gtest/gtest.h>
+using namespace pulsar;
 
 TEST(NamespaceNameTest, testNamespaceName) {
     boost::shared_ptr<NamespaceName> nn1 = NamespaceName::get("property", "cluster", "namespace");


Mime
View raw message