Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 048E3200C36 for ; Thu, 23 Feb 2017 23:51:12 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 03448160B67; Thu, 23 Feb 2017 22:51:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E7A23160B84 for ; Thu, 23 Feb 2017 23:51:09 +0100 (CET) Received: (qmail 76598 invoked by uid 500); 23 Feb 2017 22:51:09 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 76082 invoked by uid 99); 23 Feb 2017 22:51:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Feb 2017 22:51:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 72CF2DFFAB; Thu, 23 Feb 2017 22:51:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Date: Thu, 23 Feb 2017 22:51:22 -0000 Message-Id: <0d7ad78d698845e1bc73acb2bf269454@git.apache.org> In-Reply-To: <18e12ab11c6247358369966c9b6620b7@git.apache.org> References: <18e12ab11c6247358369966c9b6620b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/38] qpid-proton git commit: PROTON-1403: c proactor direct example archived-at: Thu, 23 Feb 2017 22:51:12 -0000 PROTON-1403: c proactor direct example direct server that can accept a connection from the send or receive examples and act as a directly-connected receive or send as appropriate. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6291a75c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6291a75c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6291a75c Branch: refs/heads/go1 Commit: 6291a75cbed5840578c0d2577424ed574b8ee56f Parents: 8568737 Author: Alan Conway Authored: Tue Feb 14 12:06:43 2017 -0500 Committer: Alan Conway Committed: Tue Feb 14 12:51:31 2017 -0500 ---------------------------------------------------------------------- examples/c/proactor/CMakeLists.txt | 2 +- examples/c/proactor/README.dox | 14 +- examples/c/proactor/broker.c | 3 +- examples/c/proactor/direct.c | 357 ++++++++++++++++++++++++++++++++ examples/c/proactor/receive.c | 2 +- examples/c/proactor/test.py | 29 ++- examples/exampletest.py | 1 + 7 files changed, 398 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt index 7fec1c6..4189cf5 100644 --- a/examples/c/proactor/CMakeLists.txt +++ b/examples/c/proactor/CMakeLists.txt @@ -41,7 +41,7 @@ else(WIN32) set(PLATFORM_LIBS pthread) endif(WIN32) -foreach(name broker send receive) +foreach(name broker send receive direct) add_executable(proactor-${name} ${name}.c) target_link_libraries(proactor-${name} ${Proton_LIBRARIES} ${PLATFORM_LIBS}) set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name}) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/README.dox ---------------------------------------------------------------------- diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox index 4b09cb7..19083e5 100644 --- a/examples/c/proactor/README.dox +++ b/examples/c/proactor/README.dox @@ -2,16 +2,22 @@ * @example send.c * * Send a fixed number of messages to the "example" node. + * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker. * * @example receive.c * - * Subscribes to the 'example' node and prints the message bodies - * received. + * Subscribes to the 'example' node and prints the message bodies received. + * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker. + * + * @example direct.c + * + * A server that can be used to demonstrate direct (no broker) peer-to-peer communication + * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples + * and will act as the directly-connected counterpart (receive or send) * * @example broker.c * - * A simple multithreaded broker that works with the send and receive - * examples. + * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples. * * __Requires C++11__ */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index 5679290..ebf4068 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -307,7 +307,7 @@ static void handle(broker_t* b, pn_event_t* e) { pn_listener_accept(pn_event_listener(e), pn_connection()); break; - case PN_CONNECTION_INIT: + case PN_CONNECTION_INIT: pn_connection_set_container(c, b->container_id); break; @@ -398,6 +398,7 @@ static void handle(broker_t* b, pn_event_t* e) { case PN_LISTENER_CLOSE: check_condition(e, pn_listener_condition(pn_event_listener(e))); + broker_stop(b); break; case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/direct.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c new file mode 100644 index 0000000..26f1b33 --- /dev/null +++ b/examples/c/proactor/direct.c @@ -0,0 +1,357 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pncompat/misc_funcs.inc" + +#include +#include +#include + +typedef char str[1024]; + +typedef struct app_data_t { + /* Common values */ + pn_proactor_t *proactor; + bool finished; + str address; + str container_id; + pn_rwbytes_t message_buffer; + int message_count; + + /* Sender values */ + int sent; + int acknowledged; + pn_link_t *sender; + pn_millis_t delay; + bool delaying; + + /* Receiver values */ + int received; +} app_data_t; + +static const int BATCH = 1000; /* Batch size for unlimited receive */ + +int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + exit_code = 1; + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ +static pn_bytes_t encode_message(app_data_t* app) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_t* body = pn_message_body(message); + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->message_buffer.start == NULL) { + static const size_t initial_size = 128; + app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->message_buffer is the total buffer space available. */ + /* mbuf wil point at just the portion used by the encoded message */ + pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->message_buffer.size *= 2; + app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); + mbuf.size = app->message_buffer.size; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); +} + +static void send(app_data_t* app) { + while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) { + ++app->sent; + // Use sent counter bytes as unique delivery tag. + pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + pn_bytes_t msgbuf = encode_message(app); + pn_link_send(app->sender, msgbuf.start, msgbuf.size); + pn_link_advance(app->sender); + if (app->delay && app->sent < app->message_count) { + /* If delay is set, wait for TIMEOUT event to send more */ + app->delaying = true; + pn_proactor_set_timeout(app->proactor, app->delay); + break; + } + } +} + +#define MAX_SIZE 1024 + +static void decode_message(pn_delivery_t *dlv) { + static char buffer[MAX_SIZE]; + ssize_t len; + // try to decode the message body + if (pn_delivery_pending(dlv) < MAX_SIZE) { + // read in the raw data + len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); + if (len > 0) { + // decode it into a proton message + pn_message_t *m = pn_message(); + if (PN_OK == pn_message_decode(m, buffer, len)) { + pn_string_t *s = pn_string(NULL); + pn_inspect(pn_message_body(m), s); + printf("%s\n", pn_string_get(s)); + pn_free(s); + } + pn_message_free(m); + } + } +} + +/* This function handles events when we are acting as the receiver */ +static void handle_receive(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LINK_REMOTE_OPEN: { + pn_link_t *l = pn_event_link(event); + pn_link_open(l); + pn_link_flow(l, app->message_count ? app->message_count : BATCH); + } break; + + case PN_DELIVERY: { + /* A message has been received */ + pn_link_t *link = NULL; + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { + link = pn_delivery_link(dlv); + decode_message(dlv); + /* Accept the delivery */ + pn_delivery_update(dlv, PN_ACCEPTED); + /* done with the delivery, move to the next and free it */ + pn_link_advance(link); + pn_delivery_settle(dlv); /* dlv is now freed */ + + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(link) < BATCH/2) { + /* Grant enough credit to bring it up to BATCH: */ + pn_link_flow(link, BATCH - pn_link_credit(link)); + } + } else if (++app->received >= app->message_count) { + /* done receiving, close the endpoints */ + printf("%d messages received\n", app->received); + pn_session_t *ssn = pn_link_session(link); + pn_link_close(link); + pn_session_close(ssn); + pn_connection_close(pn_session_connection(ssn)); + } + } + } break; + + default: + break; + } +} + +/* This function handles events when we are acting as the sender */ +static void handle_send(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LINK_REMOTE_OPEN: { + pn_link_t* l = pn_event_link(event); + pn_terminus_set_address(pn_link_target(l), app->address); + pn_link_open(l); + } break; + + case PN_LINK_FLOW: + /* The peer has given us some credit, now we can send messages */ + if (!app->delaying) { + app->sender = pn_event_link(event); + send(app); + } break; + + case PN_DELIVERY: { + /* We received acknowledgedment from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + pn_connection_close(pn_event_connection(event)); + } + } + } break; + + default: + break; + } +} + +/* Handle all events, delegate to handle_send or handle_receive depending on link mode */ +static void handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_LISTENER_ACCEPT: + pn_listener_accept(pn_event_listener(event), pn_connection()); + break; + + case PN_CONNECTION_INIT: + pn_connection_set_container(pn_event_connection(event), app->container_id); + break; + + case PN_CONNECTION_BOUND: { + /* Turn off security */ + pn_transport_t *t = pn_event_transport(event); + pn_transport_require_auth(t, false); + pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); + } + case PN_CONNECTION_REMOTE_OPEN: { + pn_connection_open(pn_event_connection(event)); /* Complete the open */ + break; + } + + case PN_SESSION_REMOTE_OPEN: { + pn_session_open(pn_event_session(event)); + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + app->finished = true; + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_TIMEOUT: + /* Wake the sender's connection */ + pn_connection_wake(pn_session_connection(pn_link_session(app->sender))); + break; + + case PN_CONNECTION_WAKE: + /* Timeout, we can send more. */ + app->delaying = false; + send(app); + break; + + case PN_PROACTOR_INACTIVE: + app->finished = true; + break; + + case PN_LISTENER_CLOSE: + check_condition(event, pn_listener_condition(pn_event_listener(event))); + app->finished = true; + break; + + default: { + pn_link_t *l = pn_event_link(event); + if (l) { /* Only delegate link-related events */ + if (pn_link_is_sender(l)) { + handle_send(app, event); + } else { + handle_receive(app, event); + } + } + } + } +} + +static void usage(const char *arg0) { + fprintf(stderr, "Usage: %s [-a URL] [-m message-count] [-d delay-ms]\n", arg0); + fprintf(stderr, "Demonstrates direct peer-to-peer AMQP communication without a broker. Accepts a connection from either the send.c or receive.c client and provides the complementary behavior (receive or send."); + exit(1); +} + +int main(int argc, char **argv) { + /* Default values for application and connection. */ + app_data_t app = {0}; + app.message_count = 100; + const char* urlstr = NULL; + + int opt; + while((opt = getopt(argc, argv, "a:m:d:")) != -1) { + switch(opt) { + case 'a': urlstr = optarg; break; + case 'm': app.message_count = atoi(optarg); break; + case 'd': app.delay = atoi(optarg); break; + default: usage(argv[0]); break; + } + } + if (optind < argc) + usage(argv[0]); + /* Note container-id should be unique */ + snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]); + + /* Parse the URL or use default values */ + pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; + /* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default, + this will also listen for mapped IPv4 on the same port. + */ + const char *host = url ? pn_url_get_host(url) : "::"; + const char *port = url ? pn_url_get_port(url) : "amqp"; + + app.proactor = pn_proactor(); + pn_proactor_listen(app.proactor, pn_listener(), host, port, 16); + printf("listening on '%s:%s'\n", host, port); + if (url) pn_url_free(url); + + do { + pn_event_batch_t *events = pn_proactor_wait(app.proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + handle(&app, e); + } + pn_proactor_done(app.proactor, events); + } while(!app.finished); + + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c index 1eb54b6..1bc5509 100644 --- a/examples/c/proactor/receive.c +++ b/examples/c/proactor/receive.c @@ -46,7 +46,7 @@ typedef struct app_data_t { bool finished; } app_data_t; -static const int BATCH = 100; /* Batch size for unlimited receive */ +static const int BATCH = 1000; /* Batch size for unlimited receive */ static int exit_code = 0; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/test.py ---------------------------------------------------------------------- diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py index 29aa327..45bb817 100644 --- a/examples/c/proactor/test.py +++ b/examples/c/proactor/test.py @@ -19,11 +19,9 @@ # This is a test script to run the examples and verify that they behave as expected. +import unittest, sys, time from exampletest import * -import unittest -import sys - def python_cmd(name): dir = os.path.dirname(__file__) return [sys.executable, os.path.join(dir, "..", "..", "python", name)] @@ -55,6 +53,31 @@ class CExampleTest(BrokerTestCase): r = self.proc(["receive", "-a", self.addr, "-m3"]) self.assertEqual(receive_expect(3), r.wait_out()) + def retry(self, args, max=10): + """Run until output does not contain "connection refused", up to max retries""" + while True: + try: + return self.proc(args).wait_out() + except ProcError, e: + if "connection refused" in e.args[0] and max > 0: + max -= 1 + time.sleep(.01) + continue + raise + + def test_send_direct(self): + """Send first then receive""" + addr = "127.0.0.1:%s/examples" % (pick_port()) + d = self.proc(["direct", "-a", addr]) + self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr])) + self.assertIn(receive_expect(100), d.wait_out()) + + def test_receive_direct(self): + """Send first then receive""" + addr = "127.0.0.1:%s/examples" % (pick_port()) + d = self.proc(["direct", "-a", addr]) + self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr])) + self.assertIn("100 messages sent and acknowledged\n", d.wait_out()) if __name__ == "__main__": unittest.main() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/exampletest.py ---------------------------------------------------------------------- diff --git a/examples/exampletest.py b/examples/exampletest.py index d40b9cb..bf7ea9b 100644 --- a/examples/exampletest.py +++ b/examples/exampletest.py @@ -65,6 +65,7 @@ class Proc(Popen): """Start an example process""" args = list(args) self.args = args + self.kwargs = kwargs self._out = os.tmpfile() try: Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org