Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8BC8D200D0E for ; Tue, 12 Sep 2017 01:18:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7FDF21609C6; Mon, 11 Sep 2017 23:18:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A05D61609C4 for ; Tue, 12 Sep 2017 01:18:17 +0200 (CEST) Received: (qmail 42383 invoked by uid 500); 11 Sep 2017 23:18:16 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 42373 invoked by uid 99); 11 Sep 2017 23:18:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Sep 2017 23:18:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3E8B2F578A; Mon, 11 Sep 2017 23:18:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jross@apache.org To: commits@qpid.apache.org Date: Mon, 11 Sep 2017 23:18:16 -0000 Message-Id: <9b3344d5c1a64680b1efc679b39bdeaf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] qpid-proton git commit: PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level archived-at: Mon, 11 Sep 2017 23:18:19 -0000 Repository: qpid-proton Updated Branches: refs/heads/master 6888ab5e5 -> 564e0ca4c http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c deleted file mode 100644 index 6fd74a5..0000000 --- a/examples/c/proactor/receive.c +++ /dev/null @@ -1,188 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -typedef struct app_data_t { - const char *host, *port; - const char *amqp_address; - const char *container_id; - int message_count; - - pn_proactor_t *proactor; - int received; - bool finished; -} app_data_t; - -static const int BATCH = 1000; /* Batch size for unlimited receive */ - -static int exit_code = 0; - -static void check_condition(pn_event_t *e, pn_condition_t *cond) { - if (pn_condition_is_set(cond)) { - fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), - pn_condition_get_name(cond), pn_condition_get_description(cond)); - pn_connection_close(pn_event_connection(e)); - exit_code = 1; - } -} - -#define MAX_SIZE 1024 - -static void decode_message(pn_delivery_t *dlv) { - static char buffer[MAX_SIZE]; - ssize_t len; - // try to decode the message body - if (pn_delivery_pending(dlv) < MAX_SIZE) { - // read in the raw data - len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); - if (len > 0) { - // decode it into a proton message - pn_message_t *m = pn_message(); - if (PN_OK == pn_message_decode(m, buffer, len)) { - pn_string_t *s = pn_string(NULL); - pn_inspect(pn_message_body(m), s); - printf("%s\n", pn_string_get(s)); - pn_free(s); - } - pn_message_free(m); - } - } -} - -/* Return true to continue, false to exit */ -static bool handle(app_data_t* app, pn_event_t* event) { - switch (pn_event_type(event)) { - - case PN_CONNECTION_INIT: { - pn_connection_t* c = pn_event_connection(event); - pn_connection_set_container(c, app->container_id); - pn_connection_open(c); - pn_session_t* s = pn_session(c); - pn_session_open(s); - pn_link_t* l = pn_receiver(s, "my_receiver"); - pn_terminus_set_address(pn_link_source(l), app->amqp_address); - pn_link_open(l); - /* cannot receive without granting credit: */ - pn_link_flow(l, app->message_count ? app->message_count : BATCH); - } break; - - case PN_DELIVERY: { - /* A message has been received */ - pn_link_t *link = NULL; - pn_delivery_t *dlv = pn_event_delivery(event); - if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { - link = pn_delivery_link(dlv); - decode_message(dlv); - /* Accept the delivery */ - pn_delivery_update(dlv, PN_ACCEPTED); - /* done with the delivery, move to the next and free it */ - pn_link_advance(link); - pn_delivery_settle(dlv); /* dlv is now freed */ - - if (app->message_count == 0) { - /* receive forever - see if more credit is needed */ - if (pn_link_credit(link) < BATCH/2) { - /* Grant enough credit to bring it up to BATCH: */ - pn_link_flow(link, BATCH - pn_link_credit(link)); - } - } else if (++app->received >= app->message_count) { - /* done receiving, close the endpoints */ - printf("%d messages received\n", app->received); - pn_session_t *ssn = pn_link_session(link); - pn_link_close(link); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); - } - } - } break; - - case PN_TRANSPORT_CLOSED: - check_condition(event, pn_transport_condition(pn_event_transport(event))); - break; - - case PN_CONNECTION_REMOTE_CLOSE: - check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_SESSION_REMOTE_CLOSE: - check_condition(event, pn_session_remote_condition(pn_event_session(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_LINK_REMOTE_CLOSE: - case PN_LINK_REMOTE_DETACH: - check_condition(event, pn_link_remote_condition(pn_event_link(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_PROACTOR_INACTIVE: - return false; - break; - - default: - break; - } - return true; -} - -void run(app_data_t *app) { - /* Loop and handle events */ - do { - pn_event_batch_t *events = pn_proactor_wait(app->proactor); - for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { - if (!handle(app, e)) { - return; - } - } - pn_proactor_done(app->proactor, events); - } while(true); -} - -int main(int argc, char **argv) { - struct app_data_t app = {0}; - int i = 0; - app.container_id = argv[i++]; /* Should be unique */ - app.host = (argc > 1) ? argv[i++] : ""; - app.port = (argc > 1) ? argv[i++] : "amqp"; - app.amqp_address = (argc > i) ? argv[i++] : "examples"; - app.message_count = (argc > i) ? atoi(argv[i++]) : 10; - - /* Create the proactor and connect */ - app.proactor = pn_proactor(); - char addr[PN_MAX_ADDR]; - pn_proactor_addr(addr, sizeof(addr), app.host, app.port); - pn_proactor_connect(app.proactor, pn_connection(), addr); - run(&app); - pn_proactor_free(app.proactor); - return exit_code; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c deleted file mode 100644 index 43da8b0..0000000 --- a/examples/c/proactor/send.c +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -typedef struct app_data_t { - const char *host, *port; - const char *amqp_address; - const char *container_id; - int message_count; - - pn_proactor_t *proactor; - pn_rwbytes_t message_buffer; - int sent; - int acknowledged; -} app_data_t; - -static int exit_code = 0; - -static void check_condition(pn_event_t *e, pn_condition_t *cond) { - if (pn_condition_is_set(cond)) { - fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), - pn_condition_get_name(cond), pn_condition_get_description(cond)); - pn_connection_close(pn_event_connection(e)); - exit_code = 1; - } -} - -/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ -static pn_bytes_t encode_message(app_data_t* app) { - /* Construct a message with the map { "sequence": app.sent } */ - pn_message_t* message = pn_message(); - pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ - pn_data_t* body = pn_message_body(message); - pn_data_put_map(body); - pn_data_enter(body); - pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); - pn_data_put_int(body, app->sent); /* The sequence number */ - pn_data_exit(body); - - /* encode the message, expanding the encode buffer as needed */ - if (app->message_buffer.start == NULL) { - static const size_t initial_size = 128; - app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); - } - /* app->message_buffer is the total buffer space available. */ - /* mbuf wil point at just the portion used by the encoded message */ - pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); - int status = 0; - while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { - app->message_buffer.size *= 2; - app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); - mbuf.size = app->message_buffer.size; - } - if (status != 0) { - fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); - exit(1); - } - pn_message_free(message); - return pn_bytes(mbuf.size, mbuf.start); -} - -/* Returns true to continue, false if finished */ -static bool handle(app_data_t* app, pn_event_t* event) { - switch (pn_event_type(event)) { - - case PN_CONNECTION_INIT: { - pn_connection_t* c = pn_event_connection(event); - pn_connection_set_container(c, app->container_id); - pn_connection_open(c); - pn_session_t* s = pn_session(pn_event_connection(event)); - pn_session_open(s); - pn_link_t* l = pn_sender(s, "my_sender"); - pn_terminus_set_address(pn_link_target(l), app->amqp_address); - pn_link_open(l); - break; - } - - case PN_LINK_FLOW: { - /* The peer has given us some credit, now we can send messages */ - pn_link_t *sender = pn_event_link(event); - while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { - ++app->sent; - // Use sent counter as unique delivery tag. - pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); - pn_bytes_t msgbuf = encode_message(app); - pn_link_send(sender, msgbuf.start, msgbuf.size); - pn_link_advance(sender); - } - break; - } - - case PN_DELIVERY: { - /* We received acknowledgedment from the peer that a message was delivered. */ - pn_delivery_t* d = pn_event_delivery(event); - if (pn_delivery_remote_state(d) == PN_ACCEPTED) { - if (++app->acknowledged == app->message_count) { - printf("%d messages sent and acknowledged\n", app->acknowledged); - pn_connection_close(pn_event_connection(event)); - /* Continue handling events till we receive TRANSPORT_CLOSED */ - } - } else { - fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d)); - pn_connection_close(pn_event_connection(event)); - exit_code=1; - } - break; - } - - case PN_TRANSPORT_CLOSED: - check_condition(event, pn_transport_condition(pn_event_transport(event))); - break; - - case PN_CONNECTION_REMOTE_CLOSE: - check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_SESSION_REMOTE_CLOSE: - check_condition(event, pn_session_remote_condition(pn_event_session(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_LINK_REMOTE_CLOSE: - case PN_LINK_REMOTE_DETACH: - check_condition(event, pn_link_remote_condition(pn_event_link(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_PROACTOR_INACTIVE: - return false; - - default: break; - } - return true; -} - -void run(app_data_t *app) { - /* Loop and handle events */ - do { - pn_event_batch_t *events = pn_proactor_wait(app->proactor); - for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { - if (!handle(app, e)) { - return; - } - } - pn_proactor_done(app->proactor, events); - } while(true); -} - -int main(int argc, char **argv) { - struct app_data_t app = {0}; - int i = 0; - app.container_id = argv[i++]; /* Should be unique */ - app.host = (argc > 1) ? argv[i++] : ""; - app.port = (argc > 1) ? argv[i++] : "amqp"; - app.amqp_address = (argc > i) ? argv[i++] : "examples"; - app.message_count = (argc > i) ? atoi(argv[i++]) : 10; - - app.proactor = pn_proactor(); - char addr[PN_MAX_ADDR]; - pn_proactor_addr(addr, sizeof(addr), app.host, app.port); - pn_proactor_connect(app.proactor, pn_connection(), addr); - run(&app); - pn_proactor_free(app.proactor); - free(app.message_buffer.start); - return exit_code; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/thread.h ---------------------------------------------------------------------- diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h deleted file mode 100644 index 3b9f19e..0000000 --- a/examples/c/proactor/thread.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H -#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */ - -#ifdef _WIN32 - -#include -#include -#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */ -#include -#include - -#define pthread_function DWORD WINAPI -#define pthread_function_return DWORD -#define pthread_t HANDLE -#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL) -#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread)) -#define pthread_mutex_T HANDLE -#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL)) -#define pthread_mutex_destroy(pobject) CloseHandle(*pobject) -#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE) -#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject) - -#else - -#include - -#endif - -#endif http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt deleted file mode 100644 index bd6163f..0000000 --- a/examples/c/reactor/CMakeLists.txt +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -find_package(Proton REQUIRED) - -set (reactor-examples - sender.c - receiver.c - ) - -set_source_files_properties ( - ${reactor-examples} - PROPERTIES - COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}" - ) - -if (BUILD_WITH_CXX) - set_source_files_properties ( - ${reactor-examples} - PROPERTIES LANGUAGE CXX - ) -endif (BUILD_WITH_CXX) - -include_directories(${Proton_INCLUDE_DIRS}) -add_executable(sender sender.c) -add_executable(receiver receiver.c) -target_link_libraries(sender ${Proton_LIBRARIES}) -target_link_libraries(receiver ${Proton_LIBRARIES}) - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/README ---------------------------------------------------------------------- diff --git a/examples/c/reactor/README b/examples/c/reactor/README deleted file mode 100644 index 8d61893..0000000 --- a/examples/c/reactor/README +++ /dev/null @@ -1,30 +0,0 @@ -These example clients require a broker or similar intermediary that -supports the AMQP 1.0 protocol, allows anonymous connections and -accepts links to and from a node named 'examples'. - ------------------------------------------------------------------- - -sender.c - -A simple message sending client. This example sends all messages but -the last as pre-settled (no ack required). It then pends waiting for -an ack for the last message sent before exiting. - -Use the '-h' command line option for a list of supported parameters. - ------------------------------------------------------------------- - -receiver.c - -A simple message consuming client. This example receives messages -from a target (default 'examples'). Received messages are -acknowledged if they are sent un-settled. The client will try to -decode the message payload assuming it has been generated by the -sender example. - -Use the '-h' command line option for a list of supported parameters. - - - - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/receiver.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c deleted file mode 100644 index e72a6d9..0000000 --- a/examples/c/reactor/receiver.c +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include - -#include "pncompat/misc_funcs.inc" - -#include "proton/reactor.h" -#include "proton/message.h" -#include "proton/connection.h" -#include "proton/session.h" -#include "proton/link.h" -#include "proton/delivery.h" -#include "proton/event.h" -#include "proton/handlers.h" -#include "proton/transport.h" -#include "proton/url.h" - -static int quiet = 0; - -// Credit batch if unlimited receive (-c 0) -static const int CAPACITY = 100; -#define MAX_SIZE 512 - -// Example application data. This data will be instantiated in the event -// handler, and is available during event processing. In this example it -// holds configuration and state information. -// -typedef struct { - int count; // # of messages to receive before exiting - const char *source; // name of the source node to receive from - pn_message_t *message; // holds the received message -} app_data_t; - -// helper to pull pointer to app_data_t instance out of the pn_handler_t -// -#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler)) - -// Called when reactor exits to clean up app_data -// -static void delete_handler(pn_handler_t *handler) -{ - app_data_t *d = GET_APP_DATA(handler); - if (d->message) { - pn_decref(d->message); - d->message = NULL; - } -} - - -/* Process each event posted by the reactor. - */ -static void event_handler(pn_handler_t *handler, - pn_event_t *event, - pn_event_type_t type) -{ - app_data_t *data = GET_APP_DATA(handler); - - switch (type) { - - case PN_CONNECTION_INIT: { - // Create and open all the endpoints needed to send a message - // - pn_connection_t *conn; - pn_session_t *ssn; - pn_link_t *receiver; - - conn = pn_event_connection(event); - pn_connection_open(conn); - ssn = pn_session(conn); - pn_session_open(ssn); - receiver = pn_receiver(ssn, "MyReceiver"); - pn_terminus_set_address(pn_link_source(receiver), data->source); - pn_link_open(receiver); - // cannot receive without granting credit: - pn_link_flow(receiver, data->count ? data->count : CAPACITY); - } break; - - case PN_DELIVERY: { - // A message has been received - // - pn_link_t *link = NULL; - pn_delivery_t *dlv = pn_event_delivery(event); - if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { - // A full message has arrived - if (!quiet) { - ssize_t len; - pn_bytes_t bytes; - bool found = false; - - // try to decode the message body - if (pn_delivery_pending(dlv) < MAX_SIZE) { - static char buffer[MAX_SIZE]; - // read in the raw data - len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); - if (len > 0) { - // decode it into a proton message - pn_message_clear(data->message); - if (PN_OK == pn_message_decode(data->message, buffer, - len)) { - // Assuming the message came from the sender - // example, try to parse out a single string from - // the payload - // - pn_data_scan(pn_message_body(data->message), "?S", - &found, &bytes); - } - } - } - if (found) { - fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size, - bytes.start); - } else { - fprintf(stdout, "Message received!\n"); - } - } - - link = pn_delivery_link(dlv); - - if (!pn_delivery_settled(dlv)) { - // remote has not settled, so it is tracking the delivery. Ack - // it. - pn_delivery_update(dlv, PN_ACCEPTED); - } - - // done with the delivery, move to the next and free it - pn_link_advance(link); - pn_delivery_settle(dlv); // dlv is now freed - - if (data->count == 0) { - // send forever - see if more credit is needed - if (pn_link_credit(link) < CAPACITY/2) { - // Grant enough credit to bring it up to CAPACITY: - pn_link_flow(link, CAPACITY - pn_link_credit(link)); - } - } else if (--data->count == 0) { - // done receiving, close the endpoints - pn_session_t *ssn = pn_link_session(link); - pn_link_close(link); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); - } - } - } break; - - case PN_TRANSPORT_ERROR: { - // The connection to the peer failed. - // - pn_transport_t *tport = pn_event_transport(event); - pn_condition_t *cond = pn_transport_condition(tport); - fprintf(stderr, "Network transport failed!\n"); - if (pn_condition_is_set(cond)) { - const char *name = pn_condition_get_name(cond); - const char *desc = pn_condition_get_description(cond); - fprintf(stderr, " Error: %s Description: %s\n", - (name) ? name : "", - (desc) ? desc : ""); - } - // pn_reactor_process() will exit with a false return value, stopping - // the main loop. - } break; - - default: - break; - } -} - -static void usage(void) -{ - printf("Usage: receiver \n"); - printf("-a \tThe host address [localhost:5672]\n"); - printf("-c \t# of messages to receive, 0=receive forever [1]\n"); - printf("-s \tSource address [examples]\n"); - printf("-i \tContainer name [ReceiveExample]\n"); - printf("-q \tQuiet - turn off stdout\n"); - exit(1); -} - -int main(int argc, char** argv) -{ - const char *address = "localhost"; - const char *container = "ReceiveExample"; - int c; - pn_reactor_t *reactor = NULL; - pn_url_t *url = NULL; - pn_connection_t *conn = NULL; - - /* create a handler for the connection's events. - * event_handler will be called for each event. The handler will allocate - * a app_data_t instance which can be accessed when the event_handler is - * called. - */ - pn_handler_t *handler = pn_handler_new(event_handler, - sizeof(app_data_t), - delete_handler); - - /* set up the application data with defaults */ - app_data_t *app_data = GET_APP_DATA(handler); - memset(app_data, 0, sizeof(app_data_t)); - app_data->count = 1; - app_data->source = "examples"; - app_data->message = pn_message(); - - /* Attach the pn_handshaker() handler. This handler deals with endpoint - * events from the peer so we don't have to. - */ - { - pn_handler_t *handshaker = pn_handshaker(); - pn_handler_add(handler, handshaker); - pn_decref(handshaker); - } - - /* command line options */ - opterr = 0; - while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) { - switch(c) { - case 'h': usage(); break; - case 'a': address = optarg; break; - case 'c': - app_data->count = atoi(optarg); - if (app_data->count < 0) usage(); - break; - case 's': app_data->source = optarg; break; - case 'i': container = optarg; break; - case 'q': quiet = 1; break; - default: - usage(); - break; - } - } - - reactor = pn_reactor(); - - url = pn_url_parse(address); - if (url == NULL) { - fprintf(stderr, "Invalid host address %s\n", address); - exit(1); - } - conn = pn_reactor_connection_to_host(reactor, - pn_url_get_host(url), - pn_url_get_port(url), - handler); - pn_decref(url); - pn_decref(handler); - - // the container name should be unique for each client - pn_connection_set_container(conn, container); - - // wait up to 5 seconds for activity before returning from - // pn_reactor_process() - pn_reactor_set_timeout(reactor, 5000); - - pn_reactor_start(reactor); - - while (pn_reactor_process(reactor)) { - /* Returns 'true' until the connection is shut down. - * pn_reactor_process() will return true at least once every 5 seconds - * (due to the timeout). If no timeout was configured, - * pn_reactor_process() returns as soon as it finishes processing all - * pending I/O and events. Once the connection has closed, - * pn_reactor_process() will return false. - */ - } - pn_decref(reactor); - - return 0; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/sender.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c deleted file mode 100644 index 6c3cdb3..0000000 --- a/examples/c/reactor/sender.c +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include - -#include "pncompat/misc_funcs.inc" - -#include "proton/reactor.h" -#include "proton/message.h" -#include "proton/connection.h" -#include "proton/session.h" -#include "proton/link.h" -#include "proton/delivery.h" -#include "proton/event.h" -#include "proton/handlers.h" -#include "proton/transport.h" -#include "proton/url.h" - - -static int quiet = 0; - -// Example application data. This data will be instantiated in the event -// handler, and is available during event processing. In this example it -// holds configuration and state information. -// -typedef struct { - int count; // # messages to send - int anon; // use anonymous link if true - const char *target; // name of destination target - char *msg_data; // pre-encoded outbound message - size_t msg_len; // bytes in msg_data -} app_data_t; - -// helper to pull pointer to app_data_t instance out of the pn_handler_t -// -#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler)) - -// Called when reactor exits to clean up app_data -// -static void delete_handler(pn_handler_t *handler) -{ - app_data_t *d = GET_APP_DATA(handler); - if (d->msg_data) { - free(d->msg_data); - d->msg_data = NULL; - } -} - -/* Process each event posted by the reactor. - */ -static void event_handler(pn_handler_t *handler, - pn_event_t *event, - pn_event_type_t type) -{ - app_data_t *data = GET_APP_DATA(handler); - - switch (type) { - - case PN_CONNECTION_INIT: { - // Create and open all the endpoints needed to send a message - // - pn_connection_t *conn; - pn_session_t *ssn; - pn_link_t *sender; - - conn = pn_event_connection(event); - pn_connection_open(conn); - ssn = pn_session(conn); - pn_session_open(ssn); - sender = pn_sender(ssn, "MySender"); - // we do not wait for ack until the last message - pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); - if (!data->anon) { - pn_terminus_set_address(pn_link_target(sender), data->target); - } - pn_link_open(sender); - } break; - - case PN_LINK_FLOW: { - // the remote has given us some credit, now we can send messages - // - static long tag = 0; // a simple tag generator - pn_delivery_t *delivery; - pn_link_t *sender = pn_event_link(event); - int credit = pn_link_credit(sender); - while (credit > 0 && data->count > 0) { - --credit; - --data->count; - ++tag; - delivery = pn_delivery(sender, - pn_dtag((const char *)&tag, sizeof(tag))); - pn_link_send(sender, data->msg_data, data->msg_len); - pn_link_advance(sender); - if (data->count > 0) { - // send pre-settled until the last one, then wait for an ack on - // the last sent message. This allows the sender to send - // messages as fast as possible and then exit when the consumer - // has dealt with the last one. - // - pn_delivery_settle(delivery); - } - } - } break; - - case PN_DELIVERY: { - // Since the example sends all messages but the last pre-settled - // (pre-acked), only the last message's delivery will get updated with - // the remote state (acked/nacked). - // - pn_delivery_t *dlv = pn_event_delivery(event); - if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) { - uint64_t rs = pn_delivery_remote_state(dlv); - int done = 1; - switch (rs) { - case PN_RECEIVED: - // This is not a terminal state - it is informational, and the - // peer is still processing the message. - done = 0; - break; - case PN_ACCEPTED: - pn_delivery_settle(dlv); - if (!quiet) fprintf(stdout, "Send complete!\n"); - break; - case PN_REJECTED: - case PN_RELEASED: - case PN_MODIFIED: - pn_delivery_settle(dlv); - fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs); - break; - default: - // ??? no other terminal states defined, so ignore anything else - pn_delivery_settle(dlv); - fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs); - break; - } - - if (done) { - // initiate clean shutdown of the endpoints - pn_link_t *link = pn_delivery_link(dlv); - pn_session_t *ssn = pn_link_session(link); - pn_link_close(link); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); - } - } - } break; - - case PN_TRANSPORT_ERROR: { - // The connection to the peer failed. - // - pn_transport_t *tport = pn_event_transport(event); - pn_condition_t *cond = pn_transport_condition(tport); - fprintf(stderr, "Network transport failed!\n"); - if (pn_condition_is_set(cond)) { - const char *name = pn_condition_get_name(cond); - const char *desc = pn_condition_get_description(cond); - fprintf(stderr, " Error: %s Description: %s\n", - (name) ? name : "", - (desc) ? desc : ""); - } - // pn_reactor_process() will exit with a false return value, stopping - // the main loop. - } break; - - default: - break; - } -} - -static void usage(void) -{ - printf("Usage: send \n"); - printf("-a \tThe host address [localhost:5672]\n"); - printf("-c \t# of messages to send [1]\n"); - printf("-t \tTarget address [examples]\n"); - printf("-n \tUse an anonymous link [off]\n"); - printf("-i \tContainer name [SendExample]\n"); - printf("-q \tQuiet - turn off stdout\n"); - printf("message \tA text string to send.\n"); - exit(1); -} - -int main(int argc, char** argv) -{ - const char *address = "localhost"; - const char *msgtext = "Hello World!"; - const char *container = "SendExample"; - int c; - pn_message_t *message = NULL; - pn_data_t *body = NULL; - pn_reactor_t *reactor = NULL; - pn_url_t *url = NULL; - pn_connection_t *conn = NULL; - - /* Create a handler for the connection's events. event_handler() will be - * called for each event and delete_handler will be called when the - * connection is released. The handler will allocate an app_data_t - * instance which can be accessed when the event_handler is called. - */ - pn_handler_t *handler = pn_handler_new(event_handler, - sizeof(app_data_t), - delete_handler); - - /* set up the application data with defaults */ - app_data_t *app_data = GET_APP_DATA(handler); - memset(app_data, 0, sizeof(app_data_t)); - app_data->count = 1; - app_data->target = "examples"; - - /* Attach the pn_handshaker() handler. This handler deals with endpoint - * events from the peer so we don't have to. - */ - { - pn_handler_t *handshaker = pn_handshaker(); - pn_handler_add(handler, handshaker); - pn_decref(handshaker); - } - - /* command line options */ - opterr = 0; - while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) { - switch(c) { - case 'h': usage(); break; - case 'a': address = optarg; break; - case 'c': - app_data->count = atoi(optarg); - if (app_data->count < 1) usage(); - break; - case 't': app_data->target = optarg; break; - case 'n': app_data->anon = 1; break; - case 'i': container = optarg; break; - case 'q': quiet = 1; break; - default: - usage(); - break; - } - } - if (optind < argc) msgtext = argv[optind]; - - - // create a single message and pre-encode it so we only have to do that - // once. All transmits will use the same pre-encoded message simply for - // speed. - // - message = pn_message(); - pn_message_set_address(message, app_data->target); - body = pn_message_body(message); - pn_data_clear(body); - - // This message's body contains a single string - if (pn_data_fill(body, "S", msgtext)) { - fprintf(stderr, "Error building message!\n"); - exit(1); - } - pn_data_rewind(body); - { - // encode the message, expanding the encode buffer as needed - // - size_t len = 128; - char *buf = (char *)malloc(len); - int rc = 0; - do { - rc = pn_message_encode(message, buf, &len); - if (rc == PN_OVERFLOW) { - free(buf); - len *= 2; - buf = (char *)malloc(len); - } - } while (rc == PN_OVERFLOW); - app_data->msg_len = len; - app_data->msg_data = buf; - } - pn_decref(message); // message no longer needed - - reactor = pn_reactor(); - - url = pn_url_parse(address); - if (url == NULL) { - fprintf(stderr, "Invalid host address %s\n", address); - exit(1); - } - conn = pn_reactor_connection_to_host(reactor, - pn_url_get_host(url), - pn_url_get_port(url), - handler); - pn_decref(url); - pn_decref(handler); - - // the container name should be unique for each client - pn_connection_set_container(conn, container); - - // wait up to 5 seconds for activity before returning from - // pn_reactor_process() - pn_reactor_set_timeout(reactor, 5000); - - pn_reactor_start(reactor); - - while (pn_reactor_process(reactor)) { - /* Returns 'true' until the connection is shut down. - * pn_reactor_process() will return true at least once every 5 seconds - * (due to the timeout). If no timeout was configured, - * pn_reactor_process() returns as soon as it finishes processing all - * pending I/O and events. Once the connection has closed, - * pn_reactor_process() will return false. - */ - } - pn_decref(reactor); - - return 0; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/receive.c b/examples/c/receive.c new file mode 100644 index 0000000..6fd74a5 --- /dev/null +++ b/examples/c/receive.c @@ -0,0 +1,188 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + int received; + bool finished; +} app_data_t; + +static const int BATCH = 1000; /* Batch size for unlimited receive */ + +static int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +#define MAX_SIZE 1024 + +static void decode_message(pn_delivery_t *dlv) { + static char buffer[MAX_SIZE]; + ssize_t len; + // try to decode the message body + if (pn_delivery_pending(dlv) < MAX_SIZE) { + // read in the raw data + len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); + if (len > 0) { + // decode it into a proton message + pn_message_t *m = pn_message(); + if (PN_OK == pn_message_decode(m, buffer, len)) { + pn_string_t *s = pn_string(NULL); + pn_inspect(pn_message_body(m), s); + printf("%s\n", pn_string_get(s)); + pn_free(s); + } + pn_message_free(m); + } + } +} + +/* Return true to continue, false to exit */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_t* s = pn_session(c); + pn_session_open(s); + pn_link_t* l = pn_receiver(s, "my_receiver"); + pn_terminus_set_address(pn_link_source(l), app->amqp_address); + pn_link_open(l); + /* cannot receive without granting credit: */ + pn_link_flow(l, app->message_count ? app->message_count : BATCH); + } break; + + case PN_DELIVERY: { + /* A message has been received */ + pn_link_t *link = NULL; + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { + link = pn_delivery_link(dlv); + decode_message(dlv); + /* Accept the delivery */ + pn_delivery_update(dlv, PN_ACCEPTED); + /* done with the delivery, move to the next and free it */ + pn_link_advance(link); + pn_delivery_settle(dlv); /* dlv is now freed */ + + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(link) < BATCH/2) { + /* Grant enough credit to bring it up to BATCH: */ + pn_link_flow(link, BATCH - pn_link_credit(link)); + } + } else if (++app->received >= app->message_count) { + /* done receiving, close the endpoints */ + printf("%d messages received\n", app->received); + pn_session_t *ssn = pn_link_session(link); + pn_link_close(link); + pn_session_close(ssn); + pn_connection_close(pn_session_connection(ssn)); + } + } + } break; + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + break; + + default: + break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + int i = 0; + app.container_id = argv[i++]; /* Should be unique */ + app.host = (argc > 1) ? argv[i++] : ""; + app.port = (argc > 1) ? argv[i++] : "amqp"; + app.amqp_address = (argc > i) ? argv[i++] : "examples"; + app.message_count = (argc > i) ? atoi(argv[i++]) : 10; + + /* Create the proactor and connect */ + app.proactor = pn_proactor(); + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect(app.proactor, pn_connection(), addr); + run(&app); + pn_proactor_free(app.proactor); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/send.c ---------------------------------------------------------------------- diff --git a/examples/c/send.c b/examples/c/send.c new file mode 100644 index 0000000..43da8b0 --- /dev/null +++ b/examples/c/send.c @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +typedef struct app_data_t { + const char *host, *port; + const char *amqp_address; + const char *container_id; + int message_count; + + pn_proactor_t *proactor; + pn_rwbytes_t message_buffer; + int sent; + int acknowledged; +} app_data_t; + +static int exit_code = 0; + +static void check_condition(pn_event_t *e, pn_condition_t *cond) { + if (pn_condition_is_set(cond)) { + fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + exit_code = 1; + } +} + +/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ +static pn_bytes_t encode_message(app_data_t* app) { + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_t* body = pn_message_body(message); + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->message_buffer.start == NULL) { + static const size_t initial_size = 128; + app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->message_buffer is the total buffer space available. */ + /* mbuf wil point at just the portion used by the encoded message */ + pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->message_buffer.size *= 2; + app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); + mbuf.size = app->message_buffer.size; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); +} + +/* Returns true to continue, false if finished */ +static bool handle(app_data_t* app, pn_event_t* event) { + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + pn_connection_t* c = pn_event_connection(event); + pn_connection_set_container(c, app->container_id); + pn_connection_open(c); + pn_session_t* s = pn_session(pn_event_connection(event)); + pn_session_open(s); + pn_link_t* l = pn_sender(s, "my_sender"); + pn_terminus_set_address(pn_link_target(l), app->amqp_address); + pn_link_open(l); + break; + } + + case PN_LINK_FLOW: { + /* The peer has given us some credit, now we can send messages */ + pn_link_t *sender = pn_event_link(event); + while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { + ++app->sent; + // Use sent counter as unique delivery tag. + pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + pn_bytes_t msgbuf = encode_message(app); + pn_link_send(sender, msgbuf.start, msgbuf.size); + pn_link_advance(sender); + } + break; + } + + case PN_DELIVERY: { + /* We received acknowledgedment from the peer that a message was delivered. */ + pn_delivery_t* d = pn_event_delivery(event); + if (pn_delivery_remote_state(d) == PN_ACCEPTED) { + if (++app->acknowledged == app->message_count) { + printf("%d messages sent and acknowledged\n", app->acknowledged); + pn_connection_close(pn_event_connection(event)); + /* Continue handling events till we receive TRANSPORT_CLOSED */ + } + } else { + fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d)); + pn_connection_close(pn_event_connection(event)); + exit_code=1; + } + break; + } + + case PN_TRANSPORT_CLOSED: + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + + case PN_CONNECTION_REMOTE_CLOSE: + check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_SESSION_REMOTE_CLOSE: + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + + case PN_PROACTOR_INACTIVE: + return false; + + default: break; + } + return true; +} + +void run(app_data_t *app) { + /* Loop and handle events */ + do { + pn_event_batch_t *events = pn_proactor_wait(app->proactor); + for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(app, e)) { + return; + } + } + pn_proactor_done(app->proactor, events); + } while(true); +} + +int main(int argc, char **argv) { + struct app_data_t app = {0}; + int i = 0; + app.container_id = argv[i++]; /* Should be unique */ + app.host = (argc > 1) ? argv[i++] : ""; + app.port = (argc > 1) ? argv[i++] : "amqp"; + app.amqp_address = (argc > i) ? argv[i++] : "examples"; + app.message_count = (argc > i) ? atoi(argv[i++]) : 10; + + app.proactor = pn_proactor(); + char addr[PN_MAX_ADDR]; + pn_proactor_addr(addr, sizeof(addr), app.host, app.port); + pn_proactor_connect(app.proactor, pn_connection(), addr); + run(&app); + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/thread.h ---------------------------------------------------------------------- diff --git a/examples/c/thread.h b/examples/c/thread.h new file mode 100644 index 0000000..1bd5595 --- /dev/null +++ b/examples/c/thread.h @@ -0,0 +1,49 @@ +#ifndef _PROTON_EXAMPLES_C_THREADS_H +#define _PROTON_EXAMPLES_C_THREADS_H 1 + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */ + +#ifdef _WIN32 + +#include +#include +#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */ +#include +#include + +#define pthread_function DWORD WINAPI +#define pthread_function_return DWORD +#define pthread_t HANDLE +#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL) +#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread)) +#define pthread_mutex_T HANDLE +#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL)) +#define pthread_mutex_destroy(pobject) CloseHandle(*pobject) +#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE) +#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject) + +#else + +#include + +#endif + +#endif /* thread.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/proton-c/src/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt index 50ea677..c562025 100644 --- a/proton-c/src/tests/CMakeLists.txt +++ b/proton-c/src/tests/CMakeLists.txt @@ -63,9 +63,9 @@ if(HAS_PROACTOR) endif(WIN32) if(WIN32) - # set(path "$;$") + # set(path "$;$") else(WIN32) - set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c/proactor:$ENV{PATH}") + set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c:$ENV{PATH}") endif(WIN32) # Add the tools directory for the 'proctest' module set_search_path(pypath "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org