qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-1766: [cpp] Remove thread-unsafe use of pn_record_t by proactor.
Date Tue, 20 Feb 2018 17:49:54 GMT
PROTON-1766: [cpp] Remove thread-unsafe use of pn_record_t by proactor.

The proactor was using pn_record_t (via pn_connection_attachments) to navigate
from pn_connection_t to proactor data in arbitrary threads. This was unsafe, and
cannot be made safe without locks in the proton core because users can also use
pn_connection_attachments() outside the proactor's control.

Replaced use of attachments with a direct pointer field in pn_connection_t
which is used by the proactor to point to its internal data. Currently
access is locked by a mutex but this can/should be an atomic operation.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/233855fc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/233855fc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/233855fc

Branch: refs/heads/master
Commit: 233855fce76a550b9f890725c1ac2707f32fd028
Parents: a1da7f2
Author: Alan Conway <aconway@redhat.com>
Authored: Tue Feb 20 12:03:13 2018 -0500
Committer: Alan Conway <aconway@redhat.com>
Committed: Tue Feb 20 12:03:13 2018 -0500

----------------------------------------------------------------------
 proton-c/include/proton/connection_driver.h | 10 +++
 proton-c/src/core/connection_driver.c       |  2 +
 proton-c/src/core/engine-internal.h         |  1 +
 proton-c/src/core/engine.c                  |  1 +
 proton-c/src/proactor/epoll.c               | 89 +++++++++---------------
 5 files changed, 48 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/233855fc/proton-c/include/proton/connection_driver.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h
index 8e39551..c99be1d 100644
--- a/proton-c/include/proton/connection_driver.h
+++ b/proton-c/include/proton/connection_driver.h
@@ -267,6 +267,16 @@ PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, const char *
 PN_EXTERN void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap);
 
 /**
+ * @cond INTERNAL
+ * @return pointer to location in the pn_connection_t private struct that can hold a `void*`.
+ * Only for use by IO integration code (e.g. pn_proactor_t implementations use this pointer)
+ */
+PN_EXTERN void **pn_connection_driver_data(pn_connection_t *connection);
+/**
+ *  @endcond
+ */
+
+/**
  * @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/233855fc/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index da315f2..c253e72 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -187,3 +187,5 @@ pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch
     (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)) :
     NULL;
 }
+
+void** pn_connection_driver_data(pn_connection_t *c) { return &c->driver_data; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/233855fc/proton-c/src/core/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine-internal.h b/proton-c/src/core/engine-internal.h
index a62b811..573ab56 100644
--- a/proton-c/src/core/engine-internal.h
+++ b/proton-c/src/core/engine-internal.h
@@ -248,6 +248,7 @@ struct pn_connection_t {
   pn_collector_t *collector;
   pn_record_t *context;
   pn_list_t *delivery_pool;
+  void *driver_data;            /* See pn_connection_driver_data */
 };
 
 struct pn_session_t {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/233855fc/proton-c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c
index 0656dd9..dd90b4d 100644
--- a/proton-c/src/core/engine.c
+++ b/proton-c/src/core/engine.c
@@ -536,6 +536,7 @@ pn_connection_t *pn_connection()
   conn->collector = NULL;
   conn->context = pn_record();
   conn->delivery_pool = pn_list(PN_OBJECT, 0);
+  conn->driver_data = NULL;
 
   return conn;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/233855fc/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 1a3b7c7..1afb78e 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -32,7 +32,6 @@
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
 #include <proton/engine.h>
-#include <proton/object.h>
 #include <proton/proactor.h>
 #include <proton/transport.h>
 #include <proton/listener.h>
@@ -275,8 +274,6 @@ pn_timestamp_t pn_i_now2(void)
 const char *AMQP_PORT = "5672";
 const char *AMQP_PORT_NAME = "amqp";
 
-PN_HANDLE(PN_PROACTOR)
-
 // The number of times a connection event batch may be replenished for
 // a thread between calls to wait().  Some testing shows that
 // increasing this value above 1 actually slows performance slightly
@@ -404,7 +401,6 @@ struct pn_proactor_t {
   // wake subsystem
   int eventfd;
   pmutex eventfd_mutex;
-  pmutex bind_mutex;
   bool wakes_in_progress;
   pcontext_t *wake_list_first;
   pcontext_t *wake_list_last;
@@ -525,7 +521,6 @@ typedef struct pconnection_t {
   bool read_blocked;
   bool write_blocked;
   bool disconnected;
-  bool bound;
   int hog_count; // thread hogging limiter
   pn_event_batch_t batch;
   pn_connection_driver_t driver;
@@ -535,6 +530,30 @@ typedef struct pconnection_t {
   pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
 } pconnection_t;
 
+/* Protects read/update of the pn_connection_t pointer to its pconnection_t
+ *
+ * Global because pn_connection_wake()/pn_connection_proactor() navigate from
+ * the pn_connection_t before we know the proactor or driver. Critical sections
+ * are small: only get/set of the pn_connection_t driver pointer.
+ *
+ * TODO: replace mutex with atomic load/store
+ */
+static pthread_mutex_t driver_data_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+  if (!c) return NULL;
+  lock(&driver_data_mutex);
+  pconnection_t *pc = (pconnection_t*)*pn_connection_driver_data(c);
+  unlock(&driver_data_mutex);
+  return pc;
+}
+
+static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
+  lock(&driver_data_mutex);
+  *pn_connection_driver_data(c) = pc;
+  unlock(&driver_data_mutex);
+}
+
 /*
  * A listener can have mutiple sockets (as specified in the addrinfo).  They
  * are armed separately.  The individual psockets can be part of at most one
@@ -742,20 +761,6 @@ static int pclosefd(pn_proactor_t *p, int fd) {
 // pconnection
 // ========================================================================
 
-/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
-#define CID_pconnection CID_pn_object
-#define pconnection_inspect NULL
-#define pconnection_initialize NULL
-#define pconnection_hashcode NULL
-#define pconnection_compare NULL
-
-static void pconnection_finalize(void *vp_pconnection) {
-  pconnection_t *pc = (pconnection_t*)vp_pconnection;
-  pcontext_finalize(&pc->context);
-}
-
-static const pn_class_t pconnection_class = PN_CLASS(pconnection);
-
 static void pconnection_tick(pconnection_t *pc);
 
 static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
@@ -767,18 +772,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
     return "pn_connection_driver_init failure";
   }
 
-  lock(&p->bind_mutex);
-  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  if (pn_record_get(r, PN_PROACTOR)) {
-    unlock(&p->bind_mutex);
-    pn_connection_driver_destroy(&pc->driver);
-    free(pc);
-    return "pn_connection_t already in use";
-  }
-  pn_record_def(r, PN_PROACTOR, &pconnection_class);
-  pn_record_set(r, PN_PROACTOR, pc);
-  pc->bound = true;
-  unlock(&p->bind_mutex);
+  set_pconnection(pc->driver.connection, pc);
 
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL, addr);
@@ -807,7 +801,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   }
   pmutex_init(&pc->rearm_mutex);
 
-  pn_decref(pc);                /* Will be deleted when the connection is */
   return NULL;
 }
 
@@ -818,16 +811,17 @@ static inline bool pconnection_is_final(pconnection_t *pc) {
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
+  if (pc->driver.connection) {
+    set_pconnection(pc->driver.connection, NULL);
+  }
   if (pc->addrinfo) {
     freeaddrinfo(pc->addrinfo);
   }
   pmutex_finalize(&pc->rearm_mutex);
   pn_condition_free(pc->disconnect_condition);
-  if (pc->bound)
-      pn_incref(pc);                /* Make sure we don't do a circular free */
   pn_connection_driver_destroy(&pc->driver);
-  pn_decref(pc);
-  /* Freed if not bound, otherwise pc is freed iff the pn_connection_t is freed. */
+  pcontext_finalize(&pc->context);
+  free(pc);
 }
 
 // call without lock, but only if pconnection_is_final() is true
@@ -881,7 +875,7 @@ static void pconnection_forced_shutdown(pconnection_t *pc) {
 
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
-  if (!pc->bound) return NULL;
+  if (!pc->driver.connection) return NULL;
   pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
   if (!e) {
     write_flush(pc);  // May generate transport event
@@ -972,12 +966,6 @@ static void pconnection_done(pconnection_t *pc) {
   if (notify) wake_notify(&pc->context);
 }
 
-static pconnection_t *get_pconnection(pn_connection_t* c) {
-  if (!c) return NULL;
-  pn_record_t *r = pn_connection_attachments(c);
-  return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
-}
-
 // Return true unless error
 static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
   ssize_t n = send(pc->psocket.sockfd, wbuf.start, wbuf.size, MSG_NOSIGNAL);
@@ -1300,7 +1288,7 @@ static bool wake_if_inactive(pn_proactor_t *p) {
 }
 
 void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
-  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+  pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
   const char *err = pconnection_setup(pc, p, c, t, false, addr);
   if (err) {    /* TODO aconway 2017-09-13: errors must be reported as events */
@@ -1367,16 +1355,9 @@ void pn_proactor_release_connection(pn_connection_t *c) {
   bool notify = false;
   pconnection_t *pc = get_pconnection(c);
   if (pc) {
+    set_pconnection(c, NULL);
     lock(&pc->context.mutex);
-    // reverse lifecycle entanglement of pc and c from new_pconnection_t()
-    pn_incref(pc);
-    pn_proactor_t *p = pc->psocket.proactor;
-    lock(&p->bind_mutex);
-    pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-    pn_record_set(r, PN_PROACTOR, NULL);
     pn_connection_driver_release_connection(&pc->driver);
-    pc->bound = false;  // Transport unbound
-    unlock(&p->bind_mutex);
     pconnection_begin_close(pc);
     notify = wake(&pc->context);
     unlock(&pc->context.mutex);
@@ -1714,7 +1695,7 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 }
 
 void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
-  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+  pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
   const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, "");
   if (err) {
@@ -1785,7 +1766,6 @@ pn_proactor_t *pn_proactor() {
   p->epollfd = p->eventfd = p->timer.timerfd = -1;
   pcontext_init(&p->context, PROACTOR, p, p);
   pmutex_init(&p->eventfd_mutex);
-  pmutex_init(&p->bind_mutex);
   ptimer_init(&p->timer, 0);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
@@ -1839,7 +1819,6 @@ void pn_proactor_free(pn_proactor_t *p) {
 
   pn_collector_free(p->collector);
   pmutex_finalize(&p->eventfd_mutex);
-  pmutex_finalize(&p->bind_mutex);
   pcontext_finalize(&p->context);
   free(p);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message