From users-return-10224-apmail-qpid-users-archive=qpid.apache.org@qpid.apache.org Fri Mar 14 15:38:39 2014 Return-Path: X-Original-To: apmail-qpid-users-archive@www.apache.org Delivered-To: apmail-qpid-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8536CC537 for ; Fri, 14 Mar 2014 15:38:39 +0000 (UTC) Received: (qmail 11963 invoked by uid 500); 14 Mar 2014 15:38:39 -0000 Delivered-To: apmail-qpid-users-archive@qpid.apache.org Received: (qmail 11573 invoked by uid 500); 14 Mar 2014 15:38:38 -0000 Mailing-List: contact users-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@qpid.apache.org Delivered-To: mailing list users@qpid.apache.org Received: (qmail 11550 invoked by uid 99); 14 Mar 2014 15:38:34 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Mar 2014 15:38:34 +0000 X-ASF-Spam-Status: No, hits=0.3 required=5.0 tests=PLING_QUERY,RCVD_IN_DNSWL_NONE,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of fraser.adams@blueyonder.co.uk designates 80.0.253.74 as permitted sender) Received: from [80.0.253.74] (HELO know-smtprelay-omc-10.server.virginmedia.net) (80.0.253.74) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Mar 2014 15:38:27 +0000 Received: from [192.168.1.108] ([82.38.120.72]) by know-smtprelay-10-imp with bizsmtp id dTe51n01i1Zorai01Te6T8; Fri, 14 Mar 2014 15:38:06 +0000 X-Originating-IP: [82.38.120.72] X-Spam: 0 X-Authority: v=2.1 cv=Qfbov6rv c=1 sm=1 tr=0 a=kn84lg4yEBBc+Mp7+mj2YQ==:117 a=kn84lg4yEBBc+Mp7+mj2YQ==:17 a=thNyya010k0A:10 a=96GN3GUJI5AA:10 a=3NElcqgl2aoA:10 a=a5Gf7U6LAAAA:8 a=r77TgQKjGQsHNAKrUKIA:9 a=9iDbn-4jx3cA:10 a=cKsnjEOsciEA:10 a=KkGM3SOzKlyxgBl42ooA:9 a=wPNLvfGTeEIA:10 a=mV9VRH-2AAAA:8 a=eZxP2M26cf0Z3xWgOTsA:9 a=CuV1Ly35MvFyjNt-:21 a=BxtKmD6EqQNrB3hO:21 a=BrEg7cQOSxCDRAQa-ggA:9 a=vWVX9GSWtpxVdmt9:21 a=vYZr3run1kcA4ojF:21 Message-ID: <5323225D.4080206@blueyonder.co.uk> Date: Fri, 14 Mar 2014 15:38:05 +0000 From: Fraser Adams User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:17.0) Gecko/20130329 Thunderbird/17.0.5 MIME-Version: 1.0 To: "users@qpid.apache.org" Subject: Proton's new async features - any examples? 'cause I'm scratching my head! Content-Type: multipart/mixed; boundary="------------010805060501020108030803" X-Virus-Checked: Checked by ClamAV on apache.org --------------010805060501020108030803 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Hey folks, I've noticed that PROTON-525/531/534 cover work to expose some bits of messenger that were previously internal and allow messenger to be driven from an external poll/select/epoll. I'm quite interested in this from the perspective of the JavaScript bindings that I'm working on, but to be honest I'm currently left scratching my head trying to figure out how the new APIs are intended to work. I don't suppose that there are any examples available? I currently have a recv-async.c and send-async.c (attached) they are still a bit hacky as they are currently work in progress as I push the necessary features into emscripten (the C->JavaScript compiler I'm using) but they both work in either native C or JavaScript (the emscripten_set_network_callback gets triggered by WebSocket activity and allows fully async behaviour, so I don't need any nasty polling). I've just merged the latest proton-c stuff to the branch I'm working on for the JavaScript bindings and everything is still working nicely with the current approach, but I'm guessing that the new capabilities might be able to make things more efficient? I'm currently working on actual binding code, so I can call messenger direct from native JavaScript as opposed to compiling C/C++ into JavaScript, so far it has got a lot of parallels with the python bindings - though clearly only async stuff makes any sense for JavaScript. I'd really appreciate tips and code samples from the folks who have been working on this API. Cheers, Frase --------------010805060501020108030803 Content-Type: text/x-csrc; name="send-async.c" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="send-async.c" /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "proton/message.h" #include "proton/messenger.h" #include "proton/driver.h" #include "pncompat/misc_funcs.inc" #include #include #include #include #if EMSCRIPTEN #include void emscripten_set_network_callback(void (*func)()); #endif #define check(messenger) \ { \ if(pn_messenger_errno(messenger)) \ { \ die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \ } \ } \ // FA Temporarily make global pn_message_t * message; pn_messenger_t * messenger; pn_tracker_t tracker; int tracked = 1; int running = 1; void die(const char *file, int line, const char *message) { fprintf(stderr, "%s:%i: %s\n", file, line, message); exit(1); } void usage(void) { printf("Usage: send [-a addr] [message]\n"); printf("-a \tThe target address [amqp[s]://domain[/name]]\n"); printf("message\tA text string to send.\n"); exit(0); } void process(void) { //printf(" *** process ***\n"); // Process outgoing messages pn_status_t status = pn_messenger_status(messenger, tracker); //printf("status = %d\n", status); if (status != PN_STATUS_PENDING) { printf("status = %d\n", status); //pn_messenger_settle(messenger, tracker, 0); //tracked--; if (running) { printf("stopping\n"); pn_messenger_stop(messenger); running = 0; } } if (pn_messenger_stopped(messenger)) { printf("exiting\n"); pn_message_free(message); pn_messenger_free(messenger); exit(0); } } // Callback used by emscripten to ensure pn_messenger_work gets called. void work(void) { //printf(" *** work ***\n"); int err = pn_messenger_work(messenger, 0); printf("err = %d\n", err); if (err >= 0) { process(); } err = pn_messenger_work(messenger, 0); printf("err = %d\n", err); if (err >= 0) { process(); } } int main(int argc, char** argv) { int c; opterr = 0; char * address = (char *) "amqp://0.0.0.0"; char * msgtext = (char *) "Hello World!"; while((c = getopt(argc, argv, "ha:b:c:")) != -1) { switch(c) { case 'a': address = optarg; break; case 'h': usage(); break; case '?': if(optopt == 'a') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else if(isprint(optopt)) { fprintf(stderr, "Unknown option `-%c'.\n", optopt); } else { fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt); } return 1; default: abort(); } } if (optind < argc) msgtext = argv[optind]; // pn_message_t * message; // pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger(NULL); pn_messenger_set_blocking(messenger, false); // Put messenger into non-blocking mode. pn_messenger_set_outgoing_window(messenger, 1024); // FA Addition. pn_messenger_start(messenger); pn_message_set_address(message, address); pn_data_t *body = pn_message_body(message); pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext)); pn_messenger_put(messenger, message); check(messenger); tracker = pn_messenger_outgoing_tracker(messenger); //printf("tracker = %lld\n", (long long int)tracker); #if EMSCRIPTEN //emscripten_set_main_loop(work, 0, 0); emscripten_set_network_callback(work); #else while (1) { pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. process(); } #endif return 0; } --------------010805060501020108030803 Content-Type: text/x-csrc; name="recv-async.c" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="recv-async.c" /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "proton/message.h" #include "proton/messenger.h" #include "pncompat/misc_funcs.inc" #include #include #include #if EMSCRIPTEN #include void emscripten_set_network_callback(void (*func)()); #endif #define check(messenger) \ { \ if(pn_messenger_errno(messenger)) \ { \ die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \ } \ } \ // FA Temporarily make these global pn_message_t * message; pn_messenger_t * messenger; void die(const char *file, int line, const char *message) { fprintf(stderr, "%s:%i: %s\n", file, line, message); exit(1); } void usage(void) { printf("Usage: recv [options] \n"); printf("-c \tPath to the certificate file.\n"); printf("-k \tPath to the private key file.\n"); printf("-p \tPassword for the private key.\n"); printf("\tAn address.\n"); exit(0); } void process(void) { //printf(" *** process ***\n"); // Process incoming messages while(pn_messenger_incoming(messenger)) { printf("in while loop\n"); pn_messenger_get(messenger, message); check(messenger); pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger); char buffer[1024]; size_t buffsize = sizeof(buffer); pn_data_t *body = pn_message_body(message); pn_data_format(body, buffer, &buffsize); printf("Address: %s\n", pn_message_get_address(message)); const char* subject = pn_message_get_subject(message); printf("Subject: %s\n", subject ? subject : "(no subject)"); printf("Content: %s\n", buffer); int err = pn_messenger_accept(messenger, tracker, 0); printf("err = %d\n", err); } } // Callback used by emscripten to ensure pn_messenger_work gets called. void work(void) { //printf(" *** work ***\n"); int err = pn_messenger_work(messenger, 0); printf("err = %d\n", err); if (err >= 0) { process(); } err = pn_messenger_work(messenger, 0); printf("err = %d\n", err); if (err >= 0) { process(); } } int main(int argc, char** argv) { char* certificate = NULL; char* privatekey = NULL; char* password = NULL; char* address = (char *) "amqp://~0.0.0.0"; int c; opterr = 0; while((c = getopt(argc, argv, "hc:k:p:")) != -1) { switch(c) { case 'h': usage(); break; case 'c': certificate = optarg; break; case 'k': privatekey = optarg; break; case 'p': password = optarg; break; case '?': if(optopt == 'c' || optopt == 'k' || optopt == 'p') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else if(isprint(optopt)) { fprintf(stderr, "Unknown option `-%c'.\n", optopt); } else { fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt); } return 1; default: abort(); } } if (optind < argc) { address = argv[optind]; } // pn_message_t * message; // pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger(NULL); pn_messenger_set_blocking(messenger, false); // FA Addition. //pn_messenger_set_incoming_window(messenger, 1024); // FA Addition. /* load the various command line options if they're set */ if(certificate) { pn_messenger_set_certificate(messenger, certificate); } if(privatekey) { pn_messenger_set_private_key(messenger, privatekey); } if(password) { pn_messenger_set_password(messenger, password); } pn_messenger_start(messenger); check(messenger); pn_messenger_subscribe(messenger, address); check(messenger); pn_messenger_recv(messenger, -1); // Receive as many messages as messenger can buffer #if EMSCRIPTEN //emscripten_set_main_loop(work, 0, 0); emscripten_set_network_callback(work); #else while (1) { pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity. process(); } #endif return 0; } --------------010805060501020108030803 Content-Type: text/plain; charset=us-ascii --------------------------------------------------------------------- To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org For additional commands, e-mail: users-help@qpid.apache.org --------------010805060501020108030803--