From: aconway@apache.org To: commits@qpid.apache.org Date: Tue, 03 Jul 2018 22:12:56 -0000 Subject: [07/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tools/msgr-common.c ---------------------------------------------------------------------- diff --git a/c/tools/msgr-common.c b/c/tools/msgr-common.c new file mode 100644 index 0000000..7c43a45 --- /dev/null +++ b/c/tools/msgr-common.c @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "msgr-common.h" +#include + +#include +#include +#include +#include + +void msgr_die(const char *file, int line, const char *message) +{ + fprintf(stderr, "%s:%i: %s\n", file, line, message); + exit(1); +} + +//sigh - would be nice if proton exported pn_strdup() +char *msgr_strdup( const char *src ) +{ + char *r = NULL; + if (src) { + r = (char *) malloc(sizeof(char) * (strlen(src) + 1)); + if (r) strcpy(r,src); + } + return r; +} + + +pn_timestamp_t msgr_now() +{ + // from "pncompat/misc_funcs.inc" + return time_now(); +} + +void addresses_init( Addresses_t *a ) +{ + a->size = 10; // whatever + a->count = 0; + a->addresses = (const char **) calloc( a->size, sizeof(const char *)); + check(a->addresses, "malloc failure"); +} + +void addresses_free( Addresses_t *a ) +{ + if (a->addresses) { + int i; + for (i = 0; i < a->count; i++) + if (a->addresses[i]) free( (void *)a->addresses[i] ); + free( a->addresses ); + a->addresses = NULL; + } +} + +void addresses_add( Addresses_t *a, const char *addr ) +{ + if (a->count == a->size) { + a->size += 10; + a->addresses = (const char **) realloc( a->addresses, + a->size * sizeof(const char *) ); + check( a->addresses, "malloc failure" ); + int i; + for (i = a->count; i < a->size; i++) + a->addresses[i] = NULL; + } + 
a->addresses[a->count] = msgr_strdup(addr); + check( a->addresses[a->count], "malloc failure" ); + a->count++; +} + +// merge a comma-separated list of addresses +void addresses_merge( Addresses_t *a, const char *list ) +{ + char *const l = msgr_strdup(list); + check( l, "malloc failure" ); + char *addr = l; + while (addr && *addr) { + char *comma = strchr( addr, ',' ); + if (comma) { + *comma++ = '\0'; + } + addresses_add( a, addr ); + addr = comma; + } + free(l); +} + + +void statistics_start( Statistics_t *s ) +{ + s->latency_samples = 0; + s->latency_total = s->latency_min = s->latency_max = 0.0; + s->start = msgr_now(); +} + +void statistics_msg_received( Statistics_t *s, pn_message_t *message ) +{ + pn_timestamp_t ts = pn_message_get_creation_time( message ); + if (ts) { + double l = (double)(msgr_now() - ts); + if (l > 0) { + s->latency_total += l; + if (++s->latency_samples == 1) { + s->latency_min = s->latency_max = l; + } else { + if (s->latency_min > l) + s->latency_min = l; + if (s->latency_max < l) + s->latency_max = l; + } + } + } +} + +void statistics_report( Statistics_t *s, uint64_t sent, uint64_t received ) +{ + pn_timestamp_t end = msgr_now() - s->start; + double secs = end/(double)1000.0; + + fprintf(stdout, "Messages sent: %" PRIu64 " recv: %" PRIu64 "\n", sent, received ); + fprintf(stdout, "Total time: %f sec\n", secs ); + fprintf(stdout, "Throughput: %f msgs/sec\n", (secs != 0.0) ? (double)sent/secs : 0); + fprintf(stdout, "Latency (sec): %f min %f max %f avg\n", + s->latency_min/1000.0, s->latency_max/1000.0, + (s->latency_samples) ? 
(s->latency_total/s->latency_samples)/1000.0 : 0); +} + +void parse_password( const char *input, char **password ) +{ + if (strncmp( input, "pass:", 5 ) == 0) { + // password provided on command line (not secure, shows up in 'ps') + *password = msgr_strdup( input + 5 ); + } else { // input assumed to be file containing password + FILE *f = fopen( input, "r" ); + check( f, "Cannot open password file\n" ); + *password = (char *)malloc(256); // 256 should be enough for anybody! + check( *password, "malloc failure" ); + int rc = fscanf( f, "%255s", *password ); + check( rc == 1, "Cannot read password from file\n" ); + fclose(f); + } +} + +static int log = 0; +void enable_logging() +{ + log = 1; +} + +void LOG( const char *fmt, ... ) +{ + if (log) { + va_list ap; + va_start(ap, fmt); + vfprintf( stdout, fmt, ap ); + va_end(ap); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tools/msgr-common.h ---------------------------------------------------------------------- diff --git a/c/tools/msgr-common.h b/c/tools/msgr-common.h new file mode 100644 index 0000000..d3f483a --- /dev/null +++ b/c/tools/msgr-common.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "pncompat/misc_defs.h" + +#if defined(USE_INTTYPES) +#ifdef __cplusplus +#define __STDC_FORMAT_MACROS +#endif +#include +#endif + +#ifdef _MSC_VER +#if !defined(PRIu64) +#define PRIu64 "I64u" +#endif +#if !defined(SCNu64) +#define SCNu64 "I64u" +#endif +#endif + +/* If still not defined, best guess */ +#if !defined(SCNu64) +#define SCNu64 "ul" +#endif +#if !defined(PRIu64) +#define PRIu64 "ul" +#endif + + +#include "proton/types.h" +#include "proton/message.h" + +void msgr_die(const char *file, int line, const char *message); +char *msgr_strdup( const char *src ); +pn_timestamp_t msgr_now(void); +void parse_password( const char *, char ** ); + +#define check_messenger(m) \ + { check(pn_messenger_errno(m) == 0, pn_error_text(pn_messenger_error(m))) } + +#define check( expression, message ) \ + { if (!(expression)) msgr_die(__FILE__,__LINE__, message); } + + +// manage an ordered list of addresses + +typedef struct { + const char **addresses; + int size; // room in 'addresses' + int count; // # entries +} Addresses_t; + +#define NEXT_ADDRESS(a, i) (((i) + 1) % (a).count) +void addresses_init( Addresses_t *a ); +void addresses_free( Addresses_t *a ); +void addresses_add( Addresses_t *a, const char *addr ); +void addresses_merge( Addresses_t *a, const char *list ); + +// Statistics handling + +typedef struct { + pn_timestamp_t start; + uint64_t latency_samples; + double latency_total; + double latency_min; + double latency_max; +} Statistics_t; + +void statistics_start( Statistics_t *s ); +void statistics_msg_received( Statistics_t *s, pn_message_t *message ); +void statistics_report( Statistics_t *s, uint64_t sent, uint64_t received ); + +void enable_logging(void); +void LOG( const char *fmt, ... 
); + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tools/msgr-recv.c ---------------------------------------------------------------------- diff --git a/c/tools/msgr-recv.c b/c/tools/msgr-recv.c new file mode 100644 index 0000000..eff5820 --- /dev/null +++ b/c/tools/msgr-recv.c @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "msgr-common.h" +#include "proton/message.h" +#include "proton/messenger.h" +#include "proton/error.h" + +#include +#include +#include +#include + +typedef struct { + Addresses_t subscriptions; + uint64_t msg_count; + int recv_count; + int incoming_window; + int timeout; // seconds + unsigned int report_interval; // in seconds + int outgoing_window; + Addresses_t forwarding_targets; + int reply; + const char *name; + const char *ready_text; + char *certificate; + char *privatekey; // used to sign certificate + char *password; // for private key file + char *ca_db; // trusted CA database +} Options_t; + + +static void usage(int rc) +{ + printf("Usage: msgr-recv [OPTIONS] \n" + " -a [,]* \tAddresses to listen on [amqp://~0.0.0.0]\n" + " -c # \tNumber of messages to receive before exiting [0=forever]\n" + " -b # \tArgument to Messenger::recv(n) [2048]\n" + " -w # \tSize for incoming window [0]\n" + " -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n" + " -e # \t# seconds to report statistics, 0 = end of test [0] *TBD*\n" + " -R \tSend reply if 'reply-to' present\n" + " -W # \t# outgoing window size [0]\n" + " -F [,]* \tAddresses used for forwarding received messages\n" + " -N \tSet the container name to \n" + " -X \tPrint '\\n' to stdout after all subscriptions are created\n" + " -V \tEnable debug logging\n" + " SSL options:\n" + " -T \tDatabase of trusted CA certificates for validating peer\n" + " -C \tFile containing self-identifying certificate\n" + " -K \tFile containing private key used to sign certificate\n" + " -P [pass:|path] \tPassword to unlock private key file.\n" + ); + exit(rc); +} + +static void parse_options( int argc, char **argv, Options_t *opts ) +{ + int c; + opterr = 0; + + memset( opts, 0, sizeof(*opts) ); + opts->recv_count = -1; + opts->timeout = -1; + addresses_init(&opts->subscriptions); + addresses_init(&opts->forwarding_targets); + + while ((c = getopt(argc, argv, + "a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) { 
+ switch (c) { + case 'a': addresses_merge( &opts->subscriptions, optarg ); break; + case 'c': + if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'b': + if (sscanf( optarg, "%d", &opts->recv_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'w': + if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 't': + if (sscanf( optarg, "%d", &opts->timeout ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + if (opts->timeout > 0) opts->timeout *= 1000; + break; + case 'e': + if (sscanf( optarg, "%u", &opts->report_interval ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'R': opts->reply = 1; break; + case 'W': + if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'F': addresses_merge( &opts->forwarding_targets, optarg ); break; + case 'V': enable_logging(); break; + case 'N': opts->name = optarg; break; + case 'X': opts->ready_text = optarg; break; + case 'T': opts->ca_db = optarg; break; + case 'C': opts->certificate = optarg; break; + case 'K': opts->privatekey = optarg; break; + case 'P': parse_password( optarg, &opts->password ); break; + + default: + usage(1); + } + } + + // default subscription if none specified + if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions, + "amqp://~0.0.0.0" ); +} + + + +int main(int argc, char** argv) +{ + Options_t opts; + Statistics_t stats; + uint64_t sent = 0; + uint64_t received = 0; + int forwarding_index = 0; + int rc; + + pn_message_t *message; + pn_messenger_t *messenger; + + parse_options( argc, 
argv, &opts ); + + const int forward = opts.forwarding_targets.count != 0; + + message = pn_message(); + messenger = pn_messenger( opts.name ); + + /* load the various command line options if they're set */ + if (opts.certificate) { + rc = pn_messenger_set_certificate(messenger, opts.certificate); + check_messenger(messenger); + check( rc == 0, "Failed to set certificate" ); + } + + if (opts.privatekey) { + rc = pn_messenger_set_private_key(messenger, opts.privatekey); + check_messenger(messenger); + check( rc == 0, "Failed to set private key" ); + } + + if (opts.password) { + rc = pn_messenger_set_password(messenger, opts.password); + check_messenger(messenger); + check( rc == 0, "Failed to set password" ); + } + + if (opts.ca_db) { + rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db); + check_messenger(messenger); + check( rc == 0, "Failed to set trusted CA database" ); + } + + if (opts.incoming_window) { + // RAFI: seems to cause receiver to hang: + pn_messenger_set_incoming_window( messenger, opts.incoming_window ); + } + + pn_messenger_set_timeout( messenger, opts.timeout ); + + pn_messenger_start(messenger); + check_messenger(messenger); + + int i; + for (i = 0; i < opts.subscriptions.count; i++) { + pn_messenger_subscribe(messenger, opts.subscriptions.addresses[i]); + check_messenger(messenger); + LOG("Subscribing to '%s'\n", opts.subscriptions.addresses[i]); + } + + // hack to let test scripts know when the receivers are ready (so + // that the senders may be started) + if (opts.ready_text) { + fprintf(stdout, "%s\n", opts.ready_text); + fflush(stdout); + } + + while (!opts.msg_count || received < opts.msg_count) { + + LOG("Calling pn_messenger_recv(%d)\n", opts.recv_count); + rc = pn_messenger_recv(messenger, opts.recv_count); + check_messenger(messenger); + check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed"); + + // start the timer only after receiving the first msg + if (received == 0) statistics_start( 
&stats ); + + LOG("Messages on incoming queue: %d\n", pn_messenger_incoming(messenger)); + while (pn_messenger_incoming(messenger)) { + pn_messenger_get(messenger, message); + check_messenger(messenger); + received++; + // TODO: header decoding? + // uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong; + statistics_msg_received( &stats, message ); + + if (opts.reply) { + const char *reply_addr = pn_message_get_reply_to( message ); + if (reply_addr) { + LOG("Replying to: %s\n", reply_addr ); + pn_message_set_address( message, reply_addr ); + pn_message_set_creation_time( message, msgr_now() ); + pn_messenger_put(messenger, message); + sent++; + } + } + + if (forward) { + const char *forward_addr = opts.forwarding_targets.addresses[forwarding_index]; + forwarding_index = NEXT_ADDRESS(opts.forwarding_targets, forwarding_index); + LOG("Forwarding to: %s\n", forward_addr ); + pn_message_set_address( message, forward_addr ); + pn_message_set_reply_to( message, NULL ); // else points to origin sender + pn_message_set_creation_time( message, msgr_now() ); + pn_messenger_put(messenger, message); + sent++; + } + + } + LOG("Messages received=%llu sent=%llu\n", received, sent); + } + + // this will flush any pending sends + if (pn_messenger_outgoing(messenger) > 0) { + LOG("Calling pn_messenger_send()\n"); + rc = pn_messenger_send(messenger, -1); + check_messenger(messenger); + check(rc == 0, "pn_messenger_send() failed"); + } + + rc = pn_messenger_stop(messenger); + check(rc == 0, "pn_messenger_stop() failed"); + check_messenger(messenger); + + statistics_report( &stats, sent, received ); + + pn_messenger_free(messenger); + pn_message_free(message); + addresses_free( &opts.subscriptions ); + addresses_free( &opts.forwarding_targets ); + + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tools/msgr-send.c ---------------------------------------------------------------------- diff --git a/c/tools/msgr-send.c b/c/tools/msgr-send.c new 
file mode 100644 index 0000000..7d67fcc --- /dev/null +++ b/c/tools/msgr-send.c @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "msgr-common.h" +#include "proton/message.h" +#include "proton/messenger.h" +#include "proton/error.h" + +#include +#include +#include +#include + + +typedef struct { + Addresses_t targets; + uint64_t msg_count; + uint32_t msg_size; // of body + uint32_t send_batch; + int outgoing_window; + unsigned int report_interval; // in seconds + //Addresses_t subscriptions; + //Addresses_t reply_tos; + int get_replies; + int timeout; // in seconds + int incoming_window; + int recv_count; + const char *name; + char *certificate; + char *privatekey; // used to sign certificate + char *password; // for private key file + char *ca_db; // trusted CA database +} Options_t; + + +static void usage(int rc) +{ + printf("Usage: msgr-send [OPTIONS] \n" + " -a [,]* \tThe target address [amqp[s]://domain[/name]]\n" + " -c # \tNumber of messages to send before exiting [0=forever]\n" + " -b # \tSize of message body in bytes [1024]\n" + " -p # \tSend batches of # messages (wait for replies before sending next batch if -R) [1024]\n" + " -w # \t# outgoing window size [0]\n" + " 
-e # \t# seconds to report statistics, 0 = end of test [0]\n" + " -R \tWait for a reply to each sent message\n" + " -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n" + " -W # \tIncoming window size [0]\n" + " -B # \tArgument to Messenger::recv(n) [-1]\n" + " -N \tSet the container name to \n" + " -V \tEnable debug logging\n" + " SSL options:\n" + " -T \tDatabase of trusted CA certificates for validating peer\n" + " -C \tFile containing self-identifying certificate\n" + " -K \tFile containing private key used to sign certificate\n" + " -P [pass:|path] \tPassword to unlock private key file.\n" + ); + //printf("-p \t*TODO* Add N sample properties to each message [3]\n"); + exit(rc); +} + +static void parse_options( int argc, char **argv, Options_t *opts ) +{ + int c; + opterr = 0; + + memset( opts, 0, sizeof(*opts) ); + opts->msg_size = 1024; + opts->send_batch = 1024; + opts->timeout = -1; + opts->recv_count = -1; + addresses_init(&opts->targets); + + while ((c = getopt(argc, argv, + "a:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) { + switch(c) { + case 'a': addresses_merge( &opts->targets, optarg ); break; + case 'c': + if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'b': + if (sscanf( optarg, "%u", &opts->msg_size ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'p': + if (sscanf( optarg, "%u", &opts->send_batch ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'w': + if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'e': + if (sscanf( optarg, "%u", &opts->report_interval ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'R': 
opts->get_replies = 1; break; + case 't': + if (sscanf( optarg, "%d", &opts->timeout ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + if (opts->timeout > 0) opts->timeout *= 1000; + break; + case 'W': + if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'B': + if (sscanf( optarg, "%d", &opts->recv_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", optopt); + usage(1); + } + break; + case 'V': enable_logging(); break; + case 'N': opts->name = optarg; break; + case 'T': opts->ca_db = optarg; break; + case 'C': opts->certificate = optarg; break; + case 'K': opts->privatekey = optarg; break; + case 'P': parse_password( optarg, &opts->password ); break; + + default: + usage(1); + } + } + + // default target if none specified + if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" ); +} + + +// return the # of reply messages received +static int process_replies( pn_messenger_t *messenger, + pn_message_t *message, + Statistics_t *stats, + int max_count) +{ + int received = 0; + LOG("Calling pn_messenger_recv(%d)\n", max_count); + int rc = pn_messenger_recv( messenger, max_count ); + check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_recv() failed"); + LOG("Messages on incoming queue: %d\n", pn_messenger_incoming(messenger)); + while (pn_messenger_incoming(messenger)) { + pn_messenger_get(messenger, message); + check_messenger(messenger); + received++; + // TODO: header decoding? 
+ statistics_msg_received( stats, message ); + // uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong; + } + return received; +} + + + +int main(int argc, char** argv) +{ + Options_t opts; + Statistics_t stats; + uint64_t sent = 0; + uint64_t received = 0; + int target_index = 0; + int rc; + + pn_message_t *message = 0; + pn_message_t *reply_message = 0; + pn_messenger_t *messenger = 0; + + parse_options( argc, argv, &opts ); + + messenger = pn_messenger( opts.name ); + + if (opts.certificate) { + rc = pn_messenger_set_certificate(messenger, opts.certificate); + check( rc == 0, "Failed to set certificate" ); + } + + if (opts.privatekey) { + rc = pn_messenger_set_private_key(messenger, opts.privatekey); + check( rc == 0, "Failed to set private key" ); + } + + if (opts.password) { + rc = pn_messenger_set_password(messenger, opts.password); + check( rc == 0, "Failed to set password" ); + } + + if (opts.ca_db) { + rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db); + check( rc == 0, "Failed to set trusted CA database" ); + } + + if (opts.outgoing_window) { + pn_messenger_set_outgoing_window( messenger, opts.outgoing_window ); + } + pn_messenger_set_timeout( messenger, opts.timeout ); + pn_messenger_start(messenger); + + message = pn_message(); + check(message, "failed to allocate a message"); + pn_message_set_reply_to(message, "~"); + pn_data_t *body = pn_message_body(message); + char *data = (char *)calloc(1, opts.msg_size); + pn_data_put_binary(body, pn_bytes(opts.msg_size, data)); + free(data); + pn_atom_t id; + id.type = PN_ULONG; + +#if 0 + // TODO: how do we effectively benchmark header processing overhead??? 
+ pn_data_t *props = pn_message_properties(message); + pn_data_put_map(props); + pn_data_enter(props); + // + //pn_data_put_string(props, pn_bytes(6, "string")); + //pn_data_put_string(props, pn_bytes(10, "this is awkward")); + // + //pn_data_put_string(props, pn_bytes(4, "long")); + pn_data_put_long(props, 12345); + // + //pn_data_put_string(props, pn_bytes(9, "timestamp")); + pn_data_put_timestamp(props, (pn_timestamp_t) 54321); + pn_data_exit(props); +#endif + + const int get_replies = opts.get_replies; + + if (get_replies) { + // disable the timeout so that pn_messenger_recv() won't block + reply_message = pn_message(); + check(reply_message, "failed to allocate a message"); + } + + statistics_start( &stats ); + while (!opts.msg_count || (sent < opts.msg_count)) { + + // setup the message to send + pn_message_set_address(message, opts.targets.addresses[target_index]); + target_index = NEXT_ADDRESS(opts.targets, target_index); + id.u.as_ulong = sent; + pn_message_set_correlation_id( message, id ); + pn_message_set_creation_time( message, msgr_now() ); + pn_messenger_put(messenger, message); + sent++; + if (opts.send_batch && (pn_messenger_outgoing(messenger) >= (int)opts.send_batch)) { + if (get_replies) { + while (received < sent) { + // this will also transmit any pending sent messages + received += process_replies( messenger, reply_message, + &stats, opts.recv_count ); + } + } else { + LOG("Calling pn_messenger_send()\n"); + rc = pn_messenger_send(messenger, -1); + check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_send() failed"); + } + } + check_messenger(messenger); + } + + LOG("Messages received=%llu sent=%llu\n", received, sent); + + if (get_replies) { + // wait for the last of the replies + while (received < sent) { + int count = process_replies( messenger, reply_message, + &stats, opts.recv_count ); + check( count > 0 || (opts.timeout == 0), + "Error: timed out waiting for reply messages\n"); + received += count; + LOG("Messages received=%llu 
sent=%llu\n", received, sent); + } + } else if (pn_messenger_outgoing(messenger) > 0) { + LOG("Calling pn_messenger_send()\n"); + rc = pn_messenger_send(messenger, -1); + check(rc == 0, "pn_messenger_send() failed"); + } + + rc = pn_messenger_stop(messenger); + check(rc == 0, "pn_messenger_stop() failed"); + check_messenger(messenger); + + statistics_report( &stats, sent, received ); + + pn_messenger_free(messenger); + pn_message_free(message); + if (reply_message) pn_message_free( reply_message ); + addresses_free( &opts.targets ); + + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tools/reactor-recv.c ---------------------------------------------------------------------- diff --git a/c/tools/reactor-recv.c b/c/tools/reactor-recv.c new file mode 100644 index 0000000..2a3d434 --- /dev/null +++ b/c/tools/reactor-recv.c @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Implements a subset of msgr-recv.c using reactor events. 
+ */ + +#include "proton/message.h" +#include "proton/error.h" +#include "proton/types.h" +#include "proton/reactor.h" +#include "proton/handlers.h" +#include "proton/engine.h" +#include "proton/url.h" +#include "msgr-common.h" + +#include +#include +#include +#include + +// The exact struct from msgr-recv, mostly fallow. +typedef struct { + Addresses_t subscriptions; + uint64_t msg_count; + int recv_count; + int incoming_window; + int timeout; // seconds + unsigned int report_interval; // in seconds + int outgoing_window; + int reply; + const char *name; + const char *ready_text; + char *certificate; + char *privatekey; // used to sign certificate + char *password; // for private key file + char *ca_db; // trusted CA database +} Options_t; + + +static void usage(int rc) +{ + printf("Usage: reactor-recv [OPTIONS] \n" + " -c # \tNumber of messages to receive before exiting [0=forever]\n" + " -R \tSend reply if 'reply-to' present\n" + " -t # \tInactivity timeout in seconds, -1 = no timeout [-1]\n" + " -X \tPrint '\\n' to stdout after all subscriptions are created\n" + ); + exit(rc); +} + + +// Global context for this process +typedef struct { + Options_t *opts; + Statistics_t *stats; + uint64_t sent; + uint64_t received; + pn_message_t *message; + pn_acceptor_t *acceptor; + char *encoded_data; + size_t encoded_data_size; + int connections; + pn_list_t *active_connections; + bool shutting_down; + pn_handler_t *listener_handler; + int quiesce_count; +} global_context_t; + +// Per connection context +typedef struct { + global_context_t *global; + int connection_id; + pn_link_t *recv_link; + pn_link_t *reply_link; +} connection_context_t; + + +static char *ensure_buffer(char *buf, size_t needed, size_t *actual) +{ + char* new_buf; + // Make room for the largest message seen so far, plus extra for slight changes in metadata content + if (needed + 1024 <= *actual) + return buf; + needed += 2048; + new_buf = (char *) realloc(buf, needed); + if (new_buf != NULL) { + buf = 
new_buf; + *actual = buf ? needed : 0; + } + return buf; +} + +void global_shutdown(global_context_t *gc) +{ + if (gc->shutting_down) return; + gc->shutting_down = true; + pn_acceptor_close(gc->acceptor); + size_t n = pn_list_size(gc->active_connections); + for (size_t i = 0; i < n; i++) { + pn_connection_t *conn = (pn_connection_t *) pn_list_get(gc->active_connections, i); + if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { + pn_connection_close(conn); + } + } +} + +connection_context_t *connection_context(pn_handler_t *h) +{ + connection_context_t *p = (connection_context_t *) pn_handler_mem(h); + return p; +} + +void connection_context_init(connection_context_t *cc, global_context_t *gc) +{ + cc->global = gc; + pn_incref(gc->listener_handler); + cc->connection_id = gc->connections++; + cc->recv_link = 0; + cc->reply_link = 0; +} + +void connection_cleanup(pn_handler_t *h) +{ + connection_context_t *cc = connection_context(h); + // Undo pn_incref() from connection_context_init() + pn_decref(cc->global->listener_handler); +} + +void connection_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) +{ + connection_context_t *cc = connection_context(h); + bool replying = cc->global->opts->reply; + + switch (type) { + case PN_LINK_REMOTE_OPEN: + { + pn_link_t *link = pn_event_link(event); + if (pn_link_is_receiver(link)) { + check(cc->recv_link == NULL, "Multiple incoming links on one connection"); + cc->recv_link = link; + pn_connection_t *conn = pn_event_connection(event); + pn_list_add(cc->global->active_connections, conn); + if (cc->global->shutting_down) { + pn_connection_close(conn); + break; + } + if (replying) { + // Set up a reply link and defer granting credit to the incoming link + pn_connection_t *conn = pn_session_connection(pn_link_session(link)); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + char name[100]; // prefer a multiplatform uuid generator + sprintf(name, "reply_sender_%d", cc->connection_id); + 
cc->reply_link = pn_sender(ssn, name); + pn_link_open(cc->reply_link); + } + else { + pn_flowcontroller_t *fc = pn_flowcontroller(1024); + pn_handler_add(h, fc); + pn_decref(fc); + } + } + } + break; + case PN_LINK_FLOW: + { + if (replying) { + pn_link_t *reply_link = pn_event_link(event); + // pn_flowcontroller handles the non-reply case + check(reply_link == cc->reply_link, "internal error"); + + // Grant the sender as much credit as just given to us for replies + int delta = pn_link_credit(reply_link) - pn_link_credit(cc->recv_link); + if (delta > 0) + pn_link_flow(cc->recv_link, delta); + } + } + break; + case PN_DELIVERY: + { + pn_link_t *recv_link = pn_event_link(event); + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) { + if (cc->global->received == 0) statistics_start(cc->global->stats); + + size_t encoded_size = pn_delivery_pending(dlv); + cc->global->encoded_data = ensure_buffer(cc->global->encoded_data, encoded_size, + &cc->global->encoded_data_size); + check(cc->global->encoded_data, "decoding buffer realloc failure"); + + ssize_t n = pn_link_recv(recv_link, cc->global->encoded_data, encoded_size); + check(n == (ssize_t) encoded_size, "message data read fail"); + pn_message_t *msg = cc->global->message; + int err = pn_message_decode(msg, cc->global->encoded_data, n); + check(err == 0, "message decode error"); + cc->global->received++; + pn_delivery_settle(dlv); + statistics_msg_received(cc->global->stats, msg); + + if (replying) { + const char *reply_addr = pn_message_get_reply_to(msg); + if (reply_addr) { + pn_link_t *rl = cc->reply_link; + check(pn_link_credit(rl) > 0, "message received without corresponding reply credit"); + LOG("Replying to: %s\n", reply_addr ); + + pn_message_set_address(msg, reply_addr); + pn_message_set_creation_time(msg, msgr_now()); + + char tag[8]; + void *ptr = &tag; + *((uint64_t *) ptr) = cc->global->sent; + pn_delivery_t *dlv = pn_delivery(rl, pn_dtag(tag, 
8)); + size_t size = cc->global->encoded_data_size; + int err = pn_message_encode(msg, cc->global->encoded_data, &size); + check(err == 0, "message encoding error"); + pn_link_send(rl, cc->global->encoded_data, size); + pn_delivery_settle(dlv); + + cc->global->sent++; + } + } + } + if (cc->global->received >= cc->global->opts->msg_count) { + global_shutdown(cc->global); + } + } + break; + case PN_CONNECTION_UNBOUND: + { + pn_connection_t *conn = pn_event_connection(event); + pn_list_remove(cc->global->active_connections, conn); + pn_connection_release(conn); + } + break; + default: + break; + } +} + +pn_handler_t *connection_handler(global_context_t *gc) +{ + pn_handler_t *h = pn_handler_new(connection_dispatch, sizeof(connection_context_t), connection_cleanup); + connection_context_t *cc = connection_context(h); + connection_context_init(cc, gc); + return h; +} + + +void start_listener(global_context_t *gc, pn_reactor_t *reactor) +{ + check(gc->opts->subscriptions.count > 0, "no listening address"); + pn_url_t *listen_url = pn_url_parse(gc->opts->subscriptions.addresses[0]); + const char *host = pn_url_get_host(listen_url); + const char *port = pn_url_get_port(listen_url); + if (port == 0 || strlen(port) == 0) + port = "5672"; + if (host == 0 || strlen(host) == 0) + host = "0.0.0.0"; + if (*host == '~') host++; + gc->acceptor = pn_reactor_acceptor(reactor, host, port, NULL); + check(gc->acceptor, "acceptor creation failed"); + pn_url_free(listen_url); +} + +void global_context_init(global_context_t *gc, Options_t *o, Statistics_t *s) +{ + gc->opts = o; + gc->stats = s; + gc->sent = 0; + gc->received = 0; + gc->encoded_data_size = 0; + gc->encoded_data = 0; + gc->message = pn_message(); + check(gc->message, "failed to allocate a message"); + gc->connections = 0; + gc->active_connections = pn_list(PN_OBJECT, 0); + gc->acceptor = 0; + gc->shutting_down = false; + gc->listener_handler = 0; + gc->quiesce_count = 0; +} + +global_context_t *global_context(pn_handler_t 
*h) +{ + return (global_context_t *) pn_handler_mem(h); +} + +void listener_cleanup(pn_handler_t *h) +{ + global_context_t *gc = global_context(h); + pn_message_free(gc->message); + free(gc->encoded_data); + pn_free(gc->active_connections); +} + +void listener_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) +{ + global_context_t *gc = global_context(h); + if (type == PN_REACTOR_QUIESCED) + gc->quiesce_count++; + else + gc->quiesce_count = 0; + + switch (type) { + case PN_CONNECTION_INIT: + { + pn_connection_t *connection = pn_event_connection(event); + + // New incoming connection on listener socket. Give each a separate handler. + pn_handler_t *ch = connection_handler(gc); + pn_handshaker_t *handshaker = pn_handshaker(); + pn_handler_add(ch, handshaker); + pn_decref(handshaker); + pn_record_t *record = pn_connection_attachments(connection); + pn_record_set_handler(record, ch); + pn_decref(ch); + } + break; + case PN_REACTOR_QUIESCED: + { + // Two quiesce in a row means we have been idle for a timeout period + if (gc->opts->timeout != -1 && gc->quiesce_count > 1) + global_shutdown(gc); + } + break; + case PN_REACTOR_INIT: + { + pn_reactor_t *reactor = pn_event_reactor(event); + start_listener(gc, reactor); + + // hack to let test scripts know when the receivers are ready (so + // that the senders may be started) + if (gc->opts->ready_text) { + fprintf(stdout, "%s\n", gc->opts->ready_text); + fflush(stdout); + } + if (gc->opts->timeout != -1) + pn_reactor_set_timeout(pn_event_reactor(event), gc->opts->timeout); + } + break; + case PN_REACTOR_FINAL: + { + if (gc->received == 0) statistics_start(gc->stats); + statistics_report(gc->stats, gc->sent, gc->received); + } + break; + default: + break; + } +} + +pn_handler_t *listener_handler(Options_t *opts, Statistics_t *stats) +{ + pn_handler_t *h = pn_handler_new(listener_dispatch, sizeof(global_context_t), listener_cleanup); + global_context_t *gc = global_context(h); + global_context_init(gc, opts, 
stats); + gc->listener_handler = h; + return h; +} + +static void parse_options( int argc, char **argv, Options_t *opts ) +{ + int c; + opterr = 0; + + memset( opts, 0, sizeof(*opts) ); + opts->recv_count = -1; + opts->timeout = -1; + addresses_init( &opts->subscriptions); + + while ((c = getopt(argc, argv, + "a:c:b:w:t:e:RW:F:VN:X:T:C:K:P:")) != -1) { + switch (c) { + case 'a': + { + // TODO: multiple addresses? + char *comma = strchr(optarg, ','); + check(comma == 0, "multiple addresses not implemented"); + check(opts->subscriptions.count == 0, "multiple addresses not implemented"); + addresses_merge( &opts->subscriptions, optarg ); + } + break; + case 'c': + if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { + // report the option being parsed; getopt() sets optopt only on its own errors + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 't': + if (sscanf( optarg, "%d", &opts->timeout ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + if (opts->timeout > 0) opts->timeout *= 1000; + break; + case 'R': opts->reply = 1; break; + case 'V': enable_logging(); break; + case 'X': opts->ready_text = optarg; break; + default: + usage(1); + } + } + + if (opts->subscriptions.count == 0) addresses_add( &opts->subscriptions, + "amqp://~0.0.0.0" ); +} + +int main(int argc, char** argv) +{ + Options_t opts; + Statistics_t stats; + parse_options( argc, argv, &opts ); + pn_reactor_t *reactor = pn_reactor(); + + // set up default handlers for our reactor + pn_handler_t *root = pn_reactor_get_handler(reactor); + pn_handler_t *lh = listener_handler(&opts, &stats); + pn_handler_add(root, lh); + pn_handshaker_t *handshaker = pn_handshaker(); + pn_handler_add(root, handshaker); + + // Omit decrefs else segfault. Not sure why they are necessary + // to keep valgrind happy for the connection_handler, but not here. 
+ // pn_decref(handshaker); + // pn_decref(lh); + + pn_reactor_run(reactor); + pn_reactor_free(reactor); + + addresses_free( &opts.subscriptions ); + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/tools/reactor-send.c ---------------------------------------------------------------------- diff --git a/c/tools/reactor-send.c b/c/tools/reactor-send.c new file mode 100644 index 0000000..f8d0c50 --- /dev/null +++ b/c/tools/reactor-send.c @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Implements a subset of msgr-send.c using reactor events. 
+ */ + +#include "proton/message.h" +#include "proton/error.h" +#include "proton/types.h" +#include "proton/reactor.h" +#include "proton/handlers.h" +#include "proton/engine.h" +#include "proton/url.h" +#include "msgr-common.h" + +#include +#include +#include +#include + + +typedef struct { + Addresses_t targets; + uint64_t msg_count; + uint32_t msg_size; // of body + uint32_t send_batch; + int outgoing_window; + unsigned int report_interval; // in seconds + //Addresses_t subscriptions; + //Addresses_t reply_tos; + int get_replies; + int unique_message; // 1 -> create and free a pn_message_t for each send/recv + int timeout; // in seconds + int incoming_window; + int recv_count; + const char *name; + char *certificate; + char *privatekey; // used to sign certificate + char *password; // for private key file + char *ca_db; // trusted CA database +} Options_t; + + +static void usage(int rc) +{ + printf("Usage: reactor-send [OPTIONS] \n" + " -a \tThe target address [amqp[s]://domain[/name]]\n" + " -c # \tNumber of messages to send before exiting [0=forever]\n" + " -b # \tSize of message body in bytes [1024]\n" + " -R \tWait for a reply to each sent message\n" + " -V \tEnable debug logging\n" + ); + exit(rc); +} + + +typedef struct { + Options_t *opts; + Statistics_t *stats; + uint64_t sent; + uint64_t received; + pn_message_t *message; + pn_message_t *reply_message; + pn_atom_t id; + char *encoded_data; + size_t encoded_data_size; + pn_url_t *send_url; + pn_string_t *hostname; + pn_string_t *container_id; + pn_string_t *reply_to; +} sender_context_t; + +void sender_context_init(sender_context_t *sc, Options_t *opts, Statistics_t *stats) +{ + sc->opts = opts; + sc->stats = stats; + sc->sent = 0; + sc->received = 0; + sc->id.type = PN_ULONG; + // 4096 extra bytes should easily cover the message metadata + sc->encoded_data_size = sc->opts->msg_size + 4096; + sc->encoded_data = (char *)calloc(1, sc->encoded_data_size); + check(sc->encoded_data, "failed to allocate 
encoding buffer"); + sc->container_id = pn_string("reactor-send"); // prefer uuid-like name + + sc->reply_message = (sc->opts->get_replies) ? pn_message() : 0; + sc->message = pn_message(); + check(sc->message, "failed to allocate a message"); + sc->reply_to = pn_string("amqp://"); + pn_string_addf(sc->reply_to, "%s", pn_string_get(sc->container_id)); + pn_message_set_reply_to(sc->message, pn_string_get(sc->reply_to)); + pn_data_t *body = pn_message_body(sc->message); + // borrow the encoding buffer this one time + char *data = sc->encoded_data; + pn_data_put_binary(body, pn_bytes(sc->opts->msg_size, data)); + + check(sc->opts->targets.count > 0, "no specified address"); + sc->send_url = pn_url_parse(sc->opts->targets.addresses[0]); + const char *host = pn_url_get_host(sc->send_url); + const char *port = pn_url_get_port(sc->send_url); + sc->hostname = pn_string(host); + if (port && strlen(port)) + pn_string_addf(sc->hostname, ":%s", port); +} + +sender_context_t *sender_context(pn_handler_t *h) +{ + return (sender_context_t *) pn_handler_mem(h); +} + +void sender_cleanup(pn_handler_t *h) +{ + sender_context_t *sc = sender_context(h); + pn_message_free(sc->message); + pn_message_free(sc->reply_message); + pn_url_free(sc->send_url); + pn_free(sc->hostname); + pn_free(sc->container_id); + pn_free(sc->reply_to); + free(sc->encoded_data); +} + +pn_handler_t *replyto_handler(sender_context_t *sc); + +pn_message_t* get_message(sender_context_t *sc, bool sending) { + if (sc->opts->unique_message) { + pn_message_t *m = pn_message(); + check(m, "failed to allocate a message"); + if (sending) { + pn_message_set_reply_to(m, pn_string_get(sc->reply_to)); + // copy the data + pn_data_t *body = pn_message_body(m); + pn_data_t *template_body = pn_message_body(sc->message); + pn_data_put_binary(body, pn_data_get_binary(template_body)); + } + return m; + } + else + return sending ? 
sc->message : sc->reply_message; // our simplified "message pool" +} + +void return_message(sender_context_t *sc, pn_message_t *m) { + if (sc->opts->unique_message) + pn_message_free(m); +} + +void sender_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) +{ + sender_context_t *sc = sender_context(h); + + switch (type) { + case PN_CONNECTION_INIT: + { + pn_connection_t *conn = pn_event_connection(event); + pn_connection_set_container(conn, pn_string_get(sc->container_id)); + pn_connection_set_hostname(conn, pn_string_get(sc->hostname)); + pn_connection_open(conn); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + pn_link_t *snd = pn_sender(ssn, "sender"); + const char *path = pn_url_get_path(sc->send_url); + if (path && strlen(path)) { + pn_terminus_set_address(pn_link_target(snd), path); + pn_terminus_set_address(pn_link_source(snd), path); + } + pn_link_open(snd); + } + break; + case PN_LINK_FLOW: + { + pn_link_t *snd = pn_event_link(event); + while (pn_link_credit(snd) > 0 && sc->sent < sc->opts->msg_count) { + if (sc->sent == 0) + statistics_start(sc->stats); + + char tag[8]; + void *ptr = &tag; + *((uint64_t *) ptr) = sc->sent; + pn_delivery_t *dlv = pn_delivery(snd, pn_dtag(tag, 8)); + + // setup the message to send + pn_message_t *msg = get_message(sc, true); + pn_message_set_address(msg, sc->opts->targets.addresses[0]); + sc->id.u.as_ulong = sc->sent; + pn_message_set_correlation_id(msg, sc->id); + pn_message_set_creation_time(msg, msgr_now()); + + size_t size = sc->encoded_data_size; + int err = pn_message_encode(msg, sc->encoded_data, &size); + check(err == 0, "message encoding error"); + pn_link_send(snd, sc->encoded_data, size); + pn_delivery_settle(dlv); + sc->sent++; + return_message(sc, msg); + } + if (sc->sent == sc->opts->msg_count && !sc->opts->get_replies) { + pn_link_close(snd); + pn_connection_t *conn = pn_event_connection(event); + pn_connection_close(conn); + } + } + break; + case PN_LINK_INIT: + { + pn_link_t 
*link = pn_event_link(event); + if (pn_link_is_receiver(link)) { + // Response messages link. Could manage credit and deliveries in this handler but + // a dedicated handler also works. + pn_handler_t *replyto = replyto_handler(sc); + pn_flowcontroller_t *fc = pn_flowcontroller(1024); + pn_handler_add(replyto, fc); + pn_decref(fc); + pn_handshaker_t *handshaker = pn_handshaker(); + pn_handler_add(replyto, handshaker); + pn_decref(handshaker); + pn_record_t *record = pn_link_attachments(link); + pn_record_set_handler(record, replyto); + pn_decref(replyto); + } + } + break; + case PN_CONNECTION_LOCAL_CLOSE: + { + statistics_report(sc->stats, sc->sent, sc->received); + } + break; + default: + break; + } +} + +pn_handler_t *sender_handler(Options_t *opts, Statistics_t *stats) +{ + pn_handler_t *h = pn_handler_new(sender_dispatch, sizeof(sender_context_t), sender_cleanup); + sender_context_t *sc = sender_context(h); + sender_context_init(sc, opts, stats); + return h; +} + +sender_context_t *replyto_sender_context(pn_handler_t *h) +{ + sender_context_t **p = (sender_context_t **) pn_handler_mem(h); + return *p; +} + +void replyto_cleanup(pn_handler_t *h) +{} + +void replyto_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type) { + sender_context_t *sc = replyto_sender_context(h); + + switch (type) { + case PN_DELIVERY: + { + check(sc->opts->get_replies, "Unexpected reply message"); + pn_link_t *recv_link = pn_event_link(event); + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) { + size_t encoded_size = pn_delivery_pending(dlv); + check(encoded_size <= sc->encoded_data_size, "decoding buffer too small"); + ssize_t n = pn_link_recv(recv_link, sc->encoded_data, encoded_size); + check(n == (ssize_t)encoded_size, "read fail on reply link"); + pn_message_t *msg = get_message(sc, false); + int err = pn_message_decode(msg, sc->encoded_data, n); + check(err == 0, "message decode error"); + 
statistics_msg_received(sc->stats, msg); + sc->received++; + pn_delivery_settle(dlv); + return_message(sc, msg); + } + if (sc->received == sc->opts->msg_count) { + pn_link_close(recv_link); + pn_connection_t *conn = pn_event_connection(event); + pn_connection_close(conn); + } + } + break; + default: + break; + } +} + +pn_handler_t *replyto_handler(sender_context_t *sc) +{ + pn_handler_t *h = pn_handler_new(replyto_dispatch, sizeof(sender_context_t *), replyto_cleanup); + sender_context_t **p = (sender_context_t **) pn_handler_mem(h); + *p = sc; + return h; +} + +static void parse_options( int argc, char **argv, Options_t *opts ) +{ + int c; + opterr = 0; + + memset( opts, 0, sizeof(*opts) ); + opts->msg_size = 1024; + opts->send_batch = 1024; + opts->timeout = -1; + opts->recv_count = -1; + opts->unique_message = 0; + addresses_init(&opts->targets); + + while ((c = getopt(argc, argv, + "ua:c:b:p:w:e:l:Rt:W:B:VN:T:C:K:P:")) != -1) { + switch(c) { + case 'a': + { + // TODO: multiple addresses? To keep tests happy, accept multiple for now, + // but ignore all but the first. 
+ addresses_merge( &opts->targets, optarg ); + } + break; + case 'c': + if (sscanf( optarg, "%" SCNu64, &opts->msg_count ) != 1) { + // report the option being parsed; getopt() sets optopt only on its own errors + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'b': + if (sscanf( optarg, "%u", &opts->msg_size ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'p': + if (sscanf( optarg, "%u", &opts->send_batch ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'w': + if (sscanf( optarg, "%d", &opts->outgoing_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'e': + if (sscanf( optarg, "%u", &opts->report_interval ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'R': opts->get_replies = 1; break; + case 'u': opts->unique_message = 1; break; + case 't': + if (sscanf( optarg, "%d", &opts->timeout ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + if (opts->timeout > 0) opts->timeout *= 1000; + break; + case 'W': + if (sscanf( optarg, "%d", &opts->incoming_window ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'B': + if (sscanf( optarg, "%d", &opts->recv_count ) != 1) { + fprintf(stderr, "Option -%c requires an integer argument.\n", c); + usage(1); + } + break; + case 'V': enable_logging(); break; + case 'N': opts->name = optarg; break; + case 'T': opts->ca_db = optarg; break; + case 'C': opts->certificate = optarg; break; + case 'K': opts->privatekey = optarg; break; + case 'P': parse_password( optarg, &opts->password ); break; + + default: + usage(1); + } + } + + // default target if none specified + if (opts->targets.count == 0) addresses_add( &opts->targets, "amqp://0.0.0.0" ); +} + + +int main(int argc, char** 
argv) +{ + Options_t opts; + Statistics_t stats; + parse_options( argc, argv, &opts ); + + pn_reactor_t *reactor = pn_reactor(); + pn_handler_t *sh = sender_handler(&opts, &stats); + pn_handler_add(sh, pn_handshaker()); + pn_reactor_connection(reactor, sh); + pn_reactor_run(reactor); + pn_reactor_free(reactor); + + pn_handler_free(sh); + addresses_free(&opts.targets); + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/versions.cmake ---------------------------------------------------------------------- diff --git a/c/versions.cmake b/c/versions.cmake new file mode 100644 index 0000000..049e6f2 --- /dev/null +++ b/c/versions.cmake @@ -0,0 +1,14 @@ +set(PN_LIB_CORE_MAJOR_VERSION 10) +set(PN_LIB_CORE_MINOR_VERSION 3) +set(PN_LIB_CORE_PATCH_VERSION 1) +set(PN_LIB_CORE_VERSION "${PN_LIB_CORE_MAJOR_VERSION}.${PN_LIB_CORE_MINOR_VERSION}.${PN_LIB_CORE_PATCH_VERSION}") + +set(PN_LIB_PROACTOR_MAJOR_VERSION 1) +set(PN_LIB_PROACTOR_MINOR_VERSION 3) +set(PN_LIB_PROACTOR_PATCH_VERSION 0) +set(PN_LIB_PROACTOR_VERSION "${PN_LIB_PROACTOR_MAJOR_VERSION}.${PN_LIB_PROACTOR_MINOR_VERSION}.${PN_LIB_PROACTOR_PATCH_VERSION}") + +set(PN_LIB_LEGACY_MAJOR_VERSION 11) +set(PN_LIB_LEGACY_MINOR_VERSION 4) +set(PN_LIB_LEGACY_PATCH_VERSION 0) +set(PN_LIB_LEGACY_VERSION "${PN_LIB_LEGACY_MAJOR_VERSION}.${PN_LIB_LEGACY_MINOR_VERSION}.${PN_LIB_LEGACY_PATCH_VERSION}") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/config.bat.in ---------------------------------------------------------------------- diff --git a/config.bat.in b/config.bat.in deleted file mode 100644 index d8c958e..0000000 --- a/config.bat.in +++ /dev/null @@ -1,51 +0,0 @@ -REM -REM Licensed to the Apache Software Foundation (ASF) under one -REM or more contributor license agreements. See the NOTICE file -REM distributed with this work for additional information -REM regarding copyright ownership. 
The ASF licenses this file -REM to you under the Apache License, Version 2.0 (the -REM "License"); you may not use this file except in compliance -REM with the License. You may obtain a copy of the License at -REM -REM http://www.apache.org/licenses/LICENSE-2.0 -REM -REM Unless required by applicable law or agreed to in writing, -REM software distributed under the License is distributed on an -REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -REM KIND, either express or implied. See the License for the -REM specific language governing permissions and limitations -REM under the License. -REM - -REM This is a generated file and will be overwritten the next -REM time that cmake is run. - -REM This build may be one of @CMAKE_CONFIGURATION_TYPES@ -REM Choose the configuration this script should reference: -SET PROTON_BUILD_CONFIGURATION=relwithdebinfo - -REM PROTON_HOME is the root of the proton checkout -REM PROTON_BUILD is where cmake was run - -set PROTON_HOME=@CMAKE_SOURCE_DIR@ -set PROTON_BUILD=@CMAKE_BINARY_DIR@ - -set PROTON_HOME=%PROTON_HOME:/=\% -set PROTON_BUILD=%PROTON_BUILD:/=\% - -set PROTON_BINDINGS=%PROTON_BUILD%\proton-c\bindings - -REM Python -set PYTHON_BINDINGS=%PROTON_BINDINGS%\python -set COMMON_PYPATH=%PROTON_HOME%\tests\python;%PROTON_HOME%\proton-c\bindings\python -set PYTHONPATH=%COMMON_PYPATH%;%PYTHON_BINDINGS% - -REM Ruby -set RUBY_BINDINGS=%PROTON_BINDINGS%\ruby -set RUBYLIB=%RUBY_BINDINGS%;%PROTON_HOME%\proton-c\bindings\ruby\lib;%PROTON_HOME%\tests\ruby - -REM test applications -set PATH=%PATH%;%PROTON_BUILD%\tests\tools\apps\c -set PATH=%PATH%;%PROTON_HOME%\tests\tools\apps\python -set PATH=%PATH%;%PROTON_HOME%\tests\python -set PATH=%PATH%;%PROTON_BUILD%\proton-c\%PROTON_BUILD_CONFIGURATION% http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/config.sh.in ---------------------------------------------------------------------- diff --git a/config.sh.in b/config.sh.in deleted file mode 100755 index bcb7ede..0000000 --- 
a/config.sh.in +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -merge_paths() { - # Merge paths, remove duplicates (keep first instance) - path=$(echo $* | sed 's/:/ /'g) # Split with spaces. - newpath="" - for d in $path; do # Remove duplicates - { echo $newpath | grep -q "\(:\|^\)$d\(:\|$\)"; } || newpath="$newpath:$d" - done - echo $newpath | sed 's/^://' # Remove leading : -} - -PROTON_HOME=@CMAKE_SOURCE_DIR@ -PROTON_BUILD=@CMAKE_BINARY_DIR@ - -PROTON_BINDINGS=$PROTON_BUILD/proton-c/bindings - -PYTHON_BINDINGS=$PROTON_BINDINGS/python - -# Python -COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/tools/py -export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS - -# Ruby -RUBY_BINDINGS=$PROTON_BINDINGS/ruby -RUBY_SRC=$PROTON_HOME/proton-c/bindings/ruby -export RUBYLIB=$RUBY_BINDINGS:$RUBY_SRC/lib:$RUBY_SRC/tests:$RUBY_SRC/spec - -# Go -export GOPATH="$PROTON_BUILD/proton-c/bindings/go" -# Help Go compiler find libraries and include files. 
-export C_INCLUDE_PATH="$(merge_paths $PROTON_HOME/proton-c/include $PROTON_BUILD/proton-c/include $C_INCLUDE_PATH)" -export LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LIBRARY_PATH)" -export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)" - - - -# test applications -export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)" - -# can the test harness use valgrind? -if [[ -x "$(type -p valgrind)" && "@ENABLE_VALGRIND@" == "ON" ]] ; then - export VALGRIND=$(type -p valgrind) - export VALGRIND_ARGS="@VALGRIND_OPTIONS@" -fi - -# can the test harness use saslpasswd2? -if [[ -x "$(type -p saslpasswd2)" ]] ; then - export SASLPASSWD=$(type -p saslpasswd2) -fi - -# Location of interop test files. -export PN_INTEROP_DIR=$PROTON_HOME/tests/interop http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt new file mode 100644 index 0000000..0311506 --- /dev/null +++ b/cpp/CMakeLists.txt @@ -0,0 +1,252 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +cmake_minimum_required(VERSION 2.8.12) + +enable_language(CXX) + +set(CMAKE_THREAD_PREFER_PTHREAD TRUE) +find_package(Threads) + +include(versions.cmake) + +set (BUILD_CPP_03 OFF CACHE BOOL "Compile the C++ binding as C++03 even when C++11 is available") + +# This effectively checks for cmake version 3.1 or later +if (DEFINED CMAKE_CXX_COMPILE_FEATURES) + if (BUILD_CPP_03) + set(STD 98) + else () + set(STD 11) + list(APPEND PLATFORM_LIBS "${CMAKE_THREAD_LIBS_INIT}") + endif () + set(CMAKE_CXX_STANDARD ${STD}) + set(CMAKE_CXX_EXTENSIONS OFF) + if (MSVC) # Compiler feature checks only needed for Visual Studio in this case + include(cpp.cmake) + elseif (STD EQUAL 11) + set(CPP_DEFINITIONS "HAS_CPP11") + endif() +else () + if (BUILD_CPP_03) + set(CXX_STANDARD "-std=c++98") + else () + include(CheckCXXCompilerFlag) + # These flags work with GCC/Clang/SunPro compilers + check_cxx_compiler_flag("-std=c++11" ACCEPTS_CXX11) + check_cxx_compiler_flag("-std=c++0x" ACCEPTS_CXX0X) + if (ACCEPTS_CXX11) + set(CXX_STANDARD "-std=c++11") + set(CPP_DEFINITIONS "HAS_CPP11") + list(APPEND PLATFORM_LIBS "${CMAKE_THREAD_LIBS_INIT}") + elseif(ACCEPTS_CXX0X) + set(CXX_STANDARD "-std=c++0x") + list(APPEND PLATFORM_LIBS "${CMAKE_THREAD_LIBS_INIT}") + include(cpp.cmake) # Compiler checks needed for C++0x as not all C++11 may be supported + else() + include(cpp.cmake) # Compiler checks needed as we have no idea what's going on here! 
+ endif() + endif() +endif () + +# Construct #define lines to insert in config_presets.hpp +foreach(d ${CPP_DEFINITIONS}) + set(presets "${presets}#define PN_CPP_LIB_${d} 1\n") +endforeach() +# For Visual Studio define the app compile time macros now, for everything else don't +if(MSVC) + set(presets "${presets}\n// Compiled for MSVC version ${CMAKE_CXX_COMPILER_VERSION} (${MSVC_VERSION})\n") + foreach(d ${CPP_DEFINITIONS}) + set(presets "${presets}# define PN_CPP_${d} 1\n") + endforeach() +else() + set(presets "${presets}\n#if qpid_proton_cpp_EXPORTS\n") + foreach(d ${CPP_DEFINITIONS}) + set(presets "${presets}# define PN_CPP_${d} 1\n") + endforeach() + set(presets "${presets}#endif // qpid_proton_cpp_EXPORTS\n") +endif() + +configure_file(config_presets.hpp.in config_presets.hpp @ONLY) + +# Make these CACHE INTERNAL so they will be set for the C++ examples +set(CXX_EXAMPLE_FLAGS "${CXX_WARNING_FLAGS} ${CMAKE_CXX_FLAGS} ${CXX_STANDARD}" CACHE INTERNAL "") +set(CXX_EXAMPLE_LINK_FLAGS "${SANITIZE_FLAGS}" CACHE INTERNAL "") + +include_directories ( + "${PN_C_INCLUDE_DIR}" + "${CMAKE_SOURCE_DIR}/c/src" # Here because of a naughty looking dependency on message-internal.h + "${CMAKE_CURRENT_SOURCE_DIR}/include" + "${CMAKE_CURRENT_BINARY_DIR}" + ) + +add_definitions(${CXX_STANDARD} ${CXX_WARNING_FLAGS} "-DPN_CPP_USE_DEPRECATED_API=1") + +set(qpid-proton-cpp-source + src/binary.cpp + src/byte_array.cpp + src/map.cpp + src/connection.cpp + src/connection_driver.cpp + src/connection_options.cpp + src/container.cpp + src/proactor_container_impl.cpp + src/contexts.cpp + src/data.cpp + src/decimal.cpp + src/decoder.cpp + src/delivery.cpp + src/duration.cpp + src/encoder.cpp + src/endpoint.cpp + src/error.cpp + src/error_condition.cpp + src/handler.cpp + src/link.cpp + src/link_namer.cpp + src/listener.cpp + src/message.cpp + src/messaging_adapter.cpp + src/node_options.cpp + src/null.cpp + src/object.cpp + src/proton_bits.cpp + src/receiver.cpp + src/receiver_options.cpp + 
src/reconnect_options.cpp + src/returned.cpp + src/sasl.cpp + src/scalar_base.cpp + src/sender.cpp + src/sender_options.cpp + src/session.cpp + src/session_options.cpp + src/source.cpp + src/ssl.cpp + src/ssl_domain.cpp + src/target.cpp + src/terminus.cpp + src/timestamp.cpp + src/tracker.cpp + src/transfer.cpp + src/transport.cpp + src/type_id.cpp + src/url.cpp + src/uuid.cpp + src/value.cpp + src/work_queue.cpp + ) + +set_source_files_properties ( + ${qpid-proton-cpp-source} + PROPERTIES + COMPILE_FLAGS "${LTO}" + ) + +add_library(qpid-proton-cpp SHARED ${qpid-proton-cpp-source}) + +target_link_libraries (qpid-proton-cpp LINK_PRIVATE ${PLATFORM_LIBS} qpid-proton-core qpid-proton-proactor) + +set_target_properties ( + qpid-proton-cpp + PROPERTIES + LINKER_LANGUAGE CXX + VERSION "${PN_LIB_CPP_VERSION}" + SOVERSION "${PN_LIB_CPP_MAJOR_VERSION}" + LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}" + ) + +## Install + +install(TARGETS qpid-proton-cpp + EXPORT proton-cpp + RUNTIME DESTINATION bin + ARCHIVE DESTINATION ${LIB_INSTALL_DIR} + LIBRARY DESTINATION ${LIB_INSTALL_DIR}) + +# Install windows qpid-proton-cpp pdb files +if (MSVC) + install(FILES $ + DESTINATION bin + CONFIGURATIONS RelWithDebInfo Debug + OPTIONAL) +endif (MSVC) + +install (DIRECTORY "include/proton" DESTINATION ${INCLUDE_INSTALL_DIR} FILES_MATCHING PATTERN "*.hpp") +install (FILES "${CMAKE_CURRENT_BINARY_DIR}/config_presets.hpp" DESTINATION "${INCLUDE_INSTALL_DIR}/proton/internal") +install (DIRECTORY "examples/" + DESTINATION "${PROTON_SHARE}/examples/cpp" + PATTERN "ProtonCppConfig.cmake" EXCLUDE) + +add_subdirectory(examples) +add_subdirectory(docs) + +# Pkg config file +configure_file( + ${CMAKE_CURRENT_SOURCE_DIR}/libqpid-proton-cpp.pc.in + ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-cpp.pc @ONLY) +install (FILES + ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-cpp.pc + DESTINATION ${LIB_INSTALL_DIR}/pkgconfig) + +if (DEFINED CMAKE_IMPORT_LIBRARY_PREFIX) +set(PROTONCPPLIB 
${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-cpp${CMAKE_IMPORT_LIBRARY_SUFFIX}) +set(PROTONCPPLIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-cpp${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX}) +else () +set(PROTONCPPLIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-cpp${CMAKE_SHARED_LIBRARY_SUFFIX}) +set(PROTONCPPLIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-cpp${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX}) +endif () + +include(WriteBasicConfigVersionFile) + +configure_file( + ${CMAKE_CURRENT_SOURCE_DIR}/ProtonCppConfig.cmake.in + ${CMAKE_CURRENT_BINARY_DIR}/ProtonCppConfig.cmake @ONLY) +write_basic_config_version_file( + ${CMAKE_CURRENT_BINARY_DIR}/ProtonCppConfigVersion.cmake + VERSION ${PN_VERSION} + COMPATIBILITY AnyNewerVersion) +install (FILES + ${CMAKE_CURRENT_BINARY_DIR}/ProtonCppConfig.cmake + ${CMAKE_CURRENT_BINARY_DIR}/ProtonCppConfigVersion.cmake + DESTINATION ${LIB_INSTALL_DIR}/cmake/ProtonCpp) + +macro(add_cpp_test test) + add_executable (${test} src/${test}.cpp) + target_link_libraries (${test} qpid-proton-cpp ${PLATFORM_LIBS}) + if (CMAKE_SYSTEM_NAME STREQUAL Windows) + add_test (NAME cpp-${test} + COMMAND ${PN_ENV_SCRIPT} + "PATH=$" + $ ${ARGN}) + else () + add_test (NAME cpp-${test} COMMAND ${memcheck-cmd} ${CMAKE_CURRENT_BINARY_DIR}/${test} ${ARGN}) + endif () +endmacro(add_cpp_test) + +add_cpp_test(codec_test) +add_cpp_test(connection_driver_test) +add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests) +add_cpp_test(message_test) +add_cpp_test(map_test) +add_cpp_test(scalar_test) +add_cpp_test(value_test) +add_cpp_test(container_test) +add_cpp_test(url_test) +add_cpp_test(reconnect_test) +add_cpp_test(link_test) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/ProtonCppConfig.cmake.in ---------------------------------------------------------------------- diff --git a/cpp/ProtonCppConfig.cmake.in b/cpp/ProtonCppConfig.cmake.in new file mode 100644 index 0000000..0068067 --- /dev/null +++ 
b/cpp/ProtonCppConfig.cmake.in @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Name: Proton +# Description: Qpid Proton C library +# Version: @PN_VERSION@ +# URL: http://qpid.apache.org/proton/ + +set (ProtonCpp_VERSION @PN_VERSION@) + +set (ProtonCpp_INCLUDE_DIRS @INCLUDEDIR@) +set (ProtonCpp_LIBRARIES optimized @LIBDIR@/@PROTONCPPLIB@ debug @LIBDIR@/@PROTONCPPLIBDEBUG@) + +set (ProtonCpp_FOUND True) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/README.md ---------------------------------------------------------------------- diff --git a/cpp/README.md b/cpp/README.md new file mode 100644 index 0000000..35278a4 --- /dev/null +++ b/cpp/README.md @@ -0,0 +1,43 @@ +# Qpid Proton C++ + +This is a C++ binding for the Proton API. + +The documentation includes a tutorial and API documentation. + +To generate the documentation go to your build directory, run `make +docs-cpp`, and open `cpp/docs/html/index.html` in a +browser. 
+ +## Todo + +### Tests + +- Interop/type testing: proton/tests/interop, new interop suite +- More unit testing, measured code coverage +- Test examples against ActiveMQ and qpidd + +### Bugs + +- Error handling: + - examples exit silently on broker exit/not running, core on + no-such-queue (e.g., with qpidd) + +### Features + +- SASL/SSL support with interop tests. +- Reconnection +- Browsing +- Selectors +- AMQP described types and arrays, full support and tests +- Durable subscriptions & demos (see python changes) +- Transactions +- Heartbeats + +### Nice to have + +- C++11 lambda version of handlers +- Helpers (or at least doc) for multi-threaded use (container per connection) +- Usable support for decimal types +- Expose endpoint conditions as C++ proton::condition error class +- Selectables and 3rd party event loop support +- More efficient shared_ptr (single family per proton object) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/config_presets.hpp.in ---------------------------------------------------------------------- diff --git a/cpp/config_presets.hpp.in b/cpp/config_presets.hpp.in new file mode 100644 index 0000000..676681b --- /dev/null +++ b/cpp/config_presets.hpp.in @@ -0,0 +1,26 @@ +#ifndef PROTON_INTERNAL_CONFIG_PRESETS_HPP +#define PROTON_INTERNAL_CONFIG_PRESETS_HPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +@presets@ + #endif // PROTON_INTERNAL_CONFIG_PRESETS_HPP \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/cpp.cmake ---------------------------------------------------------------------- diff --git a/cpp/cpp.cmake b/cpp/cpp.cmake new file mode 100644 index 0000000..0791044 --- /dev/null +++ b/cpp/cpp.cmake @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Check C++ capabilities. 
+ +include(CheckCXXSourceCompiles) + +macro (cxx_test prog name) + check_cxx_source_compiles("${prog}" HAS_${name}) + if (HAS_${name}) + list(APPEND CPP_DEFINITIONS "HAS_${name}") + else() + set(CPP_TEST_FAILED True) + endif() +endmacro() + +set(CPP_DEFINITIONS "") +set(CMAKE_REQUIRED_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_STANDARD} ${CXX_WARNING_FLAGS}") +cxx_test("#if defined(__cplusplus) && __cplusplus >= 201103\nint main(int, char**) { return 0; }\n#endif" CPP11) +# Don't need to check individual flags if compiler claims to be C++11 or later as they will be set automatically +if (NOT HAS_CPP11) + set(CPP_TEST_FAILED False) + cxx_test("long long ll; int main(int, char**) { return 0; }" LONG_LONG_TYPE) + cxx_test("int* x = nullptr; int main(int, char**) { return 0; }" NULLPTR) + cxx_test("#include <string>\nvoid blah(std::string&&) {} int main(int, char**) { blah(\"hello\"); return 0; }" RVALUE_REFERENCES) + cxx_test("class x {explicit operator int(); }; int main(int, char**) { return 0; }" EXPLICIT_CONVERSIONS) + cxx_test("class x {x()=default; }; int main(int, char**) { return 0; }" DEFAULTED_FUNCTIONS) + cxx_test("class x {x(x&&)=default; }; int main(int, char**) { return 0; }" DEFAULTED_MOVE_INITIALIZERS) + cxx_test("class x {x()=delete; }; int main(int, char**) { return 0; }" DELETED_FUNCTIONS) + cxx_test("struct x {x() {}}; int main(int, char**) { static thread_local x foo; return 0; }" THREAD_LOCAL) + cxx_test("int main(int, char**) { int a=[](){return 42;}(); return a; }" LAMBDAS) + cxx_test("template <class... X> void x(X... 
a) {} int main(int, char**) { x(1); x(43, \"\"); return 0; }" VARIADIC_TEMPLATES) + + cxx_test("#include <random>\nint main(int, char**) { return 0; }" HEADER_RANDOM) + cxx_test("#include <memory>\nstd::unique_ptr<int> u; int main(int, char**) { return 0; }" STD_UNIQUE_PTR) + cxx_test("#include <thread>\nstd::thread t; int main(int, char**) { return 0; }" STD_THREAD) + cxx_test("#include <mutex>\nstd::mutex m; int main(int, char**) { return 0; }" STD_MUTEX) + cxx_test("#include <atomic>\nstd::atomic<int> a; int main(int, char**) { return 0; }" STD_ATOMIC) + + # If all the tests passed this is the same as if we have C++11 for the purposes of compilation + # (this shortens the compile command line for VS 2017 significantly) + if (NOT CPP_TEST_FAILED) + set(CPP_DEFINITIONS "HAS_CPP11") + endif() +endif() +unset(CMAKE_REQUIRED_FLAGS) # Don't contaminate later C tests with C++ flags http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/docs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/docs/CMakeLists.txt b/cpp/docs/CMakeLists.txt new file mode 100644 index 0000000..d512d15 --- /dev/null +++ b/cpp/docs/CMakeLists.txt @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +find_package(Doxygen) + +if (DOXYGEN_FOUND) + configure_file ( + ${CMAKE_CURRENT_SOURCE_DIR}/user.doxygen.in + ${CMAKE_CURRENT_BINARY_DIR}/user.doxygen) + + file(GLOB_RECURSE headers ../include/proton/*.hpp) + add_custom_target (docs-cpp + COMMAND ${CMAKE_COMMAND} -E remove_directory html # get rid of old files + COMMAND ${DOXYGEN_EXECUTABLE} user.doxygen + DEPENDS ${headers} + ) + add_dependencies (docs docs-cpp) + # HTML files are generated to ./html - put those in the install. + install (DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/html/" + DESTINATION "${PROTON_SHARE}/docs/api-cpp" + COMPONENT documentation + OPTIONAL) +endif (DOXYGEN_FOUND) + +set_directory_properties(PROPERTIES ADDITIONAL_MAKE_CLEAN_FILES html) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/docs/io.md ---------------------------------------------------------------------- diff --git a/cpp/docs/io.md b/cpp/docs/io.md new file mode 100644 index 0000000..2bda1cf --- /dev/null +++ b/cpp/docs/io.md @@ -0,0 +1,22 @@ +# IO integration {#io_page} + +**Unsettled API** - The `proton::io` interfaces are new and remain +subject to change. + +The `proton::io` namespace contains a service provider interface (SPI) +that allows you to embed Proton in alternative IO or threading +libraries. + +The `proton::io::connection_driver` class converts an AMQP-encoded +byte stream, read from any IO source, into `proton::messaging_handler` +calls. It generates an AMQP-encoded byte stream as output that can be +written to any IO destination. + +The connection driver has no threading or IO dependencies. It is not +thread-safe, but separate instances are independent and can be run +concurrently in a multithreaded framework. The driver is written in +portable C++98-compatible code. + +For examples of use, see +[the proton source code](qpid.apache.org/proton), in particular the +C++ `proton::container`. 
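The io.md text above describes a driver that has no IO or threading dependencies: the caller feeds it bytes and drains the bytes it produces. The following is a minimal sketch of that general pattern only. It does not use the `proton::io` API; `toy_driver`, `toy_handler`, and the newline-delimited framing are all invented for illustration and stand in for AMQP decoding and `proton::messaging_handler`.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical handler, standing in for a real event handler interface.
struct toy_handler {
    std::vector<std::string> received;
    void on_frame(const std::string& frame) { received.push_back(frame); }
};

// Hypothetical IO-agnostic driver: it never touches a socket or a thread.
// The caller owns all IO and simply moves bytes in and out.
class toy_driver {
  public:
    explicit toy_driver(toy_handler& h) : handler_(h) {}

    // Feed bytes read from any IO source. Each complete newline-terminated
    // frame is dispatched to the handler; a partial frame is kept until
    // more bytes arrive.
    void read(const char* data, std::size_t size) {
        inbound_.append(data, size);
        std::string::size_type end;
        while ((end = inbound_.find('\n')) != std::string::npos) {
            handler_.on_frame(inbound_.substr(0, end));
            inbound_.erase(0, end + 1);
        }
    }

    // Queue a frame for output.
    void send(const std::string& frame) { outbound_ += frame + '\n'; }

    // Bytes waiting to be written to any IO destination.
    const std::string& write_buffer() const { return outbound_; }

    // Report that the first n pending bytes have been written.
    void write_done(std::size_t n) { outbound_.erase(0, n); }

  private:
    toy_handler& handler_;
    std::string inbound_;
    std::string outbound_;
};
```

Because the sketch holds no shared state outside each instance, separate `toy_driver` objects could be run from different threads, which is the property the text attributes to the real connection driver. A single instance would still need external synchronization.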
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/docs/main.md ---------------------------------------------------------------------- diff --git a/cpp/docs/main.md b/cpp/docs/main.md new file mode 100644 index 0000000..25ba973 --- /dev/null +++ b/cpp/docs/main.md @@ -0,0 +1,69 @@ +# Introduction {#mainpage} + +The Qpid Proton C++ API enables writing clients and servers that send +and receive messages using the AMQP protocol. It is part of the +[Qpid Proton](https://qpid.apache.org/proton/index.html) suite of +messaging APIs. + +The @ref overview_page presents the API's central concepts and +mechanics. + +The @ref tutorial_page guides you through some basic examples. See +[**Examples**](examples.html) for a complete list of the sample +programs, including more advanced ones. + +Qpid Proton C++ can be used in single- and multithreaded applications. +See @ref mt_page for guidance on writing efficient multithreaded +messaging applications. + +## Namespaces + +The main @ref proton namespace contains classes and functions +representing AMQP concepts and key elements of the API. Together they +form a "protocol engine" API to create AMQP @ref proton::connection +"connections" and @ref proton::link "links", handle @ref +proton::messaging\_handler "events", and send and receive @ref +proton::message "messages". See @ref overview_page for more +information. + +The main @ref proton namespace also contains C++ classes and functions +for handling AMQP- and API-specific data types. See @ref types_page +for more information. + +The @ref proton::codec namespace contains interfaces for AMQP data +encoding and decoding. + +The @ref proton::io namespace contains interfaces for integrating with +platform-native network IO. See @ref io_page for more information. + +## Conventions + +Elements of the API marked as **Unsettled API**, including any +elements contained within them, are still evolving and thus are +subject to change. 
They are available to use, but newer versions of +Proton may require changes to your application source code. + +Elements marked **Deprecated** are slated for removal in a future +release. + +Sections labeled **Thread safety** describe when and where it is safe +to call functions or access data across threads. + +Sections called **C++ versions** discuss features of the API that +depend on particular versions of C++ such as C++11. + +## URLs + +The API uses URLs to identify three different kinds of resources. All +URL argument names are suffixed with `_url`. + +Connection URLs (`conn_url` in argument lists) specify a target for +outgoing network connections. The path part of the URL is ignored. + +Address URLs (`addr_url`) extend the connection URL to reference an +AMQP node such as a queue or topic. The path of the URL, minus the +leading slash, is treated as the AMQP address of the node. + +Listener URLs (`listen_url`) specify a local network address and port for +accepting incoming TCP connections. The path part of the URL is ignored. +The host part may be empty, meaning "listen on all available interfaces".
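To make the URL conventions described in the URLs section concrete, here is a small sketch of how the parts might be separated. `split_url` and `url_parts` are hypothetical names for this illustration, not part of the Proton API, and the parser assumes only the simple `scheme://host:port/path` shape (no user info, no IPv6 literals).

```cpp
#include <string>

// Hypothetical result type for this sketch.
struct url_parts {
    std::string scheme;
    std::string host;     // may be empty, e.g. for a listener URL
    std::string port;
    std::string address;  // URL path minus the leading slash
};

// Hypothetical helper: split a URL of the form scheme://host:port/path.
inline url_parts split_url(const std::string& url) {
    url_parts p;
    std::string rest = url;

    std::string::size_type i = rest.find("://");
    if (i != std::string::npos) {
        p.scheme = rest.substr(0, i);
        rest.erase(0, i + 3);
    }

    i = rest.find('/');
    if (i != std::string::npos) {
        p.address = rest.substr(i + 1);  // drop the leading slash
        rest.erase(i);
    }

    i = rest.rfind(':');
    if (i != std::string::npos) {
        p.port = rest.substr(i + 1);
        rest.erase(i);
    }

    p.host = rest;
    return p;
}
```

Under these assumptions, `split_url("amqp://broker.example.com:5672/examples")` yields the address `examples`, which is how an address URL names a node. A connection or listener URL would use only the host and port, and `split_url("amqp://:5672")` yields an empty host, matching the "listen on all available interfaces" case.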