qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-proton git commit: PROTON-1557: c++ improve multi-threaded clients [Forced Update!]
Date Mon, 28 Aug 2017 22:00:33 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master a5f88a1cc -> 398f786ba (forced update)


PROTON-1557: c++ improve multi-threaded clients

2 clients:
- multithreaded_client.cpp: simple send thread, receive thread, run thread
- multithreaded_client_flow_control: multi-connection, block for flow control

Changes:
- reduced needless diff between examples
- use separate work_queue* to clarify separate thread safety rules from sender
- took work_queue->add() out of lock to emphasize it is thread safe
- use fixed argument list, same arg order


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/398f786b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/398f786b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/398f786b

Branch: refs/heads/master
Commit: 398f786ba94857b00d052c03232f8f9ef97f3f44
Parents: f1ee268
Author: Alan Conway <aconway@redhat.com>
Authored: Mon Aug 28 11:53:35 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Mon Aug 28 18:00:24 2017 -0400

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   3 +-
 examples/cpp/multithreaded_client.cpp           | 185 ++++++++++++
 .../cpp/multithreaded_client_flow_control.cpp   | 287 +++++++++++++++++++
 examples/cpp/send_recv_mt.cpp                   | 269 -----------------
 4 files changed, 474 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index df9f6a7..d116913 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -63,7 +63,8 @@ if(HAS_CPP11)
   # Examples that require C++11
   foreach(example
       scheduled_send
-      send_recv_mt
+      multithreaded_client
+      multithreaded_client_flow_control
       )
     add_executable(${example} ${example}.cpp)
   endforeach()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp
new file mode 100644
index 0000000..955655c
--- /dev/null
+++ b/examples/cpp/multithreaded_client.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 only
+//
+// A multi-threaded client that calls proton::container::run() in one thread, sends
+// messages in another and receives messages in a third.
+//
+// Note this client does not deal with flow-control. If the sender is faster
+// than the receiver, messages will build up in memory on the sending side.
+// See @ref multithreaded_client_flow_control.cpp for a more complex example with
+// flow control.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Handler for a single thread-safe sending and receiving connection.
+class client : public proton::messaging_handler {
+    // Invariant
+    const std::string url_;
+    const std::string address_;
+
+    // Only used in proton handler thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue *work_queue_;
+    std::condition_variable sender_ready_;
+    std::queue<proton::message> messages_;
+    std::condition_variable messages_ready_;
+
+  public:
+    client(const std::string& url, const std::string& address) : url_(url), address_(address)
{}
+
+    // Thread safe
+    void send(const proton::message& msg) {
+        // Use [=] to copy the message, we cannot pass it by reference since it
+        // will be used in another thread.
+        work_queue()->add([=]() { sender_.send(msg); });
+    }
+
+    // Thread safe
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        while (messages_.empty()) messages_ready_.wait(l);
+        auto msg = std::move(messages_.front());
+        messages_.pop();
+        return msg;
+    }
+
+    // Thread safe
+    void close() {
+        work_queue()->add([=]() { sender_.connection().close(); });
+    }
+
+  private:
+
+    proton::work_queue* work_queue() {
+        // Wait till work_queue_ and sender_ are initialized.
+        std::unique_lock<std::mutex> l(lock_);
+        while (!work_queue_) sender_ready_.wait(l);
+        return work_queue_;
+    }
+
+    // == messaging_handler overrides, only called in proton hander thread
+
+    // Note: this example creates a connection when the container starts.
+    // To create connections after the container has started, use
+    // container::connect().
+    // See @ref multithreaded_client_flow_control.cpp for an example.
+    void on_container_start(proton::container& cont) override {
+        cont.connect(url_);
+    }
+
+    void on_connection_open(proton::connection& conn) override {
+        conn.open_sender(address_);
+        conn.open_receiver(address_);
+    }
+
+    void on_sender_open(proton::sender& s) override {
+        {
+            // sender_ and work_queue_ must be set atomically
+            std::lock_guard<std::mutex> l(lock_);
+            sender_ = s;
+            work_queue_ = &s.work_queue();
+        }
+        sender_ready_.notify_all();
+    }
+
+    void on_message(proton::delivery& dlv, proton::message& msg) override {
+        {
+            std::lock_guard<std::mutex> l(lock_);
+            messages_.push(msg);
+        }
+        messages_ready_.notify_all();
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        std::cerr << "unexpected error: " << e << std::endl;
+        exit(1);
+    }
+};
+
+int main(int argc, const char** argv) {
+    try {
+        if (argc != 4) {
+            std ::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to send\n";
+            return 1;
+        }
+        const char *url = argv[1];
+        const char *address = argv[2];
+        int n_messages = atoi(argv[3]);
+
+        client cl(url, address);
+        proton::container container(cl);
+        std::thread container_thread([&]() { container.run(); });
+
+        std::thread sender([&]() {
+                for (int i = 0; i < n_messages; ++i) {
+                    proton::message msg(std::to_string(i + 1));
+                    cl.send(msg);
+                    std::cout << "sent: " << msg.body() << std::endl;
+                }
+            });
+
+        int received = 0;
+        std::thread receiver([&]() {
+                for (int i = 0; i < n_messages; ++i) {
+                    auto msg = cl.receive();
+                    std::cout << "received: " << msg.body() << std::endl;
+                    ++received;
+                }
+            });
+
+        sender.join();
+        receiver.join();
+        cl.close();
+        container_thread.join();
+        std::cout << "received " << received << " messages" << std::endl;
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp
new file mode 100644
index 0000000..9eec782
--- /dev/null
+++ b/examples/cpp/multithreaded_client_flow_control.cpp
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// C++11 only
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block sender threads to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receivers threads
+//
+// We define sender and receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::message_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// A thread-safe sending connection that blocks sending threads when there
+// is no AMQP credit to send messages.
+class sender : private proton::messaging_handler {
+    // Only used in proton handler thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue *work_queue_;
+    std::condition_variable sender_ready_;
+    int queued_;                       // Queued messages waiting to be sent
+    int credit_;                       // AMQP credit - number of messages we can send
+
+  public:
+    sender(proton::container& cont, const std::string& url, const std::string&
address)
+        : work_queue_(0), queued_(0), credit_(0)
+    {
+        cont.open_sender(url+"/"+address, proton::connection_options().handler(*this));
+    }
+
+    // Thread safe
+    void send(const proton::message& m) {
+        {
+            std::unique_lock<std::mutex> l(lock_);
+            // Don't queue up more messages than we have credit for
+            while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
+            ++queued_;
+        }
+        work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe
+    }
+
+    // Thread safe
+    void close() {
+        work_queue()->add([=]() { sender_.connection().close(); });
+    }
+
+  private:
+
+    proton::work_queue* work_queue() {
+        // Wait till work_queue_ and sender_ are initialized.
+        std::unique_lock<std::mutex> l(lock_);
+        while (!work_queue_) sender_ready_.wait(l);
+        return work_queue_;
+    }
+
+    // == messaging_handler overrides, only called in proton hander thread
+
+    void on_sender_open(proton::sender& s) override {
+        // Make sure sender_ and work_queue_ are set atomically
+        std::lock_guard<std::mutex> l(lock_);
+        sender_ = s;
+        work_queue_ = &s.work_queue();
+    }
+
+    void on_sendable(proton::sender& s) override {
+        std::lock_guard<std::mutex> l(lock_);
+        credit_ = s.credit();
+        sender_ready_.notify_all(); // Notify senders we have credit
+    }
+
+    // work_queue work items is are automatically dequeued and called by proton
+    // This function is called because it was queued by send()
+    void do_send(const proton::message& m) {
+        sender_.send(m);
+        std::lock_guard<std::mutex> l(lock_);
+        --queued_;                    // work item was consumed from the work_queue
+        credit_ = sender_.credit();   // update credit
+        sender_ready_.notify_all();       // Notify senders we have space on queue
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        std::cerr << "unexpected error: " << e << std::endl;
+        exit(1);
+    }
+};
+
+// A thread safe receiving connection that blocks receiving threads when there
+// are no messages available, and maintains a bounded buffer of incoming
+// messages by issuing AMQP credit only when there is space in the buffer.
+class receiver : private proton::messaging_handler {
+    static const size_t MAX_BUFFER = 100; // Max number of buffered messages
+
+    // Used in proton threads only
+    proton::receiver receiver_;
+
+    // Used in proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue* work_queue_;
+    std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+    std::condition_variable can_receive_; // Notify receivers of messages
+
+  public:
+
+    // Connect to url
+    receiver(proton::container& cont, const std::string& url, const std::string&
address)
+        : work_queue_()
+    {
+        // NOTE:credit_window(0) disables automatic flow control.
+        // We will use flow control to match AMQP credit to buffer capacity.
+        cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0),
+                           proton::connection_options().handler(*this));
+    }
+
+    // Thread safe receive
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        // Wait for buffered messages
+        while (!work_queue_ || buffer_.empty())
+            can_receive_.wait(l);
+        proton::message m = std::move(buffer_.front());
+        buffer_.pop();
+        // Add a lambda to the work queue to call receive_done().
+        // This will tell the handler to add more credit.
+        work_queue_->add([=]() { this->receive_done(); });
+        return m;
+    }
+
+    void close() {
+        std::lock_guard<std::mutex> l(lock_);
+        if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close();
});
+    }
+
+  private:
+    // ==== The following are called by proton threads only.
+
+    void on_receiver_open(proton::receiver& r) override {
+        receiver_ = r;
+        std::lock_guard<std::mutex> l(lock_);
+        work_queue_ = &receiver_.work_queue();
+        receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+    }
+
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // Proton automatically reduces credit by 1 before calling on_message
+        std::lock_guard<std::mutex> l(lock_);
+        buffer_.push(m);
+        can_receive_.notify_all();
+    }
+
+    // called via work_queue
+    void receive_done() {
+        // Add 1 credit, a receiver has taken a message out of the buffer.
+        receiver_.add_credit(1);
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        std::cerr << "unexpected error: " << e << std::endl;
+        exit(1);
+    }
+};
+
+// ==== Example code using the sender and receiver
+
+// Send n messages
+void send_thread(sender& s, int n, bool print) {
+    auto id = std::this_thread::get_id();
+    for (int i = 0; i < n; ++i) {
+        std::ostringstream ss;
+        ss << std::this_thread::get_id() << ":" << i;
+        s.send(proton::message(ss.str()));
+        if (print) std::cout << "received: " << ss.str() << std::endl;
+    }
+    std::cout << id << " sent " << n << std::endl;
+}
+
+// Receive messages till atomic remaining count is 0.
+// remaining is shared among all receiving threads
+void receive_thread(receiver& r, std::atomic_int& remaining, bool print) {
+    auto id = std::this_thread::get_id();
+    int n = 0;
+    while (remaining-- > 0) {
+        auto m = r.receive();
+        ++n;
+        if (print) std::cout << id << "received: " << m.body() <<
std::endl;
+    }
+    std::cout << id << " received " << n << " messages" <<
std::endl;
+}
+
+int main(int argc, const char **argv) {
+    try {
+        if (argc != 5) {
+            std::cerr <<
+                "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to send\n"
+                "THREAD-COUNT: number of sender/receiver thread pairs\n";
+            return 1;
+        }
+
+        const char *url = argv[1];
+        const char *address = argv[2];
+        int n_messages = atoi(argv[3]);
+        int n_threads = atoi(argv[4]);
+
+        // Total messages to be received, multiple receiver threads will decrement this.
+        std::atomic_int remaining(n_messages * n_threads);
+        bool print = remaining < 1000; // Don't print for long runs, dominates run time
+
+        // Run the proton container
+        proton::container container;
+        auto container_thread = std::thread([&]() { container.run(); });
+
+        // A single sender and receiver to be shared by all the threads
+        sender send(container, url, address);
+        receiver recv(container, url, address);
+
+        // Start receiver threads, then sender threads.
+        // Starting receivers first gives all receivers a chance to compete for messages.
+        std::vector<std::thread> threads;
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print);
}));
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { send_thread(send, n_messages, print);
}));
+
+        // Wait for threads to finish
+        for (auto& t : threads)
+            t.join();
+        send.close();
+        recv.close();
+
+        container_thread.join();
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/398f786b/examples/cpp/send_recv_mt.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/send_recv_mt.cpp b/examples/cpp/send_recv_mt.cpp
deleted file mode 100644
index addcbaf..0000000
--- a/examples/cpp/send_recv_mt.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// C++11 only
-//
-// A multi-threaded client that sends and receives messages from multiple AMQP
-// addresses.
-//
-// Demonstrates how to:
-//
-// - implement proton handlers that interact with user threads safely
-// - block user threads calling send() to respect AMQP flow control
-// - use AMQP flow control to limit message buffering for receivers
-//
-// We define mt_sender and mt_receiver classes with simple, thread-safe blocking
-// send() and receive() functions.
-//
-// These classes are also privately proton::message_handler instances. They use
-// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
-// etc.) to pass messages between user and proton::container threads.
-//
-// NOTE: no proper error handling
-
-#include <proton/connection.hpp>
-#include <proton/connection_options.hpp>
-#include <proton/container.hpp>
-#include <proton/message.hpp>
-#include <proton/messaging_handler.hpp>
-#include <proton/receiver_options.hpp>
-#include <proton/sender.hpp>
-#include <proton/work_queue.hpp>
-
-#include <atomic>
-#include <condition_variable>
-#include <iostream>
-#include <mutex>
-#include <queue>
-#include <sstream>
-#include <thread>
-
-// Lock to serialize std::cout, std::cerr used from multiple threads.
-std::mutex out_lock;
-#define LOCK(EXPR) do { std::lock_guard<std::mutex> l(out_lock); EXPR; } while(0)
-#define COUT(EXPR) do { LOCK(std::cout << EXPR); } while(0)
-#define CERR(EXPR) do { LOCK(std::cerr << EXPR); } while(0)
-
-// A thread-safe sending connection.
-class mt_sender : private proton::messaging_handler {
-    // Only used in proton thread
-    proton::sender sender_;
-
-    // Shared by proton and user threads, use lock_ to protect.
-    std::mutex lock_;
-    proton::work_queue* work_queue_;   // Messages waiting to be sent
-    std::condition_variable can_send_; // Signal sending threads
-    int queued_;                       // Queued messages waiting to be sent
-    int credit_;                       // AMQP credit - number of messages we can send
-
-  public:
-    // Connect to url
-    mt_sender(proton::container& cont, const std::string& url) :
-        work_queue_(0), queued_(0), credit_(0)
-    {
-        // Pass *this as handler.
-        cont.open_sender(url, proton::connection_options().handler(*this));
-    }
-
-    // Thread safe send()
-    void send(const proton::message& m) {
-        std::unique_lock<std::mutex> l(lock_);
-        // Don't queue up more messages than we have credit for
-        while (!(work_queue_ && queued_ < credit_))
-            can_send_.wait(l);
-        ++queued_;
-        // Add a lambda function to the work queue.
-        // This will call do_send() with a copy of m in the correct proton thread.
-        work_queue_->add([=]() { this->do_send(m); });
-    }
-
-    void close() {
-        std::lock_guard<std::mutex> l(lock_);
-        if (work_queue_)
-            work_queue_->add([this]() { this->sender_.connection().close(); });
-    }
-
-  private:
-    // ==== called by proton threads only
-
-    void on_sender_open(proton::sender& s) override {
-        sender_ = s;
-        std::lock_guard<std::mutex> l(lock_);
-        work_queue_ = &s.work_queue();
-    }
-
-    void on_sendable(proton::sender& s) override {
-        std::lock_guard<std::mutex> l(lock_);
-        credit_ = s.credit();
-        can_send_.notify_all(); // Notify senders we have credit
-    }
-
-    // work_queue work items is are automatically dequeued and called by proton
-    // This function is called because it was queued by send()
-    void do_send(const proton::message& m) {
-        sender_.send(m);
-        std::lock_guard<std::mutex> l(lock_);
-        --queued_;                    // work item was consumed from the work_queue
-        credit_ = sender_.credit();   // update credit
-        can_send_.notify_all();       // Notify senders we have space on queue
-    }
-
-    void on_error(const proton::error_condition& e) override {
-        CERR("unexpected error: " << e << std::endl);
-        exit(1);
-    }
-};
-
-// A thread safe receiving connection.
-class mt_receiver : private proton::messaging_handler {
-    static const size_t MAX_BUFFER = 100; // Max number of buffered messages
-
-    // Used in proton threads only
-    proton::receiver receiver_;
-
-    // Used in proton and user threads, protected by lock_
-    std::mutex lock_;
-    proton::work_queue* work_queue_;
-    std::queue<proton::message> buffer_; // Messages not yet returned by receive()
-    std::condition_variable can_receive_; // Notify receivers of messages
-
-  public:
-
-    // Connect to url
-    mt_receiver(proton::container& cont, const std::string& url) : work_queue_()
-    {
-        // NOTE:credit_window(0) disables automatic flow control.
-        // We will use flow control to match AMQP credit to buffer capacity.
-        cont.open_receiver(url, proton::receiver_options().credit_window(0),
-                           proton::connection_options().handler(*this));
-    }
-
-    // Thread safe receive
-    proton::message receive() {
-        std::unique_lock<std::mutex> l(lock_);
-        // Wait for buffered messages
-        while (!work_queue_ || buffer_.empty())
-            can_receive_.wait(l);
-        proton::message m = std::move(buffer_.front());
-        buffer_.pop();
-        // Add a lambda to the work queue to call receive_done().
-        // This will tell the handler to add more credit.
-        work_queue_->add([=]() { this->receive_done(); });
-        return m;
-    }
-
-    void close() {
-        std::lock_guard<std::mutex> l(lock_);
-        if (work_queue_)
-            work_queue_->add([this]() { this->receiver_.connection().close(); });
-    }
-
-  private:
-    // ==== The following are called by proton threads only.
-
-    void on_receiver_open(proton::receiver& r) override {
-        receiver_ = r;
-        std::lock_guard<std::mutex> l(lock_);
-        work_queue_ = &receiver_.work_queue();
-        receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) override {
-        // Proton automatically reduces credit by 1 before calling on_message
-        std::lock_guard<std::mutex> l(lock_);
-        buffer_.push(m);
-        can_receive_.notify_all();
-    }
-
-    // called via work_queue
-    void receive_done() {
-        // Add 1 credit, a receiver has taken a message out of the buffer.
-        receiver_.add_credit(1);
-    }
-
-    void on_error(const proton::error_condition& e) override {
-        CERR("unexpected error: " << e << std::endl);
-        exit(1);
-    }
-};
-
-// ==== Example code using the mt_sender and mt_receiver
-
-// Send n messages
-void send_thread(mt_sender& s, int n) {
-    for (int i = 0; i < n; ++i) {
-        std::ostringstream o;
-        o << std::this_thread::get_id() << ":" << i;
-        s.send(proton::message(o.str()));
-    }
-    COUT(std::this_thread::get_id() << " sent " << n << std::endl);
-}
-
-// Receive messages till atomic remaining count is 0.
-// remaining is shared among all receiving threads
-void receive_thread(mt_receiver& r, std::atomic_int& remaining, bool print) {
-    auto id = std::this_thread::get_id();
-    int n = 0;
-    while (remaining-- > 0) {
-        auto m = r.receive();
-        ++n;
-        if (print)
-            COUT(id << " received \"" << m.body() << '"' << std::endl);
-    }
-    COUT(id << " received " << n << " messages" << std::endl);
-}
-
-int main(int argc, const char **argv) {
-    try {
-        int n_threads = argc > 1 ? atoi(argv[1]) : 2;
-        int n_messages = argc > 2 ? atoi(argv[2]) : 10;
-        const char *url =  argc > 3 ? argv[3] : "amqp://127.0.0.1/examples";
-        std::atomic_int remaining(n_messages * n_threads); // Total messages to be received
-        bool print = (remaining <= 30); // Print messages for short runs only
-
-        // Run the proton container
-        proton::container container;
-        auto container_thread = std::thread([&]() { container.run(); });
-
-        // A single sender and receiver to be shared by all the threads
-        mt_sender sender(container, url);
-        mt_receiver receiver(container, url);
-
-        // Start receiver threads, then sender threads.
-        // Starting receivers first gives all receivers a chance to compete for messages.
-        std::vector<std::thread> threads;
-        for (int i = 0; i < n_threads; ++i)
-            threads.push_back(std::thread([&]() { receive_thread(receiver, remaining,
print); }));
-        for (int i = 0; i < n_threads; ++i)
-            threads.push_back(std::thread([&]() { send_thread(sender, n_messages); }));
-
-        // Wait for threads to finish
-        for (auto& n_messages_threads : threads)
-            n_messages_threads.join();
-        sender.close();
-        receiver.close();
-
-        container_thread.join();
-
-        return 0;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message