qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [3/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded controller and improved broker example
Date Wed, 27 Apr 2016 14:54:59 GMT
PROTON-1046: C++ multi-threaded controller and improved broker example

A complete portable multi-threaded API for proton that can be implemented on an
threading/IO platform.

API:
- proton::controller: A multi-threaded alternative to the proton::container.
- proton::work_queue: async functions serialized per-connection.

Examples:
- mt/epoll_controller.hpp: controller/work_queue implemented using native Linux epoll.
- mt/broker.cpp: multi-threaded broker, portable over any controller implementation.
  - illustrates multi-threading, use of work_queue, remote shutdown

TODO:
- Examples and implementations for non-Linux platforms.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/deccf354
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/deccf354
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/deccf354

Branch: refs/heads/master
Commit: deccf354a653e2106f40cdd59df9b67b74911e8b
Parents: b53a684
Author: Alan Conway <aconway@redhat.com>
Authored: Thu Mar 31 17:12:18 2016 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed Apr 27 10:39:59 2016 -0400

----------------------------------------------------------------------
 config.sh.in                                    |   2 +-
 examples/cpp/CMakeLists.txt                     |  25 +-
 examples/cpp/README.dox                         |  99 ++--
 examples/cpp/broker.cpp                         |   4 +-
 examples/cpp/client.cpp                         |   4 +-
 examples/cpp/direct_recv.cpp                    |   4 +-
 examples/cpp/direct_send.cpp                    |   4 +-
 examples/cpp/engine/CMakeLists.txt              |  37 --
 examples/cpp/engine/broker.cpp                  | 176 -------
 examples/cpp/engine/client.cpp                  | 103 ----
 examples/cpp/engine/direct_recv.cpp             |  79 ---
 examples/cpp/engine/direct_send.cpp             |  91 ----
 examples/cpp/engine/helloworld.cpp              |  68 ---
 examples/cpp/engine/options.hpp                 | 173 -------
 examples/cpp/engine/server.cpp                  |  90 ----
 examples/cpp/engine/simple_recv.cpp             |  85 ---
 examples/cpp/engine/simple_send.cpp             |  93 ----
 examples/cpp/example/socket_windows.cpp         | 218 ++++++++
 examples/cpp/example_test.py                    | 106 ++--
 examples/cpp/mt/broker.cpp                      | 280 ++++++++++
 examples/cpp/mt/epoll_controller.cpp            | 517 +++++++++++++++++++
 examples/cpp/options.hpp                        |   2 +
 examples/cpp/recurring_timer.cpp                |   4 +-
 examples/cpp/server.cpp                         |   4 +-
 examples/cpp/server_direct.cpp                  |   4 +-
 examples/cpp/simple_recv.cpp                    |   4 +-
 examples/cpp/simple_send.cpp                    |   4 +-
 examples/cpp/tutorial.dox                       | 403 +++++++++++++++
 proton-c/bindings/cpp/CMakeLists.txt            |  13 +-
 proton-c/bindings/cpp/cpp.cmake                 |   3 +
 proton-c/bindings/cpp/docs/mainpage.md          | 152 +++---
 proton-c/bindings/cpp/docs/mt_page.md           |  21 +
 proton-c/bindings/cpp/docs/tutorial.dox         | 428 ---------------
 proton-c/bindings/cpp/docs/user.doxygen.in      |   3 +-
 .../cpp/include/proton/connection_options.hpp   |   9 +-
 .../bindings/cpp/include/proton/controller.hpp  | 118 +++++
 proton-c/bindings/cpp/include/proton/error.hpp  |   7 +-
 .../bindings/cpp/include/proton/handler.hpp     |  12 +
 .../cpp/include/proton/io/connection_engine.hpp |  88 ++--
 .../include/proton/io/default_controller.hpp    |  47 ++
 .../bindings/cpp/include/proton/io/socket.hpp   | 130 -----
 proton-c/bindings/cpp/include/proton/sender.hpp |   3 +-
 .../bindings/cpp/include/proton/work_queue.hpp  |  75 +++
 .../bindings/cpp/src/connection_options.cpp     |  13 +-
 proton-c/bindings/cpp/src/contexts.hpp          |   5 +-
 proton-c/bindings/cpp/src/controller.cpp        |  59 +++
 proton-c/bindings/cpp/src/engine_test.cpp       |  45 --
 .../bindings/cpp/src/io/connection_engine.cpp   |  67 +--
 proton-c/bindings/cpp/src/io/posix/socket.cpp   | 196 -------
 proton-c/bindings/cpp/src/io/windows/socket.cpp | 218 --------
 proton-c/bindings/cpp/src/messaging_adapter.cpp |   5 +-
 tests/tools/apps/cpp/CMakeLists.txt             |   2 +-
 tests/tools/apps/cpp/reactor_send.cpp           |   4 +-
 53 files changed, 2054 insertions(+), 2352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 744ddb3..5eb779b 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -73,7 +73,7 @@ export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)"
 export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)"
 
 # can the test harness use valgrind?
-if [[ -x "$(type -p valgrind)" ]] ; then
+if [[ -x "$(type -p valgrind)" && "@ENABLE_VALGRIND" == "ON" ]] ; then
     export VALGRIND=$(type -p valgrind)
 fi
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 4f6b742..3a81718 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -20,7 +20,10 @@
 find_package(ProtonCpp REQUIRED)
 
 include_directories(${ProtonCpp_INCLUDE_DIRS})
+link_libraries(${ProtonCpp_LIBRARIES})
+add_compile_options(${CXX_WARNING_FLAGS})
 
+# Single-threaded examples.
 foreach(example
     broker
     helloworld
@@ -40,12 +43,9 @@ foreach(example
     ssl_client_cert
     encode_decode)
   add_executable(${example} ${example}.cpp)
-  target_link_libraries(${example} ${ProtonCpp_LIBRARIES})
-  set_source_files_properties(${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
 endforeach()
 
-add_subdirectory(engine)
-
+# Python test runner
 set(env_py ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py)
 
 function(set_test_path dir)
@@ -61,7 +61,16 @@ set_test_path("$<TARGET_FILE_DIR:broker>")
 add_test(NAME cpp_container_example_test
   COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ContainerExampleTest)
 
-set_test_path("$<TARGET_FILE_DIR:engine-broker>")
-
-add_test(NAME cpp_engine_example_test
-  COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v ConnectionEngineExampleTest)
+# TODO aconway 2016-04-26: need portable MT and IO examples.
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND BUILD_CPP_MT)
+  set(controller_src mt/epoll_controller.cpp)
+  foreach(example
+      broker
+      )
+    add_executable(mt_${example} mt/${example}.cpp ${controller_src})
+    target_link_libraries(mt_${example} pthread)
+    set_target_properties(mt_${example} PROPERTIES CXX_STANDARD 11)
+  endforeach()
+  add_test(NAME cpp_mt_example_test
+    COMMAND ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v MtBrokerTest)
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1e78774..d545366 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -1,15 +1,22 @@
-// Examples overview.
+// C++ examples list (doxygen format)
 //
-// For a better overview, see the tutorial in the generated documentation.
-//
-// In your build directory do:
+// For a tutorial-style description of the examples see tutorial.dox.
+// To build the full HTML tutorial and documentation, in your build directory do:
 //
 //     make docs-cpp
 //
 // then open proton-c/bindings/cpp/docs/html/tutorial.html in your browser.
 
-// DEVELOPER NOTE: if you are adding or modifying examples you should keep this
-// file and ../proton-c/bindings/cpp/docs/tutorial.hpp up to date.
+// DEVELOPER NOTE: if you add or modify examples, please add/update a short
+// description below and (if appropriate) extend/update tutorial.dox.
+
+/** example sub directory
+
+The example sub-directory has utilities classes to make the example simpler,
+these classes are not directly related to the use of proton so are in a separate
+`example` directory and namespace.
+
+*/
 
 /** @example helloworld.cpp
 
@@ -46,7 +53,7 @@ on 127.0.0.1:5672. Simply prints out the body of received messages.
 /** @example direct_send.cpp
 
 Accepts an incoming connection and then sends like `simple_send`.  You can
-connect directly to `direct_send` *without* a broker using \ref simple_recv.cpp.
+connect directly to `direct_send` *without* a broker using @ref simple_recv.cpp.
 Make sure to stop the broker first or use a different port for `direct_send`.
 
 */
@@ -54,7 +61,7 @@ Make sure to stop the broker first or use a different port for `direct_send`.
 /** @example direct_recv.cpp
 
 Accepts an incoming connection and then receives like `simple_recv`.  You can
-connect directly to `direct_recv` *without* a broker using \ref simple_send.cpp.
+connect directly to `direct_recv` *without* a broker using @ref simple_send.cpp.
 Make sure to stop the broker first or use a different port for `direct_recv`.
 
 */
@@ -108,9 +115,6 @@ automatically when a client tries to send or subscribe. This file contains
 the `queue` class that queues messages and the `broker_handler` class
 that manages queues and links and transfers messages to/from clients.
 
-Examples \ref broker.cpp and \ref engine/broker.cpp use this same
-broker logic but show different ways to run it in a server application.
-
 */
 
 /** @example broker.cpp
@@ -120,79 +124,40 @@ to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
 broker. This broker creates queues automatically when a client tries to send or
 subscribe.
 
-Uses the broker logic from \ref broker.hpp, the same logic as the
-`proton::connection_engine` broker example \ref engine/broker.cpp.
-
 */
 
-//////////////// connection_engine examples.
+/** @example mt/epoll_controller.cpp
 
-/** \example engine/helloworld.cpp
+An example implementation of the proton::mt::controller API that shows how to
+use the prton::io::connection_engine SPI to adapt the proton API to native
+IO. In this case using a multi-threaded Linux epoll poller as the implementation.
 
-`proton::connection_engine` example to send a "Hello World" message to
-itself. Compare with the corresponding `proton::container` example \ref
-helloworld.cpp.
+__Requires C++11__
 
 */
 
-/** \example engine/simple_send.cpp
+/** @example mt/broker.cpp
 
-`proton::connection_engine` example of sending a fixed number of messages and
-tracking their (asynchronous) acknowledgement. Messages are sent through the
-'examples' node on an intermediary accessible on 127.0.0.1:5672.
+A multi-threaded broker, using the proton::mt extensions. This broker is
+portable over any implementation of the proton::mt API, see @ref
+mt/epoll_controller.cpp for an example.
 
-*/
-
-/** \example engine/simple_recv.cpp
-
-`proton::connection_engine` example that subscribes to the 'examples' node and prints
- the body of received messages.
+__Requires C++11__
 
 */
 
-/** \example engine/direct_send.cpp
+/** @example mt/simple_send.cpp
 
-`proton::connection_engine` example accepts an incoming connection and then
-sends like `simple_send`.  You can connect directly to `direct_send` *without* a
-broker using \ref simple_recv.cpp.  Make sure to stop the broker first or use a
-different port for `direct_send`.
+A multi-threaded sender client. Sends messages concurrently to multiple addresses.
 
-*/
-
-/** \example engine/direct_recv.cpp
-
-`proton::connection_engine` example accepts an incoming connection and then
-receives like `simple_recv`.  You can connect directly to `direct_recv`
-*without* a broker using \ref simple_send.cpp.  Make sure to stop the broker
-first or use a different port for `direct_recv`.
+__Requires C++11__
 
 */
 
-/** \example engine/client.cpp
+/** @example mt/simple_recv.cpp
 
-`proton::connection_engine` client for request-response example. Sends requests and
-prints out responses. Requires an intermediary that supports the AMQP 1.0
-dynamic nodes on which the responses are received. The requests are sent through
-the 'examples' node.
+A multi-threaded receiver client. Receives messages concurrently to multiple addresses.
 
-*/
+__Requires C++11__
 
-/** \example engine/server.cpp
-
-`proton::connection_engine` server for request-response example, that receives
-requests via the examples node, converts the body to uppercase and sends the
-result back to the indicated reply address.
-
-*/
-
-/** \example engine/broker.cpp
-
-A simple, single-threaded broker using the `proton::container`. You can use this
-to run other examples that reqiure an intermediary, or you can use any AMQP 1.0
-broker. This broker creates queues automatically when a client tries to send or
-subscribe.
-
-Uses the broker logic from \ref broker.hpp, the same logic as the
-proton::container` broker example \ref broker.cpp.
-
-*/
+*/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 37839c6..a19997f 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -61,7 +61,7 @@ class broker {
 
 int main(int argc, char **argv) {
     proton::url url("0.0.0.0");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(url, 'a', "address", "listen on URL", "URL");
 
@@ -72,7 +72,7 @@ int main(int argc, char **argv) {
         proton::container(b.handler()).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 0c38ac6..494294e 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -80,7 +80,7 @@ class client : public proton::handler {
 
 int main(int argc, char **argv) {
     proton::url url("127.0.0.1:5672/examples");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(url, 'a', "address", "connect and send to URL", "URL");
 
@@ -97,7 +97,7 @@ int main(int argc, char **argv) {
         proton::container(c).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_recv.cpp b/examples/cpp/direct_recv.cpp
index f999869..76bbaf9 100644
--- a/examples/cpp/direct_recv.cpp
+++ b/examples/cpp/direct_recv.cpp
@@ -72,7 +72,7 @@ class direct_recv : public proton::handler {
 int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
     opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
@@ -84,7 +84,7 @@ int main(int argc, char **argv) {
         proton::container(recv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_send.cpp b/examples/cpp/direct_send.cpp
index 0b63ec5..860acc4 100644
--- a/examples/cpp/direct_send.cpp
+++ b/examples/cpp/direct_send.cpp
@@ -82,7 +82,7 @@ class simple_send : public proton::handler {
 int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
     
     opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
     opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
@@ -94,7 +94,7 @@ int main(int argc, char **argv) {
         proton::container(send).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/CMakeLists.txt b/examples/cpp/engine/CMakeLists.txt
deleted file mode 100644
index bafa20c..0000000
--- a/examples/cpp/engine/CMakeLists.txt
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(ProtonCpp REQUIRED)
-
-include_directories(${ProtonCpp_INCLUDE_DIRS})
-
-foreach(example
-    broker
-    helloworld
-    simple_recv
-    simple_send
-    direct_recv
-    direct_send
-    client
-    server)
-  add_executable(engine-${example} ${example}.cpp ${extra_source})
-  target_link_libraries(engine-${example} ${ProtonCpp_LIBRARIES})
-  set_source_files_properties(engine-${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
-  set_target_properties(engine-${example} PROPERTIES OUTPUT_NAME ${example})
-endforeach()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
deleted file mode 100644
index bfe84fc..0000000
--- a/examples/cpp/engine/broker.cpp
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../options.hpp"
-#include "../broker.hpp"
-
-#include <iostream>
-
-#ifndef WIN32                   // TODO aconway 2016-03-23: windows broker example
-#include <proton/io/socket.hpp>
-#include <sys/select.h>
-#include <set>
-
-template <class T> T check(T result, const std::string& msg="io_error: ") {
-    if (result < 0)
-        throw proton::io::socket::io_error(msg + proton::io::socket::error_str());
-    return result;
-}
-
-void fd_set_if(bool on, int fd, fd_set *fds);
-
-class broker {
-    typedef std::set<proton::io::socket::engine*> engines;
-
-    queues queues_;
-    broker_handler handler_;
-    proton::io::connection_engine::container container_;
-    engines engines_;
-    fd_set reading_, writing_;
-
-  public:
-    broker() : handler_(queues_) {
-        FD_ZERO(&reading_);
-        FD_ZERO(&writing_);
-    }
-
-    ~broker() {
-        for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i)
-            delete *i;
-    }
-
-    void run(const proton::url& url) {
-        proton::io::socket::listener listener(url.host(), url.port());
-        std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
-        FD_SET(listener.socket(), &reading_);
-        while(true) {
-            fd_set readable_set = reading_;
-            fd_set writable_set = writing_;
-            check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
-
-            if (FD_ISSET(listener.socket(), &readable_set)) {
-                std::string client_host, client_port;
-                int fd = listener.accept(client_host, client_port);
-                std::cout << "accepted " << client_host << ":" << client_port
-                          << " fd=" << fd << std::endl;
-                engines_.insert(
-                    new proton::io::socket::engine(
-                        fd, handler_, container_.make_options()));
-                FD_SET(fd, &reading_);
-                FD_SET(fd, &writing_);
-            }
-
-            for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
-                proton::io::socket::engine *eng = *(i++);
-                int flags = 0;
-                if (FD_ISSET(eng->socket(), &writable_set))
-                    eng->write();
-                if (FD_ISSET(eng->socket(), &readable_set))
-                    eng->read();
-                if (eng->dispatch()) {
-                    fd_set_if(eng->read_buffer().size, eng->socket(), &reading_);
-                    fd_set_if(eng->write_buffer().size, eng->socket(), &writing_);
-                } else {
-                    std::cout << "closed fd=" << eng->socket() << std::endl;
-                    engines_.erase(eng);
-                    delete eng;
-                }
-            }
-        }
-    }
-};
-
-void fd_set_if(bool on, int fd, fd_set *fds) {
-    if (on)
-        FD_SET(fd, fds);
-    else
-        FD_CLR(fd, fds);
-}
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker().run(address);
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}
-#else // WIN32
-
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
-
-#include "../fake_cpp11.hpp"
-
-class broker {
-  public:
-    broker(const proton::url& url) : handler_(url, queues_) {}
-
-    proton::handler& handler() { return handler_; }
-
-  private:
-
-    class my_handler : public broker_handler {
-      public:
-        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
-
-        void on_container_start(proton::container &c) override {
-            c.listen(url_);
-            std::cout << "broker listening on " << url_ << std::endl;
-        }
-
-      private:
-        const proton::url& url_;
-    };
-
-  private:
-    queues queues_;
-    my_handler handler_;
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    proton::url url("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(url, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker b(url);
-        proton::container(b.handler()).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
deleted file mode 100644
index 8e58a38..0000000
--- a/examples/cpp/engine/client.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/delivery.hpp"
-#include "proton/handler.hpp"
-#include "proton/connection.hpp"
-#include "proton/tracker.hpp"
-#include "proton/source_options.hpp"
-
-#include <iostream>
-#include <vector>
-
-#include "../fake_cpp11.hpp"
-
-using proton::receiver_options;
-using proton::source_options;
-
-class client : public proton::handler {
-  private:
-    proton::url url;
-    std::vector<std::string> requests;
-    proton::sender sender;
-    proton::receiver receiver;
-
-  public:
-    client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {}
-
-    void on_connection_open(proton::connection &c) override {
-        sender = c.open_sender(url.path());
-        // Create a receiver requesting a dynamically created queue
-        // for the message source.
-        receiver_options dynamic_addr = receiver_options().source(source_options().dynamic(true));
-        receiver = c.open_receiver("", dynamic_addr);
-    }
-
-    void send_request() {
-        proton::message req;
-        req.body(requests.front());
-        req.reply_to(receiver.source().address());
-        sender.send(req);
-    }
-
-    void on_receiver_open(proton::receiver &) override {
-        send_request();
-    }
-
-    void on_message(proton::delivery &d, proton::message &response) override {
-        if (requests.empty()) return; // Spurious extra message!
-        std::cout << requests.front() << " => " << response.body() << std::endl;
-        requests.erase(requests.begin());
-        if (!requests.empty()) {
-            send_request();
-        } else {
-            d.connection().close();
-        }
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
-
-    try {
-        opts.parse();
-
-        std::vector<std::string> requests;
-        requests.push_back("Twas brillig, and the slithy toves");
-        requests.push_back("Did gire and gymble in the wabe.");
-        requests.push_back("All mimsy were the borogroves,");
-        requests.push_back("And the mome raths outgrabe.");
-        client handler(address, requests);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
deleted file mode 100644
index 48f4478..0000000
--- a/examples/cpp/engine/direct_recv.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/delivery.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/handler.hpp"
-#include "proton/receiver.hpp"
-#include "proton/url.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class direct_recv : public proton::handler {
-  private:
-    uint64_t expected;
-    uint64_t received;
-
-  public:
-    direct_recv(int c) : expected(c), received(0) {}
-
-    void on_message(proton::delivery &d, proton::message &msg) override {
-        if (msg.id().get<uint64_t>() < received)
-            return; // ignore duplicate
-        if (expected == 0 || received < expected) {
-            std::cout << msg.body() << std::endl;
-            received++;
-        }
-        if (received == expected) {
-            d.receiver().close();
-            d.connection().close();
-        }
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
-    try {
-        opts.parse();
-        proton::url url(address);
-        proton::io::socket::listener listener(url.host(), url.port());
-        std::cout << "direct_recv listening on " << url << std::endl;
-        direct_recv handler(message_count);
-        proton::io::socket::engine(listener.accept(), handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
deleted file mode 100644
index 2d9acf0..0000000
--- a/examples/cpp/engine/direct_send.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/acceptor.hpp"
-#include "proton/connection.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_send : public proton::handler {
-  private:
-    int sent;
-    int confirmed;
-    int total;
-  public:
-    simple_send(int c) : sent(0), confirmed(0), total(c) {}
-
-    void on_sendable(proton::sender &sender) override {
-        while (sender.credit() && sent < total) {
-            proton::message msg;
-            msg.id(sent + 1);
-            std::map<std::string, int> m;
-            m["sequence"] = sent+1;
-            msg.body(m);
-            sender.send(msg);
-            sent++;
-        }
-    }
-
-    void on_tracker_accept(proton::tracker &t) override {
-        confirmed++;
-        if (confirmed == total) {
-            std::cout << "all messages confirmed" << std::endl;
-            t.connection().close();
-        }
-    }
-
-    void on_transport_close(proton::transport &) override {
-        sent = confirmed;
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
-    try {
-        opts.parse();
-        proton::url url(address);
-        proton::io::socket::listener listener(url.host(), url.port());
-        std::cout << "direct_send listening on " << url << std::endl;
-        simple_send handler(message_count);
-        proton::io::socket::engine(listener.accept(), handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
deleted file mode 100644
index a4f23ef..0000000
--- a/examples/cpp/engine/helloworld.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/delivery.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/url.hpp"
-#include "proton/io/socket.hpp"
-
-#include <iostream>
-
-#include "../fake_cpp11.hpp"
-
-class hello_world : public proton::handler {
-  private:
-    std::string address_;
-
-  public:
-    hello_world(const std::string& address) : address_(address) {}
-
-    void on_connection_open(proton::connection &c) override {
-        c.open_receiver(address_);
-        c.open_sender(address_);
-    }
-
-    void on_sendable(proton::sender &s) override {
-        proton::message m("Hello World!");
-        s.send(m);
-        s.close();
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) override {
-        std::cout << m.body() << std::endl;
-        d.connection().close();
-    }
-};
-
-int main(int argc, char **argv) {
-    try {
-        proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
-        hello_world hw(url.path());
-        proton::io::socket::engine(url, hw).run();
-
-        return 0;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/options.hpp b/examples/cpp/engine/options.hpp
deleted file mode 100644
index bd477b5..0000000
--- a/examples/cpp/engine/options.hpp
+++ /dev/null
@@ -1,173 +0,0 @@
-#ifndef OPTIONS_HPP
-#define OPTIONS_HPP
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <string>
-#include <sstream>
-#include <ostream>
-#include <vector>
-#include <stdexcept>
-
-/** bad_option is thrown for option parsing errors */
-struct bad_option : public std::runtime_error {
-    bad_option(const std::string& s) : std::runtime_error(s) {}
-};
-
-/** Simple command-line option parser for example programs */
-class options {
-  public:
-
-    options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argv[0]), help_() {
-        size_t slash = prog_.find_last_of("/\\");
-        if (slash != std::string::npos)
-            prog_ = prog_.substr(slash+1); // Extract prog name from path
-        add_flag(help_, 'h', "help", "Print the help message");
-    }
-
-    ~options() {
-        for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
-            delete *i;
-    }
-
-    /** Updates value when parse() is called if option is present with a value. */
-    template<class T>
-    void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string var) {
-        opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
-    }
-
-    /** Sets flag when parse() is called if option is present. */
-    void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
-        opts_.push_back(new option_flag(flag, short_name, long_name, description));
-    }
-
-    /** Parse the command line, return the index of the first non-option argument.
-     *@throws bad_option if there is a parsing error or unknown option.
-     */
-    int parse() {
-        int arg = 1;
-        for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
-            opts::iterator i = opts_.begin();
-            while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
-                ++i;
-            if (i == opts_.end())
-                throw bad_option(std::string("unknown option ") + argv_[arg]);
-        }
-        if (help_) throw bad_option("");
-        return arg;
-    }
-
-    /** Print a usage message */
-  friend std::ostream& operator<<(std::ostream& os, const options& op) {
-      os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
-      os << std::endl << "options:" << std::endl;
-      for (opts::const_iterator i = op.opts_.begin(); i < op.opts_.end(); ++i)
-          os << **i << std::endl;
-      return os;
-  }
-
- private:
-    class option {
-      public:
-        option(char s, const std::string& l, const std::string& d, const std::string v) :
-            short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
-        virtual ~option() {}
-
-        virtual bool parse(int argc, char const * const * argv, int &i) = 0;
-        virtual void print_default(std::ostream&) const {};
-
-      friend std::ostream& operator<<(std::ostream& os, const option& op) {
-          os << "  " << op.short_;
-          if (!op.var_.empty()) os << " " << op.var_;
-          os << ", " << op.long_;
-          if (!op.var_.empty()) os << "=" << op.var_;
-          os << std::endl << "        " << op.desc_;
-          op.print_default(os);
-          return os;
-      }
-
-      protected:
-        std::string short_, long_, desc_, var_;
-    };
-
-    template <class T>
-    class option_value : public option {
-      public:
-        option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
-            option(s, l, d, v), value_(value) {}
-
-        bool parse(int argc, char const * const * argv, int &i) {
-            std::string arg(argv[i]);
-            if (arg == short_ || arg == long_) {
-                if (i < argc-1) {
-                    set_value(arg, argv[++i]);
-                    return true;
-                } else {
-                    throw bad_option("missing value for " + arg);
-                }
-            }
-            if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
-                set_value(long_, arg.substr(long_.size()+1));
-                return true;
-            }
-            return false;
-        }
-
-        virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }
-
-        void set_value(const std::string& opt, const std::string& s) {
-            std::istringstream is(s);
-            is >> value_;
-            if (is.fail() || is.bad())
-                throw bad_option("bad value for " + opt + ": " + s);
-        }
-
-      private:
-        T& value_;
-    };
-
-    class option_flag: public option {
-      public:
-        option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
-            option(s, l, d, ""), flag_(flag)
-        { flag_ = false; }
-
-        bool parse(int /*argc*/, char const * const * argv, int &i) {
-            if (argv[i] == short_ || argv[i] == long_) {
-                flag_ = true;
-                return true;
-            } else {
-                return false;
-            }
-        }
-
-      private:
-        bool &flag_;
-    };
-
-    typedef std::vector<option*> opts;
-
-    int argc_;
-    char const * const * argv_;
-    std::string prog_;
-    opts opts_;
-    bool help_;
-};
-
-#endif // OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
deleted file mode 100644
index 31f3599..0000000
--- a/examples/cpp/engine/server.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/connection.hpp"
-#include "proton/delivery.hpp"
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/tracker.hpp"
-#include "proton/url.hpp"
-
-#include <iostream>
-#include <map>
-#include <string>
-#include <cctype>
-
-#include "../fake_cpp11.hpp"
-
-class server : public proton::handler {
-  private:
-    typedef std::map<std::string, proton::sender> sender_map;
-    proton::url url;
-    sender_map senders;
-
-  public:
-
-    server(const std::string &u) : url(u) {}
-
-    void on_connection_open(proton::connection &c) override {
-        c.open_receiver(url.path());
-        std::cout << "server connected to " << url << std::endl;
-    }
-
-    std::string to_upper(const std::string &s) {
-        std::string uc(s);
-        size_t l = uc.size();
-        for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]);
-        return uc;
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) override {
-        std::cout << "Received " << m.body() << std::endl;
-        std::string reply_to = m.reply_to();
-        proton::message reply;
-        reply.to(reply_to);
-        reply.body(to_upper(proton::get<std::string>(m.body())));
-        reply.correlation_id(m.correlation_id());
-        if (!senders[reply_to])
-            senders[reply_to] = d.connection().open_sender(reply_to);
-        senders[reply_to].send(reply);
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("amqp://0.0.0.0:5672/examples");
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        server handler(address);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
deleted file mode 100644
index ffd80f9..0000000
--- a/examples/cpp/engine/simple_recv.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/receiver.hpp"
-#include "proton/value.hpp"
-#include "proton/message_id.hpp"
-#include "proton/delivery.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_recv : public proton::handler {
-  private:
-    proton::url url;
-    proton::receiver receiver;
-    uint64_t expected;
-    uint64_t received;
-  public:
-
-    simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
-
-    void on_connection_open(proton::connection &c) override {
-        receiver = c.open_receiver(url.path());
-        std::cout << "simple_recv listening on " << url << std::endl;
-    }
-
-    void on_message(proton::delivery& d, proton::message &msg) override {
-        if (msg.id().get<uint64_t>() < received)
-            return; // ignore duplicate
-        if (expected == 0 || received < expected) {
-            std::cout << msg.body() << std::endl;
-            received++;
-            if (received == expected) {
-                d.receiver().close();
-                d.connection().close();
-            }
-        }
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
-
-    try {
-        opts.parse();
-        simple_recv handler(address, message_count);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
deleted file mode 100644
index e08f39f..0000000
--- a/examples/cpp/engine/simple_send.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "options.hpp"
-
-#include "proton/io/socket.hpp"
-#include "proton/url.hpp"
-#include "proton/handler.hpp"
-#include "proton/connection.hpp"
-#include "proton/tracker.hpp"
-#include "proton/value.hpp"
-
-#include <iostream>
-#include <map>
-
-#include "../fake_cpp11.hpp"
-
-class simple_send : public proton::handler {
-  private:
-    proton::url url;
-    int sent;
-    int confirmed;
-    int total;
-  public:
-
-    simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
-
-    void on_connection_open(proton::connection &c) override {
-        c.open_sender(url.path());
-    }
-
-    void on_sendable(proton::sender &sender) override {
-        while (sender.credit() && sent < total) {
-            proton::message msg;
-            msg.id(sent + 1);
-            std::map<std::string, int> m;
-            m["sequence"] = sent+1;
-            msg.body(m);
-            sender.send(msg);
-            sent++;
-        }
-    }
-
-    void on_tracker_accept(proton::tracker &t) override {
-        confirmed++;
-        if (confirmed == total) {
-            std::cout << "all messages confirmed" << std::endl;
-            t.connection().close();
-        }
-    }
-
-    void on_transport_close(proton::transport &) override {
-        sent = confirmed;
-    }
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("127.0.0.1:5672/examples");
-    int message_count = 100;
-    options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
-    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
-    try {
-        opts.parse();
-        simple_send handler(address, message_count);
-        proton::io::socket::engine(address, handler).run();
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example/socket_windows.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/example/socket_windows.cpp b/examples/cpp/example/socket_windows.cpp
new file mode 100644
index 0000000..f312525
--- /dev/null
+++ b/examples/cpp/example/socket_windows.cpp
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+
+#include <proton/io/socket.hpp>
+#include <proton/url.hpp>
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+namespace proton {
+namespace io {
+namespace socket {
+
+const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
+
+std::string error_str() {
+    HRESULT code = WSAGetLastError();
+    char err[1024] = {0};
+    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                  FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+    return err;
+}
+
+io_error::io_error(const std::string& s) : error(s) {}
+
+namespace {
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result == SOCKET_ERROR)
+        throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") {
+    if (result)
+        throw io_error(msg + gai_strerror(result));
+}
+
+} // namespace
+
+void initialize() {
+    WSADATA unused;
+    check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
+}
+
+void finalize() {
+    WSACleanup();
+}
+
+void engine::init() {
+    u_long nonblock = 1;
+    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
+}
+
+engine::engine(descriptor fd, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+engine::engine(const url& u, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+    connection().open();
+}
+
+engine::~engine() {}
+
+void engine::read() {
+    mutable_buffer rbuf = read_buffer();
+    if (rbuf.size > 0) {
+        int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
+        if (n > 0)
+            read_done(n);
+        else if (n == 0)
+            read_close();
+        else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
+            close(error_condition("io_error", error_str()));
+    }
+}
+
+void engine::write() {
+    const_buffer wbuf = write_buffer();
+    if (wbuf.size > 0) {
+    int n = ::send(socket_, wbuf.data, wbuf.size, 0);
+    if (n > 0)
+        write_done(n);
+    else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
+        close(error_condition("io_error", error_str()));
+    }
+}
+
+void engine::run() {
+    while (dispatch()) {
+        fd_set rd, wr;
+        FD_ZERO(&rd);
+        if (read_buffer().size)
+            FD_SET(socket_, &rd);
+        FD_ZERO(&wr);
+        if (write_buffer().size)
+            FD_SET(socket_, &wr);
+        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
+        if (n < 0) {
+            close(error_condition("select: ", error_str()));
+            break;
+        }
+        if (FD_ISSET(socket_, &rd)) {
+            read();
+        }
+        if (FD_ISSET(socket_, &wr))
+            write();
+    }
+    ::closesocket(socket_);
+}
+
+namespace {
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+};
+
+static const char *amqp_service(const char *port) {
+  // Help older Windows to know about amqp[s] ports
+  if (port) {
+    if (!strcmp("amqp", port)) return "5672";
+    if (!strcmp("amqps", port)) return "5671";
+  }
+  return port;
+}
+}
+
+
+descriptor connect(const proton::url& u) {
+    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
+    descriptor fd = INVALID_SOCKET;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
+                                0, &addr.ptr),
+                  "connect address invalid: ");
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd != INVALID_SOCKET) ::closesocket(fd);
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
+        bool yes = true;
+        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
+        check(::listen(socket_, 32), "listener listen: ");
+    } catch (...) {
+        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
+        throw;
+    }
+}
+
+listener::~listener() { ::closesocket(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_storage addr;
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
+                          host, sizeof(host), port, sizeof(port), 0),
+              "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index d228d67..38a5154 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -131,60 +131,40 @@ class Proc(Popen):
             raise ProcError(self, "timeout waiting for exit")
 
 
-def count_tests(cls):
-    methods = inspect.getmembers(cls, predicate=inspect.ismethod)
-    tests = [ i for i,j in methods if i.startswith('test_') ]
-    return len(tests)
-
-class CompatSetupClass(object):
-    # Roughly provides setUpClass and tearDownClass functionality for older python versions
-    # in our test scenarios
-    def __init__(self, target):
-        self.completed = False
-        self.test_count = count_tests(target)
-        self.target = target
-        self.global_setup = False
-
-    def note_setup(self):
-        if not self.global_setup:
-            self.global_setup = True
-            self.target.setup_class()
-
-    def note_teardown(self):
-        self.test_count -=  1
-        if self.test_count == 0:
-            self.completed = True
-            self.target.teardown_class()
-        
-
-class ExampleTestCase(unittest.TestCase):
-
-    @classmethod
-    def setup_class(cls):
-        pass
-
-    @classmethod
-    def teardown_class(cls):
-        pass
-
-    def completed(self):
-        cls = self.__class__
-        return cls.compat_ and cls.compat_.completed
-
+if hasattr(unittest.TestCase, 'setUpClass') and  hasattr(unittest.TestCase, 'tearDownClass'):
+    TestCase = unittest.TestCase
+else:
+    class TestCase(unittest.TestCase):
+        """
+        Roughly provides setUpClass and tearDownClass functionality for older python
+        versions in our test scenarios. If subclasses override setUp or tearDown
+        they *must* call the superclass.
+        """
+        def setUp(self):
+            if not hasattr(type(self), '_setup_class_count'):
+                type(self)._setup_class_count = len(
+                    inspect.getmembers(
+                        type(self),
+                        predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+                type(self).setUpClass()
+
+        def tearDown(self):
+            self.assertTrue(self._setup_class_count > 0)
+            self._setup_class_count -=  1
+            if self._setup_class_count == 0:
+                type(self).tearDownClass()
+
+
+class ExampleTestCase(TestCase):
+    """TestCase that manages started processes"""
     def setUp(self):
-        cls = self.__class__
-        if not hasattr(cls, "compat_"):
-            cls.compat_ = CompatSetupClass(cls)
-        if cls.compat_.completed:
-            # Last test for this class already seen.
-            raise Exception("Test sequencing error")
-        cls.compat_.note_setup()
+        super(ExampleTestCase, self).setUp()
         self.procs = []
 
     def tearDown(self):
         for p in self.procs:
             p.safe_kill()
-        self.__class__.compat_.note_teardown()
+        super(ExampleTestCase, self).tearDown()
 
     def proc(self, *args, **kwargs):
         p = Proc(*args, **kwargs)
@@ -194,27 +174,26 @@ class ExampleTestCase(unittest.TestCase):
 class BrokerTestCase(ExampleTestCase):
     """
     ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+    Subclasses must set `broker_exe` class variable with the name of the broker executable.
     """
 
-    # setUpClass not available until 2.7
     @classmethod
-    def setup_class(cls):
+    def setUpClass(cls):
         cls.addr = pick_addr() + "/examples"
-        cls.broker = Proc(["broker", "-a", cls.addr], ready="listening")
+        cls.broker = None       # In case Proc throws, create the attribute.
+        cls.broker = Proc([cls.broker_exe, "-a", cls.addr], ready="listening")
         cls.broker.wait_ready()
 
-    # tearDownClass not available until 2.7
     @classmethod
-    def teardown_class(cls):
-        cls.broker.safe_kill()
+    def tearDownClass(cls):
+        if cls.broker: cls.broker.safe_kill()
 
     def tearDown(self):
+        b = type(self).broker
+        if b and b.poll() !=  None: # Broker crashed
+            type(self).setUpClass() # Start another for the next test.
+            raise ProcError(b, "broker crash")
         super(BrokerTestCase, self).tearDown()
-        if not self.completed():
-            b = type(self).broker
-            if b.poll() !=  None: # Broker crashed
-                type(self).setUpClass() # Start another for the next test.
-                raise ProcError(b, "broker crash")
 
 
 CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES
@@ -230,6 +209,8 @@ def recv_expect(name, addr):
 class ContainerExampleTest(BrokerTestCase):
     """Run the container examples, verify they behave as expected."""
 
+    broker_exe = "broker"
+
     def test_helloworld(self):
         self.assertEqual('Hello World!\n', self.proc(["helloworld", self.addr]).wait_exit())
 
@@ -341,8 +322,8 @@ Hello World!
         self.assertEqual(expect_found, True)
 
 
-class ConnectionEngineExampleTest(BrokerTestCase):
-    """Run the connction_engine examples, verify they behave as expected."""
+class EngineTestCase(BrokerTestCase):
+    """Run selected clients to test a connction_engine broker."""
 
     def test_helloworld(self):
         self.assertEqual('Hello World!\n',
@@ -380,5 +361,8 @@ class ConnectionEngineExampleTest(BrokerTestCase):
         self.assertEqual(CLIENT_EXPECT,
                          self.proc(["client", "-a", self.addr]).wait_exit())
 
+class MtBrokerTest(EngineTestCase):
+    broker_exe = "mt_broker"
+
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
new file mode 100644
index 0000000..48738c9
--- /dev/null
+++ b/examples/cpp/mt/broker.cpp
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "../options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/controller.hpp>
+#include <proton/delivery.hpp>
+#include <proton/handler.hpp>
+#include <proton/url.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <functional>
+#include <iostream>
+#include <mutex>
+#include <thread>
+
+// Thread safe queue.
+// Stores messages, notifies subscribed connections when there is data.
+class queue {
+  public:
+    queue(const std::string& name) : name_(name) {}
+
+    std::string name() const { return name_; }
+
+    // Push a message onto the queue.
+    // If the queue was previously empty, notify subscribers it has messages.
+    // Called from receiver's connection.
+    void push(const proton::message &m) {
+        std::lock_guard<std::mutex> g(lock_);
+        messages_.push_back(m);
+        if (messages_.size() == 1) { // Non-empty, notify subscribers
+            for (auto cb : callbacks_)
+                cb(this);
+            callbacks_.clear();
+        }
+    }
+
+    // If the queue is not empty, pop a message into m and return true.
+    // Otherwise save callback to be called when there are messages and return false.
+    // Called from sender's connection.
+    bool pop(proton::message& m, std::function<void(queue*)> callback) {
+        std::lock_guard<std::mutex> g(lock_);
+        if (messages_.empty()) {
+            callbacks_.push_back(callback);
+            return false;
+        } else {
+            m = std::move(messages_.front());
+            messages_.pop_front();
+            return true;
+        }
+    }
+
+  private:
+    const std::string name_;
+    std::mutex lock_;
+    std::deque<proton::message> messages_;
+    std::vector<std::function<void(queue*)> > callbacks_;
+};
+
+/// Thread safe map of queues.
+class queues {
+  public:
+    queues() : next_id_(0) {}
+
+    // Get or create the named queue.
+    queue* get(const std::string& name) {
+        std::lock_guard<std::mutex> g(lock_);
+        auto i = queues_.insert(queue_map::value_type(name, nullptr)).first;
+        if (!i->second)
+            i->second.reset(new queue(name));
+        return i->second.get();
+    }
+
+    // Create a dynamic queue with a unique name.
+    queue* dynamic() {
+        std::ostringstream os;
+        os << "_dynamic_" << next_id_++;
+        return get(os.str());
+    }
+
+  private:
+    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
+
+    std::mutex lock_;
+    queue_map queues_;
+    std::atomic<uint64_t> next_id_; // Use to generate unique queue IDs.
+};
+
+/// Broker connection handler. Things to note:
+///
+/// Each handler manages a single connection. Proton AMQP callbacks and queue
+/// callbacks via proton::work_queue are serialized per-connection, so the
+/// handler does not need a lock. Handlers for different connections can be
+/// called concurrently.
+///
+/// Senders (aka subscriptions) need some cross-thread notification:.
+///
+/// - a sender that gets credit calls queue::pop() in `on_sendable()`
+///   - on success it sends the message immediatly.
+///   - on queue empty, the sender is added to the `blocked_` set and the queue stores a callback.
+/// - when a receiver thread pushes a message, the queue calls its callbacks.
+/// - the callback causes a serialized call to has_messages() which re-tries all `blocked_` senders.
+///
+class broker_connection_handler : public proton::handler {
+  public:
+    broker_connection_handler(queues& qs) : queues_(qs) {}
+
+    void on_connection_open(proton::connection& c) override {
+        // Create the has_messages callback for use with queue subscriptions.
+        //
+        // Note the captured and bound arguments must be thread-safe to copy,
+        // shared_ptr<work_queue>, and plain pointers this and q are all safe.
+        //
+        // The proton::connection object c is not thread-safe to copy.
+        // However when the work_queue calls this->has_messages it will be safe
+        // to use any proton objects associated with c again.
+        auto work = proton::work_queue::get(c);
+        has_messages_callback_ = [this, work](queue* q) {
+            work->push(std::bind(&broker_connection_handler::has_messages, this, q));
+        };
+        c.open();               // Always accept
+    }
+
+    // A sender sends messages from a queue to a subscriber.
+    void on_sender_open(proton::sender &sender) override {
+        queue *q = sender.source().dynamic() ?
+            queues_.dynamic() : queues_.get(sender.source().address());
+        std::cout << "sending from " << q->name() << std::endl;
+    }
+
+    // We have credit to send a message.
+    void on_sendable(proton::sender &s) override {
+        queue* q = sender_queue(s);
+        if (!do_send(q, s))     // Queue is empty, save ourselves in the blocked set.
+            blocked_.insert(std::make_pair(q, s));
+    }
+
+    // A receiver receives messages from a publisher to a queue.
+    void on_receiver_open(proton::receiver &receiver) override {
+        std::string qname = receiver.target().address();
+        if (qname == "shutdown") {
+            std::cout << "broker shutting down" << std::endl;
+            // Sending to the special "shutdown" queue stops the broker.
+            proton::controller::get(receiver.connection()).stop(
+                proton::error_condition("shutdown", "stop broker"));
+        } else {
+            std::cout << "receiving to " << qname << std::endl;
+        }
+    }
+
+    // A message is received.
+    void on_message(proton::delivery &d, proton::message &m) override {
+        std::string qname = d.receiver().target().address();
+        queues_.get(qname)->push(m);
+    }
+
+    void on_session_close(proton::session &session) override {
+        // Erase all blocked senders that belong to session.
+        auto predicate = [session](const proton::sender& s) {
+            return s.session() == session;
+        };
+        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
+    }
+
+    void on_sender_close(proton::sender &sender) override {
+        // Erase sender from the blocked set.
+        auto range = blocked_.equal_range(sender_queue(sender));
+        auto predicate = [sender](const proton::sender& s) { return s == sender; };
+        erase_sender_if(range.first, range.second, predicate);
+    }
+
+    // The controller calls on_transport_close() last.
+    void on_transport_close(proton::transport&) override {
+        delete this;            // All done.
+    }
+
+  private:
+    typedef std::multimap<queue*, proton::sender> blocked_map;
+
+    // Get the queue associated with a sender.
+    queue* sender_queue(const proton::sender& s) {
+        return queues_.get(s.source().address()); // Thread safe.
+    }
+
+    // Only called if we have credit. Return true if we sent a message.
+    bool do_send(queue* q, proton::sender &s) {
+        proton::message m;
+        bool popped =  q->pop(m, has_messages_callback_);
+        if (popped)
+            s.send(m);
+        /// if !popped the queue has saved the callback for later.
+        return popped;
+    }
+
+    // Called via @ref work_queue when q has messages. Try all the blocked senders.
+    void has_messages(queue* q) {
+        auto range = blocked_.equal_range(q);
+        for (auto i = range.first; i != range.second;) {
+            if (i->second.credit() <= 0 || do_send(q, i->second))
+                i = blocked_.erase(i); // No credit or send was successful, stop blocked.
+            else
+                ++i;            // have credit, didn't send, keep blocked
+        }
+    }
+
+    // Use to erase closed senders from blocked_ set.
+    template <class Predicate>
+    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) {
+        for (auto i = begin; i != end; ) {
+            if (p(i->second))
+                i = blocked_.erase(i);
+            else
+                ++i;
+        }
+    }
+
+    queues& queues_;
+    blocked_map blocked_;
+    std::function<void(queue*)> has_messages_callback_;
+    proton::connection connection_;
+};
+
+
+class broker {
+  public:
+    broker(const std::string addr) : controller_(proton::controller::create()) {
+        controller_->options(proton::connection_options().container_id("mt_broker"));
+        std::cout << "broker listening on " << addr << std::endl;
+        controller_->listen(addr, std::bind(&broker::new_handler, this));
+    }
+
+    void run() {
+        for(int i = 0; i < std::thread::hardware_concurrency(); ++i)
+            std::thread(&proton::controller::run, controller_.get()).detach();
+        controller_->wait();
+    }
+
+  private:
+    proton::handler* new_handler() {
+        return new broker_connection_handler(queues_);
+    }
+
+    queues queues_;
+    std::unique_ptr<proton::controller> controller_;
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("0.0.0.0");
+    example::options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        broker(address).run();
+        return 0;
+    } catch (const example::bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << "broker shutdown: " << e.what() << std::endl;
+    }
+    return 1;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message