qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [05/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree
Date Tue, 03 Jul 2018 22:12:54 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/flow_control.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/flow_control.cpp b/cpp/examples/flow_control.cpp
new file mode 100644
index 0000000..c74070c
--- /dev/null
+++ b/cpp/examples/flow_control.cpp
@@ -0,0 +1,261 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/tracker.hpp>
+
+#include <iostream>
+#include <sstream>
+
+#include "fake_cpp11.hpp"
+
+namespace {
+
+bool verbose = true;
+
+void verify(bool success, const std::string &msg) {
+    // A failed expectation aborts the example; a passing one is reported.
+    if (!success) {
+        throw std::runtime_error("example failure:" + msg);
+    }
+    std::cout << "success: " << msg << std::endl;
+    if (verbose) std::cout << std::endl;  // extra blank line in verbose mode
+}
+
+}
+
+// flow_sender manages the incoming connection and acts as the message sender.
+class flow_sender : public proton::messaging_handler {
+  private:
+    int available;  // Number of messages the sender may send assuming sufficient credit.
+    int sequence;   // Sequence number written into the body of the next message.
+
+  public:
+    flow_sender() : available(0), sequence(0) {}
+
+    void send_available_messages(proton::sender &s) {  // Send while messages and credit both remain.
+        while (available && s.credit() > 0) {
+            std::ostringstream mbody;
+            mbody << "flow_sender message " << sequence++;
+            proton::message m(mbody.str());
+            s.send(m);
+            available--;
+        }
+    }
+
+    void on_sendable(proton::sender &s) OVERRIDE {
+        if (verbose)
+            std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit()
+                      << " and " << available << " available messages" << std::endl;
+        send_available_messages(s);
+    }
+
+    void on_sender_drain_start(proton::sender &s) OVERRIDE {
+        if (verbose)
+            std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit()
+                      << " and " << available << " available messages" << std::endl;
+        send_available_messages(s);
+        if (s.credit()) {
+            s.return_credit(); // return the rest
+        }
+    }
+
+    void set_available(int n) { available = n; }  // Set how many messages may be sent.
+};
+
+class flow_receiver : public proton::messaging_handler {
+  public:
+    int stage;           // Selects which case run_stage() executes next.
+    int received;        // Messages received since the last example_setup().
+    flow_sender &sender; // Sender-side handler whose available count example_setup() sets.
+
+    flow_receiver(flow_sender &s) : stage(0), received(0), sender(s) {}
+
+    void example_setup(int n) {  // Reset the receive count and give the sender n messages.
+        received = 0;
+        sender.set_available(n);
+    }
+
+    void run_stage(proton::receiver &r, const std::string &caller) {  // caller: name of the invoking handler
+        // Serialize the progression of the flow control examples.
+        switch (stage) {
+        case 0:
+            if (verbose) std::cout << "Example 1.  Simple use of credit." << std::endl;
+            // TODO: add timeout callbacks, show no messages until credit.
+            example_setup(2);
+            r.add_credit(2);
+            break;
+        case 1:
+            if (r.credit() > 0) return;  // still waiting for the credited messages
+            verify(received == 2, "Example 1: simple credit");
+
+            if (verbose) std::cout << "Example 2.   Use basic drain, sender has 3 \"immediate\" messages." << std::endl;
+            example_setup(3);
+            r.add_credit(5); // ask for up to 5
+            r.drain();       // but only use what's available
+            break;
+        case 2:
+            if (caller == "on_message") return;  // stay in this stage until the drain finishes
+            if (caller == "on_receiver_drain_finish") {
+                // Note that unused credit of 2 at sender is returned and is now 0.
+                verify(received == 3 && r.credit() == 0, "Example 2: basic drain");
+
+                if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl;
+                example_setup(0);
+                r.drain();
+                break;
+            }
+            verify(false, "example 2 run_stage");  // any other caller is a sequencing failure; verify() throws
+            return;
+
+        case 3:
+            verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit");
+
+            if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl;
+            example_setup(25);
+            r.add_credit(10);
+            break;
+
+        case 4:
+            if (received < 25) {
+                // Top up credit as needed.
+                uint32_t credit = r.credit();
+                if (credit <= 3) {  // at or below the low watermark
+                    uint32_t new_credit = 10;  // high watermark
+                    uint32_t remaining = 25 - received;
+                    if (new_credit > remaining)
+                        new_credit = remaining;  // never ask for more than is still expected
+                    if (new_credit > credit) {
+                        r.add_credit(new_credit - credit);
+                        if (verbose)
+                            std::cout << "flow_receiver adding credit for " << new_credit - credit
+                                      << " messages" << std::endl;
+                    }
+                }
+                return;
+            }
+
+            verify(received == 25 && r.credit() == 0, "Example 4: high/low watermark");
+            r.connection().close();
+            break;
+
+        default:
+            throw std::runtime_error("run_stage sequencing error");
+        }
+        stage++;  // reached only via break; the early returns above leave the stage unchanged
+    }
+
+    void on_receiver_open(proton::receiver &r) OVERRIDE {
+        run_stage(r, "on_receiver_open");
+    }
+
+    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
+        if (verbose)
+            std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl;
+        proton::receiver r(d.receiver());
+        received++;
+        run_stage(r, "on_message");
+    }
+
+    void on_receiver_drain_finish(proton::receiver &r) OVERRIDE {
+        if (verbose)
+            std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl;
+        run_stage(r, "on_receiver_drain_finish");
+    }
+};
+
+class flow_listener : public proton::listen_handler {
+    proton::connection_options opts;  // Returned by on_accept() for each accepted connection.
+  public:
+    flow_listener(flow_sender& sender_handler) {
+        opts.handler(sender_handler);
+    }
+
+    void on_open(proton::listener& lsnr) OVERRIDE {
+        std::ostringstream address;
+        address << "//:" << lsnr.port() << "/example"; // Use the port actually being listened on
+        lsnr.container().connect(address.str());
+    }
+
+    proton::connection_options on_accept(proton::listener&) OVERRIDE { return opts; }
+};
+
+class flow_control : public proton::messaging_handler {
+  private:
+    proton::listener listener;
+    flow_sender send_handler;
+    flow_receiver receive_handler;
+    flow_listener listen_handler;
+
+  public:
+    flow_control() : receive_handler(send_handler), listen_handler(send_handler) {}
+
+    void on_container_start(proton::container &cont) OVERRIDE {
+        // Port 0 requests a dynamic port on the local host.
+        listener = cont.listen("//:0", listen_handler);
+    }
+
+    void on_connection_open(proton::connection &conn) OVERRIDE {
+        if (!conn.active()) return;  // only the outbound connection opens the receiver
+        proton::receiver_options ropts;
+        ropts.handler(receive_handler).credit_window(0);
+        conn.open_receiver("flow_example", ropts);
+    }
+
+    void on_connection_close(proton::connection &) OVERRIDE {
+        listener.stop();
+    }
+};
+
+int main(int argc, char **argv) {
+    // -q/--quiet turns off the running commentary. The example connects to
+    // its own listener, so no broker is needed.
+    bool quiet = false;
+    int exit_code = 1;  // Stays 1 unless the container runs to completion.
+    example::options cmdline(argc, argv);
+    cmdline.add_flag(quiet, 'q', "quiet", "suppress additional commentary of credit allocation and consumption");
+
+    try {
+        cmdline.parse();
+        if (quiet) {
+            verbose = false;
+        }
+
+        flow_control handler;
+        proton::container(handler).run();
+        exit_code = 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/helloworld.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/helloworld.cpp b/cpp/examples/helloworld.cpp
new file mode 100644
index 0000000..5962826
--- /dev/null
+++ b/cpp/examples/helloworld.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/tracker.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+class hello_world : public proton::messaging_handler {
+    std::string conn_url_;  // URL passed to container::connect()
+    std::string addr_;      // Address used for both the receiver and the sender
+
+  public:
+    hello_world(const std::string& url, const std::string& address) :
+        conn_url_(url), addr_(address) {}
+
+    void on_container_start(proton::container& cont) OVERRIDE {
+        cont.connect(conn_url_);
+    }
+
+    void on_connection_open(proton::connection& conn) OVERRIDE {
+        conn.open_receiver(addr_);
+        conn.open_sender(addr_);
+    }
+
+    void on_sendable(proton::sender &snd) OVERRIDE {
+        proton::message greeting("Hello World!");
+        snd.send(greeting);
+        snd.close();
+    }
+
+    void on_message(proton::delivery &dlv, proton::message &msg) OVERRIDE {
+        std::cout << msg.body() << std::endl;
+        dlv.connection().close();
+    }
+};
+
+int main(int argc, char **argv) {
+    try {
+        // Optional arguments: connection URL, then address.
+        const std::string conn_url(argc > 1 ? argv[1] : "//127.0.0.1:5672");
+        const std::string addr(argc > 2 ? argv[2] : "examples");
+        hello_world handler(conn_url, addr);
+        proton::container(handler).run();
+
+        return 0;
+    } catch (const std::exception& err) {
+        std::cerr << err.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/message_properties.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/message_properties.cpp b/cpp/examples/message_properties.cpp
new file mode 100644
index 0000000..cb5c6b8
--- /dev/null
+++ b/cpp/examples/message_properties.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/message.hpp>
+#include <proton/types.hpp>
+#include <iostream>
+#include <map>
+
+int main(int argc, char **argv) {
+    try {
+        proton::message m;
+
+        // Setting properties: legal types are converted automatically to their
+        // AMQP counterpart.
+        m.properties().put("short", int16_t(123));
+        m.properties().put("string", "foo");
+        m.properties().put("symbol", proton::symbol("sym"));
+
+        // Examining properties using proton::get()
+
+        // 1 argument get<>() template specifies expected type of property.
+        std::string s = proton::get<std::string>(m.properties().get("string"));
+
+        // 2 argument get, property must have matching type to output argument.
+        int16_t i;
+        proton::get(m.properties().get("short"), i);
+
+        // Checking property types
+        proton::type_id type = m.properties().get("symbol").type();
+        if (type != proton::SYMBOL) {
+            throw std::logic_error("wrong type!");
+        }
+
+        // proton::scalar has its own ostream <<
+        std::cout << "using put/get:"
+                  << " short=" << i
+                  << " string=" << s
+                  << " symbol=" << m.properties().get("symbol")
+                  << std::endl;
+
+        // Converting properties to a convertible type
+        std::cout << "using coerce:"
+                  << " short(as long)="
+                  << proton::coerce<long>(m.properties().get("short"))
+                  << std::endl;
+
+        // Extract the properties as a std::map for more complex map operations.
+        // You can use other map and sequence types to hold a map, see @ref types_page
+        typedef std::map<std::string, proton::scalar> property_map;
+        property_map props;
+        proton::get(m.properties(), props);
+        for (property_map::iterator it = props.begin(); it != props.end(); ++it) {
+            std::cout << "props[" << it->first << "]=" << it->second << std::endl;
+        }
+        props["string"] = "bar";
+        props["short"] = 42;
+        // Update the properties in the message from the modified props map
+        m.properties() = props;
+
+        std::cout << "short=" << m.properties().get("short")
+                  << " string=" << m.properties().get("string")
+                  << std::endl;
+
+        // proton::get throws an exception if types do not match exactly.
+        try {
+            proton::get<uint32_t>(m.properties().get("short")); // bad: uint32_t != int16_t
+            throw std::logic_error("expected exception");
+        } catch (const proton::conversion_error& e) {
+            std::cout << "expected conversion_error: \"" << e.what() << '"' << std::endl;
+        }
+
+        // proton::coerce throws an exception if types are not convertible.
+        try {
+            proton::coerce<uint32_t>(m.properties().get("string"));  // bad: string to uint32_t
+            throw std::logic_error("expected exception");
+        } catch (const proton::conversion_error& e) {
+            std::cout << "expected conversion_error: \"" << e.what() << '"' << std::endl;
+        }
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << "unexpected exception: " << e.what() << std::endl;
+        return 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/multithreaded_client.cpp b/cpp/examples/multithreaded_client.cpp
new file mode 100644
index 0000000..78085e2
--- /dev/null
+++ b/cpp/examples/multithreaded_client.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 or greater
+//
+// A multi-threaded client that calls proton::container::run() in one thread, sends
+// messages in another and receives messages in a third.
+//
+// Note this client does not deal with flow-control. If the sender is faster
+// than the receiver, messages will build up in memory on the sending side.
+// See @ref multithreaded_client_flow_control.cpp for a more complex example with
+// flow control.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Lock output from threads to avoid scrambling
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
+// Handler for a single thread-safe sending and receiving connection.
+class client : public proton::messaging_handler {
+    // Set in the constructor and never modified afterwards
+    const std::string url_;
+    const std::string address_;
+
+    // Only used in proton handler thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue *work_queue_;         // Null until on_sender_open() runs
+    std::condition_variable sender_ready_;   // Notified when work_queue_ is set
+    std::queue<proton::message> messages_;   // Received messages not yet taken by receive()
+    std::condition_variable messages_ready_; // Notified when a message is pushed
+
+  public:
+    client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
+
+    // Thread safe
+    void send(const proton::message& msg) {
+        // Use [=] to copy the message, we cannot pass it by reference since it
+        // will be used in another thread.
+        work_queue()->add([=]() { sender_.send(msg); });
+    }
+
+    // Thread safe; blocks until a message is available
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        while (messages_.empty()) messages_ready_.wait(l);
+        auto msg = std::move(messages_.front());
+        messages_.pop();
+        return msg;
+    }
+
+    // Thread safe
+    void close() {
+        work_queue()->add([=]() { sender_.connection().close(); });
+    }
+
+  private:
+
+    proton::work_queue* work_queue() {
+        // Wait till work_queue_ and sender_ are initialized.
+        std::unique_lock<std::mutex> l(lock_);
+        while (!work_queue_) sender_ready_.wait(l);
+        return work_queue_;
+    }
+
+    // == messaging_handler overrides, only called in proton handler thread
+
+    // Note: this example creates a connection when the container starts.
+    // To create connections after the container has started, use
+    // container::connect().
+    // See @ref multithreaded_client_flow_control.cpp for an example.
+    void on_container_start(proton::container& cont) override {
+        cont.connect(url_);
+    }
+
+    void on_connection_open(proton::connection& conn) override {
+        conn.open_sender(address_);
+        conn.open_receiver(address_);
+    }
+
+    void on_sender_open(proton::sender& s) override {
+        // sender_ and work_queue_ must be set atomically
+        std::lock_guard<std::mutex> l(lock_);
+        sender_ = s;
+        work_queue_ = &s.work_queue();
+        sender_ready_.notify_all();  // wake threads blocked in work_queue()
+    }
+
+    void on_message(proton::delivery& dlv, proton::message& msg) override {
+        std::lock_guard<std::mutex> l(lock_);
+        messages_.push(msg);  // queue a copy for receive()
+        messages_ready_.notify_all();
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        OUT(std::cerr << "unexpected error: " << e << std::endl);
+        exit(1);
+    }
+};
+
+int main(int argc, const char** argv) {
+    try {
+        if (argc != 4) {
+            std::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to send\n";
+            return 1;
+        }
+        const char *conn_url = argv[1];
+        const char *node_address = argv[2];
+        const int total = atoi(argv[3]);
+
+        client cl(conn_url, node_address);
+        proton::container cont(cl);
+        std::thread container_thread([&cont]() { cont.run(); });
+
+        std::thread send_worker([&cl, total]() {
+                for (int i = 0; i < total; ++i) {
+                    proton::message outgoing(std::to_string(i + 1));
+                    cl.send(outgoing);
+                    OUT(std::cout << "sent \"" << outgoing.body() << '"' << std::endl);
+                }
+            });
+
+        int received = 0;  // written only by recv_worker, read after it is joined
+        std::thread recv_worker([&cl, &received, total]() {
+                for (int i = 0; i < total; ++i) {
+                    auto incoming = cl.receive();
+                    OUT(std::cout << "received \"" << incoming.body() << '"' << std::endl);
+                    ++received;
+                }
+            });
+
+        send_worker.join();
+        recv_worker.join();
+        cl.close();
+        container_thread.join();
+        std::cout << received << " messages sent and received" << std::endl;
+
+        return 0;
+    } catch (const std::exception& err) {
+        std::cerr << err.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/multithreaded_client_flow_control.cpp b/cpp/examples/multithreaded_client_flow_control.cpp
new file mode 100644
index 0000000..93c6b3d
--- /dev/null
+++ b/cpp/examples/multithreaded_client_flow_control.cpp
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 or greater
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block sender threads to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receiver threads
+//
+// We define sender and receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::messaging_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Lock output from threads to avoid scrambling
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
+// A thread-safe sending connection that blocks sending threads when there
+// is no AMQP credit to send messages.
+class sender : private proton::messaging_handler {
+    // Only used in proton handler thread
+    proton::sender sender_;
+
+    // Shared by proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue *work_queue_;   // Null until on_sender_open() runs
+    std::condition_variable sender_ready_;
+    int queued_;                       // Queued messages waiting to be sent
+    int credit_;                       // AMQP credit - number of messages we can send
+
+  public:
+    sender(proton::container& cont, const std::string& url, const std::string& address)
+        : work_queue_(0), queued_(0), credit_(0)
+    {
+        cont.open_sender(url+"/"+address, proton::connection_options().handler(*this));
+    }
+
+    // Thread safe; blocks until the link is open and there is unused credit
+    void send(const proton::message& m) {
+        {
+            std::unique_lock<std::mutex> l(lock_);
+            // Don't queue up more messages than we have credit for
+            while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
+            ++queued_;
+        }
+        work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe
+    }
+
+    // Thread safe
+    void close() {
+        work_queue()->add([=]() { sender_.connection().close(); });
+    }
+
+  private:
+
+    proton::work_queue* work_queue() {
+        // Wait till work_queue_ and sender_ are initialized.
+        std::unique_lock<std::mutex> l(lock_);
+        while (!work_queue_) sender_ready_.wait(l);
+        return work_queue_;
+    }
+
+    // == messaging_handler overrides, only called in proton handler thread
+
+    void on_sender_open(proton::sender& s) override {
+        std::lock_guard<std::mutex> l(lock_); // sender_ and work_queue_ are set atomically
+        sender_ = s;
+        work_queue_ = &s.work_queue();
+        sender_ready_.notify_all(); // Wake threads waiting in work_queue(), e.g. close()
+    }
+
+    void on_sendable(proton::sender& s) override {
+        std::lock_guard<std::mutex> l(lock_);
+        credit_ = s.credit();
+        sender_ready_.notify_all(); // Notify senders we have credit
+    }
+
+    // work_queue work items are automatically dequeued and called by proton
+    // This function is called because it was queued by send()
+    void do_send(const proton::message& m) {
+        sender_.send(m);
+        std::lock_guard<std::mutex> l(lock_);
+        --queued_;                    // work item was consumed from the work_queue
+        credit_ = sender_.credit();   // update credit
+        sender_ready_.notify_all();       // Notify senders we have space on queue
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        OUT(std::cerr << "unexpected error: " << e << std::endl);
+        exit(1);
+    }
+};
+
+// A thread safe receiving connection that blocks receiving threads when there
+// are no messages available, and maintains a bounded buffer of incoming
+// messages by issuing AMQP credit only when there is space in the buffer.
+class receiver : private proton::messaging_handler {
+    static const size_t MAX_BUFFER = 100; // Max number of buffered messages
+
+    // Used in proton threads only
+    proton::receiver receiver_;
+
+    // Used in proton and user threads, protected by lock_
+    std::mutex lock_;
+    proton::work_queue* work_queue_;      // Null until on_receiver_open() runs
+    std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+    std::condition_variable can_receive_; // Notify receivers of messages
+
+  public:
+
+    // Connect to url
+    receiver(proton::container& cont, const std::string& url, const std::string& address)
+        : work_queue_()
+    {
+        // NOTE: credit_window(0) disables automatic flow control.
+        // We will use flow control to match AMQP credit to buffer capacity.
+        cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0),
+                           proton::connection_options().handler(*this));
+    }
+
+    // Thread safe receive
+    proton::message receive() {
+        std::unique_lock<std::mutex> l(lock_);
+        // Wait for buffered messages
+        while (!work_queue_ || buffer_.empty())
+            can_receive_.wait(l);
+        proton::message m = std::move(buffer_.front());
+        buffer_.pop();
+        // Add a lambda to the work queue to call receive_done().
+        // This will tell the handler to add more credit.
+        work_queue_->add([=]() { this->receive_done(); });
+        return m;
+    }
+
+    void close() {  // Takes lock_; does nothing if work_queue_ has not been set yet
+        std::lock_guard<std::mutex> l(lock_);
+        if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+    }
+
+  private:
+    // ==== The following are called by proton threads only.
+
+    void on_receiver_open(proton::receiver& r) override {
+        receiver_ = r;
+        std::lock_guard<std::mutex> l(lock_);
+        work_queue_ = &receiver_.work_queue();
+        receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+    }
+
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // Proton automatically reduces credit by 1 before calling on_message
+        std::lock_guard<std::mutex> l(lock_);
+        buffer_.push(m);  // store a copy for receive()
+        can_receive_.notify_all();
+    }
+
+    // called via work_queue
+    void receive_done() {
+        // Add 1 credit, a receiver has taken a message out of the buffer.
+        receiver_.add_credit(1);
+    }
+
+    void on_error(const proton::error_condition& e) override {
+        OUT(std::cerr << "unexpected error: " << e << std::endl);
+        exit(1);
+    }
+};
+
+// ==== Example code using the sender and receiver
+
+// Send n messages
+void send_thread(sender& s, int n) {
+    const std::thread::id self = std::this_thread::get_id();
+    for (int seq = 0; seq < n; ++seq) {
+        std::ostringstream body;
+        body << self << "-" << seq;  // message body is "<thread id>-<index>"
+        s.send(proton::message(body.str()));
+        OUT(std::cout << self << " sent \"" << body.str() << '"' << std::endl);
+    }
+    OUT(std::cout << self << " sent " << n << std::endl);
+}
+
+// Receive messages till atomic remaining count is 0.
+// remaining is shared among all receiving threads
+void receive_thread(receiver& r, std::atomic_int& remaining) {
+    const std::thread::id self = std::this_thread::get_id();
+    int count = 0;
+    // Claim a message by decrementing the shared counter *before* calling
+    // r.receive(): once the counter has reached 0 nothing more will arrive,
+    // and an unclaimed receive() would block forever.
+    while (remaining.fetch_sub(1) > 0) {
+        proton::message msg = r.receive();
+        ++count;
+        OUT(std::cout << self << " received \"" << msg.body() << '"' << std::endl);
+    }
+    OUT(std::cout << self << " received " << count << " messages" << std::endl);
+}
+
+int main(int argc, const char **argv) {
+    try {
+        if (argc != 5) {
+            std::cerr <<
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n"
+                "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                "MESSAGE-COUNT: number of messages to send\n"
+                "THREAD-COUNT: number of sender/receiver thread pairs\n";
+            return 1;
+        }
+
+        const char *url = argv[1];
+        const char *address = argv[2];
+        int n_messages = atoi(argv[3]);   // messages sent by each sender thread
+        int n_threads = atoi(argv[4]);
+        int count = n_messages * n_threads;
+
+        // Total messages to be received, multiple receiver threads will decrement this.
+        std::atomic_int remaining;
+        remaining.store(count);
+
+        // Run the proton container
+        proton::container container;
+        auto container_thread = std::thread([&]() { container.run(); });
+
+        // A single sender and receiver to be shared by all the threads
+        sender send(container, url, address);
+        receiver recv(container, url, address);
+
+        // Start receiver threads, then sender threads.
+        // Starting receivers first gives all receivers a chance to compete for messages.
+        std::vector<std::thread> threads;
+        threads.reserve(n_threads*2); // Avoid re-allocation once threads are started
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { receive_thread(recv, remaining); }));
+        for (int i = 0; i < n_threads; ++i)
+            threads.push_back(std::thread([&]() { send_thread(send, n_messages); }));
+
+        // Wait for threads to finish
+        for (auto& t : threads) t.join();
+        send.close();
+        recv.close();
+        container_thread.join();
+        if (remaining > 0)
+            throw std::runtime_error("not all messages were received");
+        std::cout << count << " messages sent and received" << std::endl;
+
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/options.hpp
----------------------------------------------------------------------
diff --git a/cpp/examples/options.hpp b/cpp/examples/options.hpp
new file mode 100644
index 0000000..dab1bc2
--- /dev/null
+++ b/cpp/examples/options.hpp
@@ -0,0 +1,175 @@
+#ifndef OPTIONS_HPP
+#define OPTIONS_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <string>
+#include <sstream>
+#include <ostream>
+#include <vector>
+#include <stdexcept>
+
+namespace example {
+/** Exception type thrown to report option parsing errors. */
+struct bad_option : std::runtime_error {
+    bad_option(const std::string& message) : std::runtime_error(message) {}
+};
+
+/** Simple command-line option parser for example programs. Not copyable. */
+class options {
+  public:
+    /** Records argc/argv, derives the program name from argv[0] and registers -h/--help. */
+    options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argc > 0 && argv[0] ? argv[0] : ""), help_() {
+        size_t slash = prog_.find_last_of("/\\");
+        if (slash != std::string::npos)
+            prog_ = prog_.substr(slash+1); // Extract prog name from path
+        add_flag(help_, 'h', "help", "Print the help message");
+    }
+
+    ~options() {
+        for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
+            delete *i;          // Allocated with new in add_value()/add_flag()
+    }
+
+    /** Updates value when parse() is called if option is present with a value; var names the value in the usage text. */
+    template<class T>
+    void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string& var) {
+        opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
+    }
+
+    /** Sets flag when parse() is called if option is present. The flag is cleared when it is registered. */
+    void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
+        opts_.push_back(new option_flag(flag, short_name, long_name, description));
+    }
+
+    /** Parse the command line, return the index of the first non-option argument.
+     *@throws bad_option if there is a parsing error or unknown option, and (with an
+     * empty message) when the help flag was given. */
+    int parse() {
+        int arg = 1;
+        for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
+            opts::iterator i = opts_.begin();
+            while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg)) // A match may advance arg past its value
+                ++i;
+            if (i == opts_.end())
+                throw bad_option(std::string("unknown option ") + argv_[arg]);
+        }
+        if (help_) throw bad_option("");
+        return arg;
+    }
+
+    /** Print a usage message: the program name followed by one entry per registered option. */
+  friend std::ostream& operator<<(std::ostream& os, const options& op) {
+      os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
+      os << std::endl << "options:" << std::endl;
+      for (opts::const_iterator i = op.opts_.begin(); i != op.opts_.end(); ++i)
+          os << **i << std::endl;
+      return os;
+  }
+
+ private:
+    class option {
+      public:
+        option(char s, const std::string& l, const std::string& d, const std::string& v) :
+            short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
+        virtual ~option() {}
+        /** Try to consume argv[i]; return true if this option matched (i may be advanced past its value). */
+        virtual bool parse(int argc, char const * const * argv, int &i) = 0;
+        virtual void print_default(std::ostream&) const {}
+
+      friend std::ostream& operator<<(std::ostream& os, const option& op) {
+          os << "  " << op.short_;
+          if (!op.var_.empty()) os << " " << op.var_;
+          os << ", " << op.long_;
+          if (!op.var_.empty()) os << "=" << op.var_;
+          os << std::endl << "        " << op.desc_;
+          op.print_default(os);
+          return os;
+      }
+
+      protected:
+        std::string short_, long_, desc_, var_;
+    };
+
+    template <class T>
+    class option_value : public option {
+      public:
+        option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
+            option(s, l, d, v), value_(value) {}
+        /** Accepts "-x VALUE", "--name VALUE" and "--name=VALUE". */
+        bool parse(int argc, char const * const * argv, int &i) {
+            std::string arg(argv[i]);
+            if (arg == short_ || arg == long_) {
+                if (i < argc-1) {
+                    set_value(arg, argv[++i]);
+                    return true;
+                } else {
+                    throw bad_option("missing value for " + arg);
+                }
+            }
+            if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
+                set_value(long_, arg.substr(long_.size()+1));
+                return true;
+            }
+            return false;
+        }
+
+        virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }
+        /** Reads s into value_ with operator>>; a std::string value stops at the first whitespace. */
+        void set_value(const std::string& opt, const std::string& s) {
+            std::istringstream is(s);
+            is >> value_;
+            if (is.fail() || is.bad())
+                throw bad_option("bad value for " + opt + ": " + s);
+        }
+
+      private:
+        T& value_;
+    };
+
+    class option_flag: public option {
+      public:
+        option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
+            option(s, l, d, ""), flag_(flag)
+        { flag_ = false; }
+        /** Matches only the exact short or long name; never consumes a following argument. */
+        bool parse(int /*argc*/, char const * const * argv, int &i) {
+            if (argv[i] == short_ || argv[i] == long_) {
+                flag_ = true;
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+      private:
+        bool &flag_;
+    };
+    options(const options&);            // Not copyable (declared, never defined):
+    options& operator=(const options&); // opts_ owns raw pointers, a copy would double-delete.
+    typedef std::vector<option*> opts;
+    int argc_;
+    char const * const * argv_;
+    std::string prog_;
+    opts opts_;
+    bool help_;
+};
+}
+
+#endif // OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/queue_browser.cpp b/cpp/examples/queue_browser.cpp
new file mode 100644
index 0000000..b306e76
--- /dev/null
+++ b/cpp/examples/queue_browser.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/source_options.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+class queue_browser : public proton::messaging_handler {
+    std::string conn_url_;      // URL handed to container::connect()
+    std::string addr_;          // Address the receiver is opened on
+
+  public:
+    queue_browser(const std::string& url, const std::string& address) :
+        conn_url_(url), addr_(address) {}
+
+    void on_container_start(proton::container& cont) OVERRIDE {
+        proton::source_options browse_source;
+        browse_source.distribution_mode(proton::source::COPY); // COPY mode on the source
+        proton::receiver_options recv_opts;
+        recv_opts.source(browse_source);
+        proton::connection broker = cont.connect(conn_url_);
+        broker.open_receiver(addr_, recv_opts);
+    }
+    // Print the body of each delivered message on its own line.
+    void on_message(proton::delivery&, proton::message& msg) OVERRIDE {
+        std::cout << msg.body() << std::endl;
+    }
+};
+
+int main(int argc, char** argv) {
+    // Optional positional arguments: connection URL, then address.
+    try {
+        const std::string broker_url(argc > 1 ? argv[1] : "//127.0.0.1:5672");
+        const std::string queue_name(argc > 2 ? argv[2] : "examples");
+
+        queue_browser handler(broker_url, queue_name);
+        proton::container(handler).run();
+        return 0;
+    } catch (const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+    }
+    // Reached only after an exception was reported.
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/reconnect_client.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/reconnect_client.cpp b/cpp/examples/reconnect_client.cpp
new file mode 100644
index 0000000..ed93214
--- /dev/null
+++ b/cpp/examples/reconnect_client.cpp
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
+#include <proton/value.hpp>
+#include <proton/types.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include "fake_cpp11.hpp"
+
+class reconnect_client : public proton::messaging_handler {
+    std::string url;
+    std::string address;
+    std::vector<std::string> failovers;
+    proton::sender sender;      // Set in on_sender_open()
+    int sent;                   // Id of the last message sent (ids start at 1)
+    int expected;               // Number of messages to receive; 0 means no limit
+    int received;               // Number of messages accepted so far
+
+  public:
+    reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) :
+        url(u), address(a), failovers(f), sent(0), expected(c), received(0) {}
+
+  private:
+    void on_container_start(proton::container &c) OVERRIDE {
+        proton::connection_options co;
+        proton::reconnect_options ro;
+
+        ro.failover_urls(failovers);
+        co.reconnect(ro);
+        c.connect(url, co);
+    }
+    // Opens the receiver and sender on the connection and rewinds the send counter.
+    void on_connection_open(proton::connection & c) OVERRIDE {
+        c.open_receiver(address);
+        c.open_sender(address);
+        // The last message sent may have been lost: rewind so the next unreceived id is sent again.
+        sent = received;
+        std::cout << "simple_recv listening on " << url << std::endl;
+    }
+    // Accepts the next message, prints it, then either finishes or sends the following one.
+    void on_message(proton::delivery &d, proton::message &msg) OVERRIDE {
+        if (proton::coerce<int>(msg.id()) <= received) {
+            return; // Ignore duplicate: ids are 1-based, so ids up to `received` were already accepted
+        }
+
+        if (expected == 0 || received < expected) {
+            std::cout << msg.body() << std::endl;
+            received++;
+
+            if (received == expected) {
+                d.receiver().close();
+                sender.close();
+                d.connection().close();
+            } else {
+                // See if we can send any messages now
+                send(sender);
+            }
+        }
+    }
+    // Sends message number sent+1 while the link has credit and nothing is outstanding.
+    void send(proton::sender& s) {
+        // Only send with credit and only allow one outstanding message
+        while (s.credit() && sent < received+1) {
+            std::map<std::string, int> m;
+            m["sequence"] = sent + 1;
+
+            proton::message msg;
+            msg.id(sent + 1);
+            msg.body(m);
+
+            std::cout << "Sending: " << sent+1 << std::endl;
+            s.send(msg);
+            sent++;
+        }
+    }
+
+    void on_sender_open(proton::sender & s) OVERRIDE {
+        sender = s;             // Kept so on_message() can send and close it
+    }
+
+    void on_sendable(proton::sender &s) OVERRIDE {
+        send(s);
+    }
+};
+
+int main(int argc, const char** argv) {
+    try {
+        // Three positional arguments are mandatory; anything after them is a failover URL.
+        if (argc < 4) {
+            std::cerr << "Usage: " << argv[0]
+                      << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n"
+                      << "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+                      << "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+                      << "MESSAGE-COUNT: number of messages to receive\n"
+                      << "FAILOVER_URL...: zero or more failover urls\n";
+            return 1;
+        }
+        const std::string primary_url(argv[1]);
+        const std::string node_address(argv[2]);
+        const int wanted = atoi(argv[3]);
+        // The range [argv+4, argv+argc) is empty when no failover URL was supplied.
+        const std::vector<std::string> backup_urls(argv + 4, argv + argc);
+        reconnect_client client(primary_url, node_address, wanted, backup_urls);
+        proton::container(client).run();
+        return 0;
+    } catch (const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+    }
+    // Reached only after an exception was reported.
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/scheduled_send.cpp b/cpp/examples/scheduled_send.cpp
new file mode 100644
index 0000000..3244540
--- /dev/null
+++ b/cpp/examples/scheduled_send.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/container.hpp>
+#include <proton/connection.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender.hpp>
+#include <proton/tracker.hpp>
+#include <proton/work_queue.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+// Send messages at a constant rate, one per interval. Cancel after a timeout.
+class scheduled_sender : public proton::messaging_handler {
+  private:
+    std::string url;
+    proton::sender sender;          // Set in on_sender_open()
+    proton::duration interval, timeout;
+    proton::work_queue* work_queue; // The sender's work queue, set in on_sender_open()
+    bool ready, canceled;
+
+  public:
+    // s: URL to open the sender on; d: send interval in seconds; t: timeout in seconds.
+    scheduled_sender(const std::string &s, double d, double t) :
+        url(s),
+        interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval.
+        timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout.
+        work_queue(0),
+        ready(true),            // Ready to send.
+        canceled(false)         // Canceled.
+    {}
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        c.open_sender(url);
+    }
+
+    // The awkward looking double lambda is necessary because the scheduled lambdas run in the container context
+    // and must arrange lambdas for send and close to happen in the connection context.
+    void on_sender_open(proton::sender &s) OVERRIDE {
+        sender = s;
+        work_queue = &s.work_queue();
+        // Call this->cancel after timeout.
+        s.container().schedule(timeout, [this]() { this->work_queue->add( [this]() { this->cancel(); }); });
+        // Start regular ticks every interval.
+        s.container().schedule(interval, [this]() { this->work_queue->add( [this]() { this->tick(); }); });
+    }
+
+    void cancel() {
+        canceled = true;
+        sender.connection().close();
+    }
+
+    void tick() {
+        // Once cancelled, neither re-arm the timer nor send: cancel() has closed the connection.
+        if (canceled) return;
+        sender.container().schedule(interval, [this]() { this->work_queue->add( [this]() { this->tick(); }); });
+        if (sender.credit() > 0) // Only send if we have credit
+            send();
+        else
+            ready = true;  // Set the ready flag, send as soon as we get credit.
+    }
+
+    void on_sendable(proton::sender &) OVERRIDE {
+        if (ready)              // We have been ticked since the last send.
+            send();
+    }
+
+    void send() {
+        std::cout << "send" << std::endl;
+        sender.send(proton::message("ping"));
+        ready = false;
+    }
+};
+
+
+int main(int argc, char **argv) {
+    // Defaults; each one can be overridden by the matching option below.
+    std::string send_url("127.0.0.1:5672/examples");
+    double tick_seconds = 1.0;
+    double stop_after_seconds = 5.0;
+
+    example::options cli(argc, argv);
+    cli.add_value(send_url, 'a', "address", "connect and send to URL", "URL");
+    cli.add_value(tick_seconds, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL");
+    cli.add_value(stop_after_seconds, 't', "timeout", "stop after T seconds", "T");
+
+    try {
+        cli.parse();
+        scheduled_sender handler(send_url, tick_seconds, stop_after_seconds);
+        proton::container(handler).run();
+        return 0;
+    } catch (const example::bad_option& bad) {
+        // Usage text first, then the parser's own message.
+        std::cout << cli << std::endl << bad.what() << std::endl;
+    } catch (const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/scheduled_send_03.cpp b/cpp/examples/scheduled_send_03.cpp
new file mode 100644
index 0000000..9050429
--- /dev/null
+++ b/cpp/examples/scheduled_send_03.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/container.hpp>
+#include <proton/connection.hpp>
+#include <proton/duration.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender.hpp>
+#include <proton/tracker.hpp>
+#include <proton/work_queue.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+// Sends one "ping" message per interval and closes the connection after a timeout.
+// Written with C++03 features only: work is scheduled with make_work().
+class scheduled_sender : public proton::messaging_handler {
+  private:
+    std::string url;
+    proton::duration interval, timeout;
+    proton::work_queue *work_queue;    // Set in on_sender_open()
+    bool ready, canceled;
+
+  public:
+    scheduled_sender(const std::string &address, double interval_s, double timeout_s) :
+        url(address),
+        interval(int(interval_s*proton::duration::SECOND.milliseconds())), // Period between sends.
+        timeout(int(timeout_s*proton::duration::SECOND.milliseconds())),   // Delay before cancel().
+        work_queue(0),
+        ready(true),            // A send is due from the start.
+        canceled(false)         // Set by cancel().
+    {}
+
+    void on_container_start(proton::container &cont) OVERRIDE {
+        cont.open_sender(url);
+    }
+
+    void on_sender_open(proton::sender & opened) OVERRIDE {
+        work_queue = &opened.work_queue();
+        // Schedule the cancel and the first tick on the sender's work queue.
+        work_queue->schedule(timeout, make_work(&scheduled_sender::cancel, this, opened));
+        work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, opened));
+    }
+
+    void cancel(proton::sender sender) {
+        canceled = true;        // Makes later tick() calls do nothing.
+        sender.connection().close();
+    }
+
+    void tick(proton::sender sender) {
+        if (canceled) return;
+        work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, sender)); // Next tick
+        // Send now when there is credit, otherwise remember that a send is due.
+        if (sender.credit() <= 0)
+            ready = true;
+        else
+            send(sender);
+    }
+
+    void on_sendable(proton::sender &link) OVERRIDE {
+        if (ready)              // A tick found no credit since the last send.
+            send(link);
+    }
+
+    void send(proton::sender& link) {
+        std::cout << "send" << std::endl;
+        link.send(proton::message("ping"));
+        ready = false;
+    }
+};
+
+
+int main(int argc, char **argv) {
+    // Default settings, replaced by -a/-i/-t when given.
+    std::string target("127.0.0.1:5672/examples");
+    double period = 1.0;
+    double deadline = 5.0;
+
+    example::options parser(argc, argv);
+    parser.add_value(target, 'a', "address", "connect and send to URL", "URL");
+    parser.add_value(period, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL");
+    parser.add_value(deadline, 't', "timeout", "stop after T seconds", "T");
+
+    try {
+        parser.parse();
+        scheduled_sender handler(target, period, deadline);
+        proton::container(handler).run();
+        return 0;
+    } catch (const example::bad_option& bad) {
+        // Usage text first, then the parser's own message.
+        std::cout << parser << std::endl << bad.what() << std::endl;
+    } catch (const std::exception& failure) {
+        std::cerr << failure.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/selected_recv.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/selected_recv.cpp b/cpp/examples/selected_recv.cpp
new file mode 100644
index 0000000..a7f9cea
--- /dev/null
+++ b/cpp/examples/selected_recv.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/source_options.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+namespace {
+
+    // Example helper that configures an AMQP filter on source options,
+    // specifically an APACHE.ORG:SELECTOR
+    // (http://www.amqp.org/specification/1.0/filters)
+
+    void set_filter(proton::source_options &opts, const std::string& selector_str) {
+        // Encode the filter as a described value: a symbol descriptor, then the selector string.
+        proton::value described_selector;
+        proton::codec::encoder writer(described_selector);
+        writer << proton::codec::start::described();
+        writer << proton::symbol("apache.org:selector-filter:string");
+        writer << selector_str;
+        writer << proton::codec::finish();
+
+        // The filter map carries just this one entry, keyed by the symbol "selector".
+        proton::source::filter_map filters;
+        filters.put(proton::symbol("selector"), described_selector);
+        opts.filters(filters);
+    }
+}
+
+
+class selected_recv : public proton::messaging_handler {
+    std::string conn_url_;      // URL handed to container::connect()
+    std::string addr_;          // Address the receiver is opened on
+
+  public:
+    selected_recv(const std::string& url, const std::string& address) :
+        conn_url_(url), addr_(address) {}
+    // Open a receiver whose source options carry the selector "colour = 'green'".
+    void on_container_start(proton::container &cont) OVERRIDE {
+        proton::source_options filtered_source;
+        set_filter(filtered_source, "colour = 'green'");
+        proton::connection broker = cont.connect(conn_url_);
+        broker.open_receiver(addr_, proton::receiver_options().source(filtered_source));
+    }
+    // Print the body of each delivered message on its own line.
+    void on_message(proton::delivery &, proton::message &msg) OVERRIDE {
+        std::cout << msg.body() << std::endl;
+    }
+};
+
+int main(int argc, char **argv) {
+    // Optional positional arguments: connection URL, then address.
+    try {
+        const std::string broker_url(argc > 1 ? argv[1] : "//127.0.0.1:5672");
+        const std::string source_address(argc > 2 ? argv[2] : "examples");
+
+        selected_recv handler(broker_url, source_address);
+        proton::container(handler).run();
+        return 0;
+    } catch (const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+    }
+    // Reached only after an exception was reported.
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/server.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/server.cpp b/cpp/examples/server.cpp
new file mode 100644
index 0000000..8e177df
--- /dev/null
+++ b/cpp/examples/server.cpp
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <cctype>
+
+#include "fake_cpp11.hpp"
+
+class server : public proton::messaging_handler {
+    std::string conn_url_;
+    std::string addr_;
+    proton::connection conn_;
+    std::map<std::string, proton::sender> senders_;  // One reply sender per reply_to address
+
+  public:
+    server(const std::string& u, const std::string& a) :
+        conn_url_(u), addr_(a) {}
+
+    void on_container_start(proton::container& c) OVERRIDE {
+        conn_ = c.connect(conn_url_);
+        conn_.open_receiver(addr_);
+
+        std::cout << "Server connected to " << conn_url_ << std::endl;
+    }
+
+    // Return a copy of s with every character passed through std::toupper.
+    std::string to_upper(const std::string& s) {
+        std::string uc(s);
+        for (size_t i = 0; i < uc.size(); i++) {
+            // std::toupper needs a value representable as unsigned char (or EOF);
+            // plain char may be negative, so convert before the call.
+            uc[i] = static_cast<char>(std::toupper(static_cast<unsigned char>(uc[i])));
+        }
+        return uc;
+    }
+    // Reply to each request with its body upper-cased, sent to the request's reply_to address.
+    void on_message(proton::delivery&, proton::message& m) OVERRIDE {
+        std::cout << "Received " << m.body() << std::endl;
+
+        std::string reply_to = m.reply_to();
+        proton::message reply;
+
+        reply.to(reply_to);
+        reply.body(to_upper(proton::get<std::string>(m.body())));
+        reply.correlation_id(m.correlation_id());
+        // Open a sender for this address on first use, then reuse it.
+        if (!senders_[reply_to]) {
+            senders_[reply_to] = conn_.open_sender(reply_to);
+        }
+
+        senders_[reply_to].send(reply);
+    }
+};
+
+int main(int argc, char** argv) {
+    // Optional positional arguments: connection URL, then address.
+    try {
+        const std::string broker_url(argc > 1 ? argv[1] : "//127.0.0.1:5672");
+        const std::string request_address(argc > 2 ? argv[2] : "examples");
+
+        server handler(broker_url, request_address);
+        proton::container(handler).run();
+        return 0;
+    } catch (const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+    }
+    // Reached only after an exception was reported.
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/server_direct.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/server_direct.cpp b/cpp/examples/server_direct.cpp
new file mode 100644
index 0000000..d46fc29
--- /dev/null
+++ b/cpp/examples/server_direct.cpp
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/container.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/source_options.hpp>
+#include <proton/tracker.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <sstream>
+#include <cctype>
+
+#include "fake_cpp11.hpp"
+
+class server : public proton::messaging_handler {
+  private:
+    class listener_ready_handler : public proton::listen_handler {
+        void on_open(proton::listener& l) OVERRIDE {
+            std::cout << "listening on " << l.port() << std::endl;
+        }
+    };
+
+    typedef std::map<std::string, proton::sender> sender_map;
+    listener_ready_handler listen_handler;
+    std::string url;
+    sender_map senders;         // Senders keyed by the address generated for them
+    int address_counter;        // Suffix of the next generated address
+
+  public:
+    server(const std::string &u) : url(u), address_counter(0) {}
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        c.listen(url, listen_handler);
+    }
+    // Return a copy of s with every character passed through std::toupper.
+    std::string to_upper(const std::string &s) {
+        std::string uc(s);
+        // std::toupper needs a value representable as unsigned char (or EOF);
+        // plain char may be negative, so convert before the call.
+        for (size_t i = 0; i < uc.size(); i++)
+            uc[i] = static_cast<char>(std::toupper(static_cast<unsigned char>(uc[i])));
+
+        return uc;
+    }
+    // Produce "server0", "server1", ... using address_counter.
+    std::string generate_address() {
+        std::ostringstream addr;
+        addr << "server" << address_counter++;
+
+        return addr.str();
+    }
+    // For a sender whose source is dynamic, assign a generated address and remember the sender under it.
+    void on_sender_open(proton::sender &sender) OVERRIDE {
+        if (sender.source().dynamic()) {
+            std::string addr = generate_address();
+            sender.open(proton::sender_options().source(proton::source_options().address(addr)));
+            senders[addr] = sender;
+        }
+    }
+    // Reply with the upper-cased body on the sender registered for the request's reply_to address.
+    void on_message(proton::delivery &, proton::message &m) OVERRIDE {
+        std::cout << "Received " << m.body() << std::endl;
+
+        std::string reply_to = m.reply_to();
+        sender_map::iterator it = senders.find(reply_to);
+
+        if (it == senders.end()) {
+            std::cout << "No link for reply_to: " << reply_to << std::endl;
+        } else {
+            proton::sender sender = it->second;
+            proton::message reply;
+
+            reply.to(reply_to);
+            reply.body(to_upper(proton::get<std::string>(m.body())));
+            reply.correlation_id(m.correlation_id());
+
+            sender.send(reply);
+        }
+    }
+};
+
+int main(int argc, char **argv) {
+    // Default listen URL, replaced by -a/--address when given.
+    std::string listen_url("amqp://127.0.0.1:5672/examples");
+
+    example::options cli(argc, argv);
+    cli.add_value(listen_url, 'a', "address", "listen on URL", "URL");
+
+    try {
+        cli.parse();
+        server handler(listen_url);
+        proton::container(handler).run();
+        return 0;
+    } catch (const example::bad_option& bad) {
+        // Usage text first, then the parser's own message.
+        std::cout << cli << std::endl << bad.what() << std::endl;
+    } catch (const std::exception& error) {
+        std::cerr << error.what() << std::endl;
+    }
+
+    // Reached only after an error was reported.
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/service_bus.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/service_bus.cpp b/cpp/examples/service_bus.cpp
new file mode 100644
index 0000000..c99bca6
--- /dev/null
+++ b/cpp/examples/service_bus.cpp
@@ -0,0 +1,322 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Service Bus example.
+ *
+ * This is an example of using "Service Bus sessions" (not the same thing as an
+ * AMQP session) to selectively retrieve messages from a queue.  The queue must
+ * be configured within Service Bus to support sessions.  Service Bus uses the
+ * AMQP group_id message property to associate messages with a particular
+ * Service Bus session.  It uses AMQP filters to specify which session is
+ * associated with a receiver.
+ *
+ * The mechanics for sending and receiving to other types of service bus queue
+ * are broadly the same, as long as the step using the
+ * receiver.source().filters() is omitted.
+ *
+ * Other Service Bus notes: There is no drain support, hence the need to use
+ * timeouts in this example to detect the end of the message stream.  There is
+ * no browse support when setting the AMQP link distribution mode to COPY.
+ * Service Bus claims to support browsing, but it is unclear how to manage that
+ * with an AMQP client.  Maximum message sizes (for body and headers) vary
+ * between queue types and fee tier ranging from 64KB to 1MB.  Due to the
+ * distributed nature of Service Bus, queues do not automatically preserve FIFO
+ * order of messages unless the user takes steps to force the message stream to
+ * a single partition of the queue or creates the queue with partitioning disabled.
+ *
+ * This example shows use of the simpler SAS (Shared Access Signature)
+ * authentication scheme where the credentials are supplied on the connection.
+ * Service Bus does not actually check these credentials when setting up the
+ * connection, it merely caches the SAS key and policy (AKA key name) for later
+ * access authorization when creating senders and receivers.  There is a second
+ * authentication scheme that allows for multiple tokens and even updating them
+ * within a long-lived connection which uses special management request-response
+ * queues in Service Bus.  The format of this exchange may be documented
+ * somewhere but is also available by working through the CbsAsyncExample.cs
+ * program in the Amqp.Net Lite project.
+ *
+ * The sample output for this program is:
+
+   sent message: message 0 in service bus session "red"
+   sent message: message 1 in service bus session "green"
+   sent message: message 2 in service bus session "blue"
+   sent message: message 3 in service bus session "red"
+   sent message: message 4 in service bus session "black"
+   sent message: message 5 in service bus session "blue"
+   sent message: message 6 in service bus session "yellow"
+receiving messages with session identifier "green" from queue ses_q1
+   received message: message 1 in service bus session "green"
+receiving messages with session identifier "red" from queue ses_q1
+   received message: message 0 in service bus session "red"
+   received message: message 3 in service bus session "red"
+receiving messages with session identifier "blue" from queue ses_q1
+   received message: message 2 in service bus session "blue"
+   received message: message 5 in service bus session "blue"
+receiving messages with session identifier "black" from queue ses_q1
+   received message: message 4 in service bus session "black"
+receiving messages with session identifier "yellow" from queue ses_q1
+   received message: message 6 in service bus session "yellow"
+Done. No more messages.
+
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/source_options.hpp>
+#include <proton/tracker.hpp>
+#include <proton/work_queue.hpp>
+
+#include <iostream>
+#include <sstream>
+
+#include "fake_cpp11.hpp"
+
+using proton::source_options;
+using proton::connection_options;
+using proton::sender_options;
+using proton::receiver_options;
+
+void do_next_sequence();
+
+namespace {
+void check_arg(const std::string &value, const std::string &name) {
+    if (!value.empty()) return;  // a non-empty value means the argument was supplied
+    throw std::runtime_error("missing argument for \"" + name + "\"");
+}
+}
+
+/// Connect to Service Bus queue and retrieve messages in a particular session.
+class session_receiver : public proton::messaging_handler {
+  private:
+    const std::string &connection_url; // held by reference: the caller's string must outlive this object
+    const std::string &entity;         // held by reference: the caller's string must outlive this object
+    proton::value session_identifier; // AMQP null type by default, matches any Service Bus session identifier
+    int message_count;                // messages received since the last run()
+    bool closed;                      // set once process_timeout() has closed the receiver
+    proton::duration read_timeout;    // idle time allowed after last_read before giving up
+    proton::timestamp last_read;      // refreshed on connection open, receiver open and each message
+    proton::container *container;     // set by run(); used to schedule process_timeout()
+    proton::receiver receiver;
+
+  public:
+    session_receiver(const std::string &c, const std::string &e,
+                     const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
+        if (sid)  // a null sid leaves session_identifier as the default (null) value
+            session_identifier = std::string(sid);
+        // session_identifier is now either empty/null or an AMQP string type.
+        // If null, Service Bus will pick the first available message and create
+        // a filter at its end with that message's session identifier.
+        // Technically, an AMQP string is not a valid filter-set value unless it
+        // is annotated as an AMQP described type, so this may change.
+
+    }
+
+    void run (proton::container &c) {
+        message_count = 0;  // reset per-run state: run() may be called more than once
+        closed = false;
+        c.connect(connection_url, connection_options().handler(*this));
+        container = &c;
+    }
+
+    void on_connection_open(proton::connection &connection) OVERRIDE {
+        proton::source::filter_map sb_filter_map;
+        proton::symbol key("com.microsoft:session-filter");
+        sb_filter_map.put(key, session_identifier);
+        receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));
+
+        // Start timeout processing here.  If Service Bus has no pending
+        // messages, it may defer completing the receiver open until a message
+        // becomes available (e.g. to be able to set the actual session
+        // identifier if none was specified).
+        last_read = proton::timestamp::now();
+        // Call this->process_timeout after read_timeout.
+        container->schedule(read_timeout, [this]() { this->process_timeout(); });
+    }
+
+    void on_receiver_open(proton::receiver &r) OVERRIDE {
+        if (closed) return; // PROTON-1264
+        proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter");
+        std::cout << "receiving messages with session identifier \"" << actual_session_id
+                  << "\" from queue " << entity << std::endl;
+        last_read = proton::timestamp::now();
+    }
+
+    void on_message(proton::delivery &, proton::message &m) OVERRIDE {
+        message_count++;
+        std::cout << "   received message: " << m.body() << std::endl;
+        last_read = proton::timestamp::now();  // each message pushes the idle deadline back
+    }
+
+    void process_timeout() {
+        proton::timestamp deadline = last_read + read_timeout;
+        proton::timestamp now = proton::timestamp::now();
+        if (now >= deadline) {  // nothing read for a full read_timeout: stop this receiver
+            receiver.close();
+            closed = true;
+            receiver.connection().close();
+            if (message_count)  // this run received something, so move on to the next step
+                do_next_sequence();
+            else
+                std::cout << "Done. No more messages." << std::endl;
+        } else {  // deadline not reached yet: schedule another check for the remaining time
+            proton::duration next = deadline - now;
+            container->schedule(next, [this]() { this->process_timeout(); });
+        }
+    }
+};
+
+
+/// Sends a fixed batch of messages to a Service Bus queue, tagging each one with a group_id.
+class session_sender : public proton::messaging_handler {
+  private:
+    const std::string &connection_url;
+    const std::string &entity;
+    int msg_count;  // messages handed to the sender so far
+    int total;      // size of the batch
+    int accepts;    // on_tracker_accept() calls seen so far
+
+  public:
+    session_sender(const std::string &c, const std::string &e)
+        : connection_url(c), entity(e), msg_count(0), total(7), accepts(0) {}
+
+    void run(proton::container &c) {
+        c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this));
+    }
+
+    void send_remaining_messages(proton::sender &s) {
+        // Service Bus session (group_id) used for each message index.
+        static const char *const groups[] = {
+            "red", "green", "blue", "red",
+            "black", "blue", "yellow"
+        };
+
+        // Keep sending while messages remain and the link has credit.
+        while (msg_count < total && s.credit() > 0) {
+            const std::string gid = groups[msg_count];
+
+            std::ostringstream text;
+            text << "message " << msg_count << " in service bus session \"" << gid << "\"";
+            proton::message m(text.str());
+            // Service Bus uses the group_id property as the session identifier.
+            m.group_id(gid);
+            s.send(m);
+            std::cout << "   sent message: " << m.body() << std::endl;
+            ++msg_count;
+        }
+    }
+
+    void on_sendable(proton::sender &s) OVERRIDE {
+        send_remaining_messages(s);
+    }
+
+    void on_tracker_accept(proton::tracker &t) OVERRIDE {
+        ++accepts;
+        if (accepts != total) return;
+        // Every message was accepted: close the link and its connection, then
+        // hand control back to the sequence.
+        t.sender().close();
+        t.sender().connection().close();
+        do_next_sequence();
+    }
+};
+
+
+/// Drives the example: one send pass, then receive passes started one after another.
+class sequence : public proton::messaging_handler {
+  private:
+    proton::container *container;  // null until on_container_start()
+    int sequence_no;               // index of the next step to run
+    session_sender snd;
+    session_receiver rcv_red, rcv_green, rcv_null;
+
+  public:
+    static sequence *the_sequence;  // set to the most recently constructed instance
+
+    sequence (const std::string &c, const std::string &e) :
+        container(0), sequence_no(0),
+        snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) {
+        the_sequence = this;
+    }
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        container = &c;
+        next_sequence();
+    }
+
+    void next_sequence() {
+        const int step = sequence_no++;
+        // Steps 0, 1 and 2 each run exactly once, in that order.
+        if (step == 0) { snd.run(*container); return; }
+        if (step == 1) { rcv_green.run(*container); return; }
+        if (step == 2) { rcv_red.run(*container); return; }
+        // Every later step re-runs the receiver that has no session id; the
+        // chain ends when one of its passes receives no messages.
+        rcv_null.run(*container);
+    }
+};
+
+sequence *sequence::the_sequence = NULL;
+
+void do_next_sequence() { sequence *const current = sequence::the_sequence; current->next_sequence(); }
+
+
+int main(int argc, char **argv) {
+    std::string sb_namespace; // i.e. "foo.servicebus.windows.net"
+    // Make sure the next two are urlencoded for Proton
+    std::string sb_key_name;  // shared access key name for entity (AKA "Policy Name")
+    std::string sb_key;       // shared access key
+    std::string sb_entity;    // AKA the service bus queue.  Must enable
+                              // sessions on it for this example.
+
+    example::options opts(argc, argv);
+    opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE");
+    opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY");
+    opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key");
+    opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY");
+
+    try {
+        opts.parse();
+        check_arg(sb_namespace, "namespace");
+        check_arg(sb_key_name, "policy");
+        check_arg(sb_key, "key");
+        check_arg(sb_entity, "entity");
+        std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace);
+
+        sequence seq(connection_string, sb_entity);
+        proton::container(seq).run();
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/simple_connect.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/simple_connect.cpp b/cpp/examples/simple_connect.cpp
new file mode 100644
index 0000000..74a8c87
--- /dev/null
+++ b/cpp/examples/simple_connect.cpp
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+class simple_connect : public proton::messaging_handler {
+  private:
+    std::string url;
+    std::string user;
+    std::string password;
+    bool reconnect;
+    bool sasl;
+    std::string mechs;
+    bool insecure;
+    proton::connection connection;
+
+  public:
+    simple_connect(const std::string &a, const std::string &u, const std::string &p,
+                   bool r, bool s, const std::string& ms, bool in) :
+        url(a), user(u), password(p),
+        reconnect(r), sasl(s), mechs(ms), insecure(in) {}
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        proton::connection_options conn_opts;
+        if (reconnect) conn_opts.reconnect(proton::reconnect_options());
+        //
+        // Every credential / SASL option below is applied only when the
+        // user asked for something non-default. SASL negotiation stays off
+        // unless it is needed, and setting a username/password or any SASL
+        // option forces it on, so untouched options must be left unset.
+        //
+        if (!user.empty()) conn_opts.user(user);
+        if (!password.empty()) conn_opts.password(password);
+        if (sasl) conn_opts.sasl_enabled(true);
+        if (!mechs.empty()) conn_opts.sasl_allowed_mechs(mechs);
+        if (insecure) conn_opts.sasl_allow_insecure_mechs(true);
+
+        // The connection handle is kept as a member.
+        connection = c.connect(url, conn_opts);
+    }
+
+    void on_connection_open(proton::connection &c) OVERRIDE {
+        c.close();  // close as soon as the connection has opened
+    }
+
+    void on_error(const proton::error_condition& e) OVERRIDE {
+        throw std::runtime_error(e.what());  // turn the error condition into an exception
+    }
+};
+
+int main(int argc, char **argv) {
+    // Defaults; presumably overwritten by parse() when the matching option is given.
+    std::string address = "127.0.0.1:5672/examples";
+    std::string user, password, mechs;
+    bool reconnect = false, sasl = false, insecure = false;
+
+    example::options cli(argc, argv);
+    cli.add_value(address, 'a', "address", "connect and send to URL", "URL");
+    cli.add_value(user, 'u', "user", "authenticate as USER", "USER");
+    cli.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
+    cli.add_flag(reconnect, 'r', "reconnect", "reconnect on connection failure");
+    cli.add_flag(sasl,'s', "sasl", "force SASL authentication with no user specified (Use for Kerberos/GSSAPI)");
+    cli.add_value(mechs, 'm', "mechs", "allowed SASL mechanisms", "MECHS");
+    cli.add_flag(insecure, 'i', "insecure", "allow clear-text passwords");
+
+    try {
+        cli.parse();
+
+        simple_connect handler(address, user, password, reconnect, sasl, mechs, insecure);
+        proton::container(handler).run();
+
+        return 0;
+    } catch (const example::bad_option& bad) {
+        // Bad command line: print cli followed by the parser's message.
+        std::cout << cli << std::endl << bad.what() << std::endl;
+    } catch (const std::exception& err) {
+        // Any other failure is reported on stderr.
+        std::cerr << err.what() << std::endl;
+    }
+
+    // Only reached when an exception was caught.
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/simple_recv.cpp b/cpp/examples/simple_recv.cpp
new file mode 100644
index 0000000..5a7cde4
--- /dev/null
+++ b/cpp/examples/simple_recv.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/value.hpp>
+
+#include <iostream>
+#include <map>
+
+#include "fake_cpp11.hpp"
+
+class simple_recv : public proton::messaging_handler {
+  private:
+    std::string url;
+    std::string user;
+    std::string password;
+    proton::receiver receiver;
+    int expected;  // number of messages to print; 0 means no limit
+    int received;  // messages printed so far
+
+  public:
+    simple_recv(const std::string &s, const std::string &u, const std::string &p, int c) :
+        url(s), user(u), password(p), expected(c), received(0) {}
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        proton::connection_options co;
+        if (!user.empty()) co.user(user);
+        if (!password.empty()) co.password(password);
+        receiver = c.open_receiver(url, co);
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) OVERRIDE {
+        // An unset id cannot be coerced to int, so only messages that carry an
+        // id are checked; an id below the running count marks a duplicate.
+        if (!msg.id().empty() && proton::coerce<int>(msg.id()) < received) {
+            return; // Ignore duplicate
+        }
+        if (expected == 0 || received < expected) {
+            std::cout << msg.body() << std::endl;
+            received++;
+            if (received == expected) {  // quota reached: close the link and the connection
+                d.receiver().close();
+                d.connection().close();
+            }
+        }
+    }
+};
+
+int main(int argc, char **argv) {
+    // Defaults; presumably overwritten by parse() when the matching option is given.
+    std::string address = "127.0.0.1:5672/examples";
+    std::string user, password;
+    int message_count = 100;
+
+    example::options cli(argc, argv);
+    cli.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
+    cli.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
+    cli.add_value(user, 'u', "user", "authenticate as USER", "USER");
+    cli.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
+
+    try {
+        cli.parse();
+
+        simple_recv handler(address, user, password, message_count);
+        proton::container(handler).run();
+
+        return 0;
+    } catch (const example::bad_option& bad) {
+        // Bad command line: print cli followed by the parser's message.
+        std::cout << cli << std::endl << bad.what() << std::endl;
+    } catch (const std::exception& err) {
+        std::cerr << err.what() << std::endl;
+    }
+
+    return 1;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message