From commits-return-45116-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Tue May 1 15:44:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9F4C1180675 for ; Tue, 1 May 2018 15:44:53 +0200 (CEST) Received: (qmail 22228 invoked by uid 500); 1 May 2018 13:44:52 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 21801 invoked by uid 99); 1 May 2018 13:44:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 May 2018 13:44:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 60037E96AA; Tue, 1 May 2018 13:44:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jross@apache.org To: commits@qpid.apache.org Date: Tue, 01 May 2018 13:44:52 -0000 Message-Id: <243aaaabc6b747f9b91ff373b6bbeb99@git.apache.org> In-Reply-To: <5079a328227b4d5eb56f2dbab943f4bb@git.apache.org> References: <5079a328227b4d5eb56f2dbab943f4bb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/51] [abbrv] [partial] qpid-site git commit: QPID-8154: Refresh the proton doc snapshot http://git-wip-us.apache.org/repos/asf/qpid-site/blob/701caf21/content/releases/qpid-proton-master/proton/cpp/api/messaging__handler_8hpp_source.html ---------------------------------------------------------------------- diff --git a/content/releases/qpid-proton-master/proton/cpp/api/messaging__handler_8hpp_source.html b/content/releases/qpid-proton-master/proton/cpp/api/messaging__handler_8hpp_source.html index 2b808ee..1983dfd 100755 --- 
a/content/releases/qpid-proton-master/proton/cpp/api/messaging__handler_8hpp_source.html +++ b/content/releases/qpid-proton-master/proton/cpp/api/messaging__handler_8hpp_source.html @@ -93,8 +93,8 @@ $(document).ready(function(){initNavTree('messaging__handler_8hpp_source.html','
messaging_handler.hpp
-Go to the documentation of this file.
1 #ifndef PROTON_MESSAGING_HANDLER_HPP
2 #define PROTON_MESSAGING_HANDLER_HPP
3 
4 /*
5  *
6  * Licensed to the Apache Software Foundation (ASF) under one
7  * or more contributor license agreements. See the NOTICE file
8  * distributed with this work for additional information
9  * regarding copyright ownership. The ASF licenses this file
10  * to you under the Apache License, Version 2.0 (the
11  * "License"); you may not use this file except in compliance
12  * with the License. You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing,
17  * software distributed under the License is distributed on an
18  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
19  * KIND, either express or implied. See the License for the
20  * specific language governing permissions and limitations
21  * under the License.
22  *
23  */
24 
25 #include "./fwd.hpp"
26 #include "./internal/export.hpp"
27 
30 
31 namespace proton {
32 
61 class
62 PN_CPP_CLASS_EXTERN messaging_handler {
63  public:
64  PN_CPP_EXTERN messaging_handler();
65  PN_CPP_EXTERN virtual ~messaging_handler();
66 
69  PN_CPP_EXTERN virtual void on_container_start(container &c);
70 
73  PN_CPP_EXTERN virtual void on_container_stop(container &c);
74 
76  PN_CPP_EXTERN virtual void on_message(delivery &d, message &m);
77 
79  PN_CPP_EXTERN virtual void on_sendable(sender &s);
80 
82  PN_CPP_EXTERN virtual void on_transport_open(transport &t);
83 
85  PN_CPP_EXTERN virtual void on_transport_close(transport &t);
86 
89  PN_CPP_EXTERN virtual void on_transport_error(transport &t);
90 
92  PN_CPP_EXTERN virtual void on_connection_open(connection &c);
93 
95  PN_CPP_EXTERN virtual void on_connection_close(connection &c);
96 
98  PN_CPP_EXTERN virtual void on_connection_error(connection &c);
99 
101  PN_CPP_EXTERN virtual void on_session_open(session &s);
102 
104  PN_CPP_EXTERN virtual void on_session_close(session &s);
105 
107  PN_CPP_EXTERN virtual void on_session_error(session &s);
108 
110  PN_CPP_EXTERN virtual void on_receiver_open(receiver& l);
111 
113  PN_CPP_EXTERN virtual void on_receiver_detach(receiver& l);
114 
116  PN_CPP_EXTERN virtual void on_receiver_close(receiver& l);
117 
119  PN_CPP_EXTERN virtual void on_receiver_error(receiver& l);
120 
122  PN_CPP_EXTERN virtual void on_sender_open(sender& l);
123 
125  PN_CPP_EXTERN virtual void on_sender_detach(sender& l);
126 
128  PN_CPP_EXTERN virtual void on_sender_close(sender& l);
129 
131  PN_CPP_EXTERN virtual void on_sender_error(sender& l);
132 
134  PN_CPP_EXTERN virtual void on_tracker_accept(tracker &d);
135 
137  PN_CPP_EXTERN virtual void on_tracker_reject(tracker &d);
138 
140  PN_CPP_EXTERN virtual void on_tracker_release(tracker &d);
141 
143  PN_CPP_EXTERN virtual void on_tracker_settle(tracker &d);
144 
146  PN_CPP_EXTERN virtual void on_delivery_settle(delivery &d);
147 
150  PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s);
151 
154  PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r);
155 
170  PN_CPP_EXTERN virtual void on_connection_wake(connection&);
171 
173  PN_CPP_EXTERN virtual void on_error(const error_condition &c);
174 };
175 
176 } // proton
177 
178 #endif // PROTON_MESSAGING_HANDLER_HPP
An AMQP message.
Definition: message.hpp:50
-
A top-level container of connections, sessions, and links.
Definition: container.hpp:55
+Go to the documentation of this file.
1 #ifndef PROTON_MESSAGING_HANDLER_HPP
2 #define PROTON_MESSAGING_HANDLER_HPP
3 
4 /*
5  *
6  * Licensed to the Apache Software Foundation (ASF) under one
7  * or more contributor license agreements. See the NOTICE file
8  * distributed with this work for additional information
9  * regarding copyright ownership. The ASF licenses this file
10  * to you under the Apache License, Version 2.0 (the
11  * "License"); you may not use this file except in compliance
12  * with the License. You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing,
17  * software distributed under the License is distributed on an
18  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
19  * KIND, either express or implied. See the License for the
20  * specific language governing permissions and limitations
21  * under the License.
22  *
23  */
24 
25 #include "./fwd.hpp"
26 #include "./internal/export.hpp"
27 
30 
31 namespace proton {
32 
62 class
63 PN_CPP_CLASS_EXTERN messaging_handler {
64  public:
65  PN_CPP_EXTERN messaging_handler();
66  PN_CPP_EXTERN virtual ~messaging_handler();
67 
72  PN_CPP_EXTERN virtual void on_container_start(container&);
73 
78  PN_CPP_EXTERN virtual void on_container_stop(container&);
79 
81  PN_CPP_EXTERN virtual void on_message(delivery&, message&);
82 
84  PN_CPP_EXTERN virtual void on_sendable(sender&);
85 
87  PN_CPP_EXTERN virtual void on_transport_open(transport&);
88 
90  PN_CPP_EXTERN virtual void on_transport_close(transport&);
91 
94  PN_CPP_EXTERN virtual void on_transport_error(transport&);
95 
97  PN_CPP_EXTERN virtual void on_connection_open(connection&);
98 
100  PN_CPP_EXTERN virtual void on_connection_close(connection&);
101 
103  PN_CPP_EXTERN virtual void on_connection_error(connection&);
104 
106  PN_CPP_EXTERN virtual void on_session_open(session&);
107 
109  PN_CPP_EXTERN virtual void on_session_close(session&);
110 
112  PN_CPP_EXTERN virtual void on_session_error(session&);
113 
115  PN_CPP_EXTERN virtual void on_receiver_open(receiver&);
116 
118  PN_CPP_EXTERN virtual void on_receiver_detach(receiver&);
119 
121  PN_CPP_EXTERN virtual void on_receiver_close(receiver&);
122 
124  PN_CPP_EXTERN virtual void on_receiver_error(receiver&);
125 
127  PN_CPP_EXTERN virtual void on_sender_open(sender&);
128 
130  PN_CPP_EXTERN virtual void on_sender_detach(sender&);
131 
133  PN_CPP_EXTERN virtual void on_sender_close(sender&);
134 
136  PN_CPP_EXTERN virtual void on_sender_error(sender&);
137 
139  PN_CPP_EXTERN virtual void on_tracker_accept(tracker&);
140 
142  PN_CPP_EXTERN virtual void on_tracker_reject(tracker&);
143 
145  PN_CPP_EXTERN virtual void on_tracker_release(tracker&);
146 
148  PN_CPP_EXTERN virtual void on_tracker_settle(tracker&);
149 
151  PN_CPP_EXTERN virtual void on_delivery_settle(delivery&);
152 
155  PN_CPP_EXTERN virtual void on_sender_drain_start(sender&);
156 
159  PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver&);
160 
176  PN_CPP_EXTERN virtual void on_connection_wake(connection&);
177 
179  PN_CPP_EXTERN virtual void on_error(const error_condition&);
180 };
181 
182 } // proton
183 
184 #endif // PROTON_MESSAGING_HANDLER_HPP
An AMQP message.
Definition: message.hpp:50
+
A top-level container of connections, sessions, and links.
Definition: container.hpp:50
A channel for sending messages.
Definition: sender.hpp:39
Forward declarations.
A connection to a remote AMQP peer.
Definition: connection.hpp:43
@@ -102,7 +102,7 @@ $(document).ready(function(){initNavTree('messaging__handler_8hpp_source.html','
A received message.
Definition: delivery.hpp:39
A tracker for a sent message.
Definition: tracker.hpp:40
A channel for receiving messages.
Definition: receiver.hpp:40
-
A handler for Proton messaging events.
Definition: messaging_handler.hpp:61
+
A handler for Proton messaging events.
Definition: messaging_handler.hpp:62
A network channel supporting an AMQP connection.
Definition: transport.hpp:37
The main Proton namespace.
Definition: annotation_key.hpp:33
Describes an endpoint error state.
Definition: error_condition.hpp:40
http://git-wip-us.apache.org/repos/asf/qpid-site/blob/701caf21/content/releases/qpid-proton-master/proton/cpp/api/mt_page.html ---------------------------------------------------------------------- diff --git a/content/releases/qpid-proton-master/proton/cpp/api/mt_page.html b/content/releases/qpid-proton-master/proton/cpp/api/mt_page.html index 2c5a46f..a97973f 100755 --- a/content/releases/qpid-proton-master/proton/cpp/api/mt_page.html +++ b/content/releases/qpid-proton-master/proton/cpp/api/mt_page.html @@ -94,21 +94,19 @@ $(document).ready(function(){initNavTree('mt_page.html','');});

Full multithreading support is available with C++11 and later. Limited multithreading is possible with older versions of C++. See the last section of this page for more information.

+

proton::container handles multiple connections concurrently in a thread pool, created using proton::container::run(). As AMQP events occur on a connection the container calls proton::messaging_handler event callbacks. The calls for each connection are serialized - callbacks for the same connection are never made concurrently.

+

You assign a handler to a connection in proton::container::connect() or proton::listen_handler::on_accept() with proton::connection_options::handler(). We recommend you create a separate handler for each connection. That means the handler doesn't need locks or other synchronization to protect it against concurrent use by Proton threads. If you use the handler concurrently from non-Proton threads then you will need synchronization.

+

The examples multithreaded_client.cpp and multithreaded_client_flow_control.cpp illustrate these points.

Thread-safety rules

-

proton::message is a value type with the same threading constraints as a standard C++ built-in type. It cannot be concurrently modified.

-

The proton::container is thread-safe with C++11 or greater. It has the following capabilities.

-
    -
  • Application threads can open (or listen for) new connections at any time.
  • -
  • It manages worker threads to process connections.
  • -
  • It handles network IO and calls the relevant proton::messaging_handler event callbacks to execute application code.
  • -
-

The proton::container ensures that calls to event callbacks for each connection instance are serialized (not called concurrently), but callbacks for different connections can be safely executed in parallel.

-

The proton::connection and related objects (proton::session, proton::sender, proton::receiver, proton::delivery) are not thread-safe and are subject to the following rules.

+

proton::container is thread-safe with C++11 or greater. An application thread can open (or listen for) new connections at any time. The container uses threads that call proton::container::run() to handle network IO and call user-defined proton::messaging_handler callbacks.

+

proton::container ensures that calls to event callbacks for each connection instance are serialized (not called concurrently), but callbacks for different connections can be safely executed in parallel.

+

proton::connection and related objects (proton::session, proton::sender, proton::receiver, proton::delivery) are not thread-safe and are subject to the following rules.

  1. They can only be used from a proton::messaging_handler event callback called by Proton or a proton::work_queue function (more below).
  2. You cannot use objects belonging to one connection from a callback for another connection. We recommend a single handler instance per connection to avoid confusion.
  3. You can store Proton objects in member variables for use in a later callback, provided you respect rule two.
+

proton::message is a value type with the same threading constraints as a standard C++ built-in type. It cannot be concurrently modified.

Work queues

proton::work_queue provides a safe way to communicate between different connection handlers or between non-Proton threads and connection handlers.

    http://git-wip-us.apache.org/repos/asf/qpid-site/blob/701caf21/content/releases/qpid-proton-master/proton/cpp/api/multithreaded_client_8cpp-example.html ---------------------------------------------------------------------- diff --git a/content/releases/qpid-proton-master/proton/cpp/api/multithreaded_client_8cpp-example.html b/content/releases/qpid-proton-master/proton/cpp/api/multithreaded_client_8cpp-example.html index d1f4f38..407b2cf 100755 --- a/content/releases/qpid-proton-master/proton/cpp/api/multithreaded_client_8cpp-example.html +++ b/content/releases/qpid-proton-master/proton/cpp/api/multithreaded_client_8cpp-example.html @@ -94,7 +94,7 @@ $(document).ready(function(){initNavTree('multithreaded_client_8cpp-example.html

A multithreaded sender and receiver.Requires C++11

-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
//
// C++11 or greater
//
// A multi-threaded client that calls proton::container::run() in one thread, sends
// messages in another and receives messages in a third.
//
// Note this client does not deal with flow-control. If the sender is faster
// than the receiver, messages will build up in memory on the sending side.
// See @ref multithreaded_client_flow_control.cpp for a more complex example with
// flow control.
//
// NOTE: no proper error handling
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
// Lock output from threads to avoid scramblin
std::mutex out_lock;
#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
// Handler for a single thread-safe sending and receiving connection.
class client : public proton::messaging_handler {
// Invariant
const std::string url_;
const std::string address_;
// Only used in proton handler thread
proton::sender sender_;
// Shared by proton and user threads, protected by lock_
std::mutex lock_;
proton::work_queue *work_queue_;
std::condition_variable sender_ready_;
std::queue<proton::message> messages_;
std::condition_variable messages_ready_;
public:
client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
// Thread safe
void send(const proton::message& msg) {
// Use [=] to copy the message, we cannot pass it by reference since it
// will be used in another thread.
work_queue()->add([=]() { sender_.send(msg); });
}
// Thread safe
proton::message receive() {
std::unique_lock<std::mutex> l(lock_);
while (messages_.empty()) messages_ready_.wait(l);
auto msg = std::move(messages_.front());
messages_.pop();
return msg;
}
// Thread safe
void close() {
work_queue()->add([=]() { sender_.connection().close(); });
}
private:
proton::work_queue* work_queue() {
// Wait till work_queue_ and sender_ are initialized.
std::unique_lock<std::mutex> l(lock_);
while (!work_queue_) sender_ready_.wait(l);
return work_queue_;
}
// == messaging_handler overrides, only called in proton hander thread
// Note: this example creates a connection when the container starts.
// To create connections after the container has started, use
// container::connect().
// See @ref multithreaded_client_flow_control.cpp for an example.
void on_container_start(proton::container& cont) override {
cont.connect(url_);
}
void on_connection_open(proton::connection& conn) override {
conn.open_sender(address_);
conn.open_receiver(address_);
}
void on_sender_open(proton::sender& s) override {
// sender_ and work_queue_ must be set atomically
std::lock_guard<std::mutex> l(lock_);
sender_ = s;
work_queue_ = &s.work_queue();
sender_ready_.notify_all();
}
void on_message(proton::delivery& dlv, proton::message& msg) override {
std::lock_guard<std::mutex> l(lock_);
messages_.push(msg);
messages_ready_.notify_all();
}
void on_error(const proton::error_condition& e) override {
OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
int main(int argc, const char** argv) {
try {
if (argc != 4) {
std::cerr <<
"Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
"CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
"AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
"MESSAGE-COUNT: number of messages to send\n";
return 1;
}
const char *url = argv[1];
const char *address = argv[2];
int n_messages = atoi(argv[3]);
client cl(url, address);
proton::container container(cl);
std::thread container_thread([&]() { container.run(); });
std::thread sender([&]() {
for (int i = 0; i < n_messages; ++i) {
cl.send(msg);
OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl);
}
});
int received = 0;
std::thread receiver([&]() {
for (int i = 0; i < n_messages; ++i) {
auto msg = cl.receive();
OUT(std::cout << "received \"" << msg.body() << '"' << std::endl);
++received;
}
});
sender.join();
receiver.join();
cl.close();
container_thread.join();
std::cout << received << " messages sent and received" << std::endl;
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
//
// C++11 or greater
//
// A multi-threaded client that calls proton::container::run() in one thread, sends
// messages in another and receives messages in a third.
//
// Note this client does not deal with flow-control. If the sender is faster
// than the receiver, messages will build up in memory on the sending side.
// See @ref multithreaded_client_flow_control.cpp for a more complex example with
// flow control.
//
// NOTE: no proper error handling
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
// Lock output from threads to avoid scrambling
std::mutex out_lock;
#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
// Handler for a single thread-safe sending and receiving connection.
class client : public proton::messaging_handler {
// Invariant
const std::string url_;
const std::string address_;
// Only used in proton handler thread
proton::sender sender_;
// Shared by proton and user threads, protected by lock_
std::mutex lock_;
proton::work_queue *work_queue_;
std::condition_variable sender_ready_;
std::queue<proton::message> messages_;
std::condition_variable messages_ready_;
public:
client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
// Thread safe
void send(const proton::message& msg) {
// Use [=] to copy the message, we cannot pass it by reference since it
// will be used in another thread.
work_queue()->add([=]() { sender_.send(msg); });
}
// Thread safe
proton::message receive() {
std::unique_lock<std::mutex> l(lock_);
while (messages_.empty()) messages_ready_.wait(l);
auto msg = std::move(messages_.front());
messages_.pop();
return msg;
}
// Thread safe
void close() {
work_queue()->add([=]() { sender_.connection().close(); });
}
private:
proton::work_queue* work_queue() {
// Wait till work_queue_ and sender_ are initialized.
std::unique_lock<std::mutex> l(lock_);
while (!work_queue_) sender_ready_.wait(l);
return work_queue_;
}
// == messaging_handler overrides, only called in proton handler thread
// Note: this example creates a connection when the container starts.
// To create connections after the container has started, use
// container::connect().
// See @ref multithreaded_client_flow_control.cpp for an example.
void on_container_start(proton::container& cont) override {
cont.connect(url_);
}
void on_connection_open(proton::connection& conn) override {
conn.open_sender(address_);
conn.open_receiver(address_);
}
void on_sender_open(proton::sender& s) override {
// sender_ and work_queue_ must be set atomically
std::lock_guard<std::mutex> l(lock_);
sender_ = s;
work_queue_ = &s.work_queue();
sender_ready_.notify_all();
}
void on_message(proton::delivery& dlv, proton::message& msg) override {
std::lock_guard<std::mutex> l(lock_);
messages_.push(msg);
messages_ready_.notify_all();
}
void on_error(const proton::error_condition& e) override {
OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
int main(int argc, const char** argv) {
try {
if (argc != 4) {
std::cerr <<
"Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
"CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
"AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
"MESSAGE-COUNT: number of messages to send\n";
return 1;
}
const char *url = argv[1];
const char *address = argv[2];
int n_messages = atoi(argv[3]);
client cl(url, address);
proton::container container(cl);
std::thread container_thread([&]() { container.run(); });
std::thread sender([&]() {
for (int i = 0; i < n_messages; ++i) {
cl.send(msg);
OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl);
}
});
int received = 0;
std::thread receiver([&]() {
for (int i = 0; i < n_messages; ++i) {
auto msg = cl.receive();
OUT(std::cout << "received \"" << msg.body() << '"' << std::endl);
++received;
}
});
sender.join();
receiver.join();
cl.close();
container_thread.join();
std::cout << received << " messages sent and received" << std::endl;
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}