qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [1/2] qpid-proton git commit: PROTON-1470: C proactor common address parser.
Date Tue, 02 May 2017 15:44:47 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 2c6c57e29 -> 0cf7ea634


PROTON-1470: C proactor common address parser.

Updated libuv and epoll to use it.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1991745e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1991745e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1991745e

Branch: refs/heads/master
Commit: 1991745e0d2982b7d27f7467695bb6b283edbd36
Parents: 2c6c57e
Author: Alan Conway <aconway@redhat.com>
Authored: Tue May 2 10:04:08 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Tue May 2 11:28:08 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c             | 53 +++++++++-----------------
 proton-c/src/proactor/libuv.c             | 23 +++++------
 proton-c/src/proactor/proactor-internal.h | 18 ++++-----
 proton-c/src/proactor/proactor.c          | 37 ++++++++++++++----
 proton-c/src/tests/proactor.c             | 51 +++++++++++++++++++++----
 5 files changed, 107 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1991745e/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 36d7263..72fcac7 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -195,8 +195,6 @@ pn_timestamp_t pn_i_now2(void)
 const char *COND_NAME = "proactor";
 const char *AMQP_PORT = "5672";
 const char *AMQP_PORT_NAME = "amqp";
-const char *AMQPS_PORT = "5671";
-const char *AMQPS_PORT_NAME = "amqps";
 
 PN_HANDLE(PN_PROACTOR)
 
@@ -295,8 +293,8 @@ typedef struct psocket_t {
   epoll_extended_t epoll_io;
   bool is_conn;
   bool closing;
-  char host[NI_MAXHOST];
-  char port[NI_MAXSERV];
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
 } psocket_t;
 
 struct pn_proactor_t {
@@ -402,7 +400,8 @@ static inline void wake_done(pcontext_t *ctx) {
 }
 
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *addr)
+{
   ps->epoll_io.psocket = ps;
   ps->epoll_io.fd = -1;
   ps->epoll_io.type = is_conn ? PCONNECTION_IO : LISTENER_IO;
@@ -415,14 +414,7 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
   ps->is_conn = is_conn;
   ps->closing = false;
   ps->sockfd = -1;
-
-  /* For platforms that don't know about "amqp" and "amqps" service names. */
-  if (port && strcmp(port, AMQP_PORT_NAME) == 0)
-    port = AMQP_PORT;
-  else if (port && strcmp(port, AMQPS_PORT_NAME) == 0)
-    port = AMQPS_PORT;
-  strncpy(ps->host, host, sizeof(ps->host));
-  strncpy(ps->port, port, sizeof(ps->port));
+  pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
 }
 
 struct pn_netaddr_t {
@@ -566,7 +558,8 @@ static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
 static void pconnection_tick(pconnection_t *pc);
 
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
+{
   pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   if (!pc) return NULL;
   if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
@@ -577,7 +570,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
     abort();
   }
   pcontext_init(&pc->context, PCONNECTION, p, pc);
-  psocket_init(&pc->psocket, p,  true, host, port);
+  psocket_init(&pc->psocket, p,  true, addr);
   pc->new_events = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
@@ -973,10 +966,10 @@ void pconnection_start(pconnection_t *pc) {
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
 
   int fd = pc->psocket.sockfd;
-  socklen_t len = sizeof(pc->local);
-  getsockname(fd, (struct sockaddr*)&pc->local, &len);
-  len = sizeof(pc->remote);
-  getpeername(fd, (struct sockaddr*)&pc->remote, &len);
+  socklen_t len = sizeof(pc->local.ss);
+  getsockname(fd, (struct sockaddr*)&pc->local.ss, &len);
+  len = sizeof(pc->remote.ss);
+  getpeername(fd, (struct sockaddr*)&pc->remote.ss, &len);
 
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
   pc->read_closed = false;
@@ -988,11 +981,7 @@ void pconnection_start(pconnection_t *pc) {
 }
 
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
-  char *buf = strdup(addr);
-  assert(buf); // TODO: memory safety
-  char *host = buf;
-  char *port = pni_split_host_port(buf);
-  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
+  pconnection_t *pc = new_pconnection_t(p, c, false, addr);
   assert(pc); // TODO: memory safety
   // TODO: check case of proactor shutting down
   lock(&pc->context.mutex);
@@ -1001,7 +990,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
 
   struct addrinfo *ai = NULL;
   int fd = -1;
-  if (!getaddrinfo(*host ? host : NULL, *port ? port : NULL, 0, &ai)) {
+  if (!getaddrinfo(pc->psocket.host, pc->psocket.port, 0, &ai)) {
     fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
     if (fd >= 0) {
       configure_socket(fd);
@@ -1010,7 +999,6 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
         pconnection_start(pc);
         unlock(&pc->context.mutex);
         freeaddrinfo(ai);
-        free(buf);
         return;
       }
     }
@@ -1022,7 +1010,6 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   if (fd != -1) close (fd);
   unlock(&pc->context.mutex);
   if (notify) wake_notify(&pc->context);
-  free(buf);
   return;
 }
 
@@ -1087,14 +1074,10 @@ pn_listener_t *pn_listener() {
 
 void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
 {
-  char *buf = strdup(addr);
-  assert(buf);  // TODO:  memory safety
-  char *host = buf;
-  char *port = pni_split_host_port(buf);
   // TODO: check listener not already listening for this or another proactor
   lock(&l->context.mutex);
   l->context.proactor = p;;
-  psocket_init(&l->psocket, p, false, host, port);
+  psocket_init(&l->psocket, p, false, addr);
   l->backlog = backlog;
   proactor_add(&l->psocket);
   /* Always put an OPEN event for symmetry, even if we immediately close with err */
@@ -1103,7 +1086,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
 
   struct addrinfo *ai = NULL;
   int fd = -1;
-  if (!getaddrinfo(*host ? host : NULL, *port ? port : NULL, 0, &ai)) {
+  if (!getaddrinfo(l->psocket.host, l->psocket.port, 0, &ai)) {
     fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
     if (fd >= 0) {
       int yes = 1;
@@ -1117,7 +1100,6 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
             unlock(&l->context.mutex);
             if (notify) wake_notify(&l->context);
             freeaddrinfo(ai);
-            free(buf);
             return;
           }
     }
@@ -1127,7 +1109,6 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   unlock(&l->context.mutex);
   if (notify) wake_notify(&l->context);
   if (ai) freeaddrinfo(ai);
-  free(buf);
   return;
 }
 
@@ -1282,7 +1263,7 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 
 void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   // TODO: fuller sanity check on input args
-  pconnection_t *pc = new_pconnection_t(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  pconnection_t *pc = new_pconnection_t(l->psocket.proactor, c, true, "");
   assert(pc);  // TODO: memory safety
   int err = 0;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1991745e/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 0a05424..dce6dba 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -67,8 +67,6 @@
 
 const char *AMQP_PORT = "5672";
 const char *AMQP_PORT_NAME = "amqp";
-const char *AMQPS_PORT = "5671";
-const char *AMQPS_PORT_NAME = "amqps";
 
 PN_HANDLE(PN_PROACTOR)
 
@@ -135,8 +133,8 @@ static void work_init(work_t* w, pn_proactor_t* p, struct_type type) {
 
 /* A resolvable address */
 typedef struct addr_t {
-  char host_port[PN_MAX_ADDR];
-  char *host, *port;            /* Point into addr after destructive pni_split_host_port */
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
   uv_getaddrinfo_t getaddrinfo; /* UV getaddrinfo request, contains list of addrinfo */
   struct addrinfo* addrinfo;    /* The current addrinfo being tried */
 } addr_t;
@@ -280,9 +278,7 @@ static void work_start(work_t *w) {
 }
 
 static void parse_addr(addr_t *addr, const char *str) {
-  strncpy(addr->host_port, str, sizeof(addr->host_port));
-  addr->host = addr->host_port;
-  addr->port = pni_split_host_port(addr->host_port);
+  pni_parse_addr(str, addr->addr_buf, sizeof(addr->addr_buf), &addr->host, &addr->port);
 }
 
 /* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
@@ -508,10 +504,10 @@ static void on_connect_fail(uv_handle_t *handle) {
 
 static void pconnection_addresses(pconnection_t *pc) {
   int len;
-  len = sizeof(pc->local);
-  uv_tcp_getsockname(&pc->tcp, (struct sockaddr*)&pc->local, &len);
-  len = sizeof(pc->remote);
-  uv_tcp_getpeername(&pc->tcp, (struct sockaddr*)&pc->remote, &len);
+  len = sizeof(pc->local.ss);
+  uv_tcp_getsockname(&pc->tcp, (struct sockaddr*)&pc->local.ss, &len);
+  len = sizeof(pc->remote.ss);
+  uv_tcp_getpeername(&pc->tcp, (struct sockaddr*)&pc->remote.ss, &len);
 }
 
 /* Outgoing connection */
@@ -563,8 +559,7 @@ static int leader_resolve(pn_proactor_t *p, addr_t *addr, bool listen) {
   if (listen) {
     hints.ai_flags |= AI_PASSIVE | AI_ALL;
   }
-  int err = uv_getaddrinfo(&p->loop, &addr->getaddrinfo, NULL,
-                           *addr->host ? addr->host : NULL, addr->port, &hints);
+  int err = uv_getaddrinfo(&p->loop, &addr->getaddrinfo, NULL, addr->host, addr->port, &hints);
   addr->addrinfo = addr->getaddrinfo.addrinfo; /* Start with the first addrinfo */
   return err;
 }
@@ -1112,7 +1107,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
   pconnection_t *pc = pconnection(p, c, false);
   assert(pc);                                  /* FIXME aconway 2017-03-31: memory safety */
-  pn_connection_open(pc->driver.connection); /* Auto-open */
+  pn_connection_open(pc->driver.connection);   /* Auto-open */
   parse_addr(&pc->addr, addr);
   work_start(&pc->work);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1991745e/proton-c/src/proactor/proactor-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.h b/proton-c/src/proactor/proactor-internal.h
index bf79cd5..894cb5b 100644
--- a/proton-c/src/proactor/proactor-internal.h
+++ b/proton-c/src/proactor/proactor-internal.h
@@ -20,18 +20,18 @@
  * under the License.
  */
 
+#include <proton/type_compat.h>
+#include <proton/import_export.h>
+
 /*
- * Given a "host:port" string, split it in-place like so:
- *
- *     char *host = host_port;
- *     char *port = pn_split_host_port(host_port);
- *
- * Note this modifies the original host_port string by replacing the last ':'
- * character with '\0'.
+ * Parse a pn_proactor_addr string, copy data into buf as necessary.
+ * Set *host and *port to point to the host and port strings.
  *
- * If there is no ':', the returned pointer is an empty string, not NULL.
+ * If the port is empty, replace it with "5672", if it is "amqp" or "amqps"
+ * replace it with the numeric port value.
  *
+ * @return 0 on success, PN_OVERFLOW if buf is too small.
  */
-char* pni_split_host_port(char *host_port);
+PNP_EXTERN int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, const char **port);
 
 #endif // PROACTOR_NETADDR_INTERNAL_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1991745e/proton-c/src/proactor/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor.c b/proton-c/src/proactor/proactor.c
index a7939e8..af7b057 100644
--- a/proton-c/src/proactor/proactor.c
+++ b/proton-c/src/proactor/proactor.c
@@ -21,6 +21,7 @@
 /* Common platform-independent implementation for proactor libraries */
 
 #include "proactor-internal.h"
+#include <proton/error.h>
 #include <proton/proactor.h>
 
 #include <stdio.h>
@@ -28,18 +29,38 @@
 #include <string.h>
 
 
+static const char *AMQP_PORT = "5672";
+static const char *AMQP_PORT_NAME = "amqp";
+static const char *AMQPS_PORT = "5671";
+static const char *AMQPS_PORT_NAME = "amqps";
+
 int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) {
   return snprintf(buf, len, "%s:%s", host ? host : "", port ? port : "");
 }
 
-char* pni_split_host_port(char *host_port) {
-  char *port = strrchr(host_port, ':');
-  if (port) {
-    *port = '\0';
-    ++port;
+int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, const char **port)
+{
+  size_t hplen = strlen(addr);
+  if (hplen >= len) {
+    return PN_OVERFLOW;
+  }
+  memcpy(buf, addr, hplen+1);
+  char *p = strrchr(buf, ':');
+  if (p) {
+    *port = p + 1;
+    *p = '\0';
+    if (**port == '\0' || !strcmp(*port, AMQP_PORT_NAME)) {
+      *port = AMQP_PORT;
+    } else if (!strcmp(*port, AMQPS_PORT_NAME)) {
+      *port = AMQPS_PORT;
+    }
   } else {
-    port = host_port + strlen(host_port); /* Empty string, point to trailing \0 */
+    *port = AMQP_PORT;
   }
-  return port;
+  if (*buf) {
+    *host = buf;
+  } else {
+    *host = NULL;
+  }
+  return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1991745e/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index ba73033..b70bf61 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -19,6 +19,8 @@
 
 #include "test_tools.h"
 #include "test_config.h"
+#include "../proactor/proactor-internal.h"
+
 #include <proton/condition.h>
 #include <proton/connection.h>
 #include <proton/event.h>
@@ -72,13 +74,6 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 #define TEST_LOG_EQUAL(T, A, PT) \
   TEST_ETYPES_EQUAL((T), (A), sizeof(A)/sizeof(*A), (PT).log, (PT).log_len)
 
-#if 0                           /* FIXME aconway 2017-03-31:  */
-/* Return the last event in the proactor_test's log or PN_EVENT_NONE if it is empty */
-static pn_event_type_t  proactor_test_last_event(proactor_test_t *pt) {
-  return pt->log_len ? pt->log[pt->log_len - 1] : PN_EVENT_NONE;
-}
-#endif
-
 /* Set this to a pn_condition() to save condition data */
 pn_condition_t *last_condition = NULL;
 
@@ -719,9 +714,48 @@ static void test_proactor_addr(test_t *t) {
   TEST_STR_EQUAL(t, "1:2:3:4:", addr);
 }
 
+static void test_parse_addr(test_t *t) {
+  char buf[1024];
+  const char *host, *port;
+
+  TEST_CHECK(t, 0 == pni_parse_addr("foo:bar", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "foo", host);
+  TEST_STR_EQUAL(t, "bar", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr("foo:", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "foo", host);
+  TEST_STR_EQUAL(t, "5672", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":bar", buf, sizeof(buf), &host, &port));
+  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+  TEST_STR_EQUAL(t, "bar", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":", buf, sizeof(buf), &host, &port));
+  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+  TEST_STR_EQUAL(t, "5672", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":amqps", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "5671", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":amqp", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "5672", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr("::1:2:3", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "::1:2", host);
+  TEST_STR_EQUAL(t, "3", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr(":::", buf, sizeof(buf), &host, &port));
+  TEST_STR_EQUAL(t, "::", host);
+  TEST_STR_EQUAL(t, "5672", port);
+
+  TEST_CHECK(t, 0 == pni_parse_addr("", buf, sizeof(buf), &host, &port));
+  TEST_CHECKF(t, NULL == host, "expected null, got: %s", host);
+  TEST_STR_EQUAL(t, "5672", port);
+}
+
 /* Test pn_proactor_addr funtions */
 
-/* FIXME aconway 2017-03-30: windows will need winsock2.h etc.
+/* Windows will need winsock2.h etc.
    These headers are *only* needed for test_netaddr and only for the getnameinfo part.
    This is the only non-portable part of the proactor test suite.
    */
@@ -866,6 +900,7 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_release_free(&t));
   RUN_ARGV_TEST(failed, t, test_ssl(&t));
   RUN_ARGV_TEST(failed, t, test_proactor_addr(&t));
+  RUN_ARGV_TEST(failed, t, test_parse_addr(&t));
   RUN_ARGV_TEST(failed, t, test_netaddr(&t));
   RUN_ARGV_TEST(failed, t, test_disconnect(&t));
   RUN_ARGV_TEST(failed, t, test_abort(&t));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message