qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-proton git commit: PROTON-1493: c-proactor make pn_proactor_interrupt async-signal-safe
Date Wed, 31 May 2017 14:49:40 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 8d862beab -> 5f8738f57


PROTON-1493: c-proactor make pn_proactor_interrupt async-signal-safe

pn_proactor_interrupt() will  often be used from signal handlers so must be
async-signal-safe. Updated the documentation and modified the implementations
of pn_proactor_interrupt() to use only async-signal-safe calls, no locks.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5f8738f5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5f8738f5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5f8738f5

Branch: refs/heads/master
Commit: 5f8738f573c3e9c39608714453b2425e3a105ec7
Parents: 8d862be
Author: Alan Conway <aconway@redhat.com>
Authored: Mon May 29 17:12:36 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed May 31 10:49:36 2017 -0400

----------------------------------------------------------------------
 INSTALL.md                         |  11 +++
 examples/c/proactor/broker.c       |   8 +--
 proton-c/include/proton/proactor.h |  11 +--
 proton-c/src/proactor/epoll.c      | 122 +++++++++++++++++---------------
 proton-c/src/proactor/libuv.c      |  37 +++++++---
 5 files changed, 112 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/INSTALL.md
----------------------------------------------------------------------
diff --git a/INSTALL.md b/INSTALL.md
index 8de93fe..e5e5db6 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -137,6 +137,17 @@ Note that if you wish to build debug version of proton for use with
 swig bindings on Windows, you must have the appropriate debug target
 libraries to link against.
 
+Other platforms
+---------------
+
+Proton can use the http://libuv.org IO library on any platform where
+it is available. Install the libuv library and header files and adapt
+the instructions for building on Linux.
+
+The libuv library is not required on Linux or Windows but if you wish
+you can use it instead of the default native IO by running cmake with
+`-Dproactor=libuv`
+
 Installing Language Bindings
 ----------------------------
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 7d95e7f..d9285db 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -195,9 +195,8 @@ typedef struct broker_t {
 } broker_t;
 
 void broker_stop(broker_t *b) {
-  /* In this broker an interrupt stops a thread, stopping all threads stops the broker */
-  for (size_t i = 0; i < b->threads; ++i)
-    pn_proactor_interrupt(b->proactor);
+  /* Interrupt the proactor to stop the working threads. */
+  pn_proactor_interrupt(b->proactor);
 }
 
 /* Try to send if link is sender and has credit */
@@ -369,12 +368,13 @@ static void handle(broker_t* b, pn_event_t* e) {
 
  break;
 
-   case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
+   case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
     broker_stop(b);
     break;
 
    case PN_PROACTOR_INTERRUPT:
     b->finished = true;
+    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads
*/
     break;
 
    default:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 861afbe..9c7ce59 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -196,12 +196,13 @@ PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t
*even
 /**
  * Return a @ref PN_PROACTOR_INTERRUPT event as soon as possible.
  *
- * Exactly one @ref PN_PROACTOR_INTERRUPT event is generated for each call to
- * pn_proactor_interrupt().  If threads are blocked in pn_proactor_wait(), one
- * of them will be interrupted, otherwise the interrupt will be returned by a
- * future call to pn_proactor_wait(). Calling pn_proactor_interrupt().
+ * At least one PN_PROACTOR_INTERRUPT event will be returned after this call.
+ * Interrupts can be "coalesced" - if several pn_proactor_interrupt() calls
+ * happen close together, there may be only one PN_PROACTOR_INTERRUPT event that
+ * occurs after all of them.
  *
- * @note Thread safe
+ * @note Thread-safe and async-signal-safe: can be called in a signal handler.
+ * This is the only pn_proactor function that is async-signal-safe.
  */
 PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 7490ecd..b258da3 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -302,11 +302,13 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
  * thread.  Conversely, a thread must never stop working without
  * checking if it has newly arrived work.
  *
- * External wake operations, like pn_connection_wake() and
- * pn_proactor_interrupt(), are built on top of the internal wake
- * mechanism.  The former coalesces multiple wakes until event
- * delivery, the latter does not.  The WAKEABLE implementation can be
- * modeled on whichever is more suited.
+ * External wake operations, like pn_connection_wake() and are built on top of
+ * the internal wake mechanism.  The former coalesces multiple wakes until event
+ * delivery, the latter does not.  The WAKEABLE implementation can be modeled on
+ * whichever is more suited.
+ *
+ * pn_proactor_interrupt() must be async-signal-safe so it has a dedicated
+ * eventfd to allow a lock-free pn_proactor_interrupt() implementation.
  */
 typedef enum {
   PROACTOR,
@@ -360,10 +362,10 @@ struct pn_proactor_t {
   pn_collector_t *collector;
   pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
   epoll_extended_t epoll_wake;
+  epoll_extended_t epoll_interrupt;
   pn_event_batch_t batch;
-  size_t interrupts;            /* total pending interrupts */
-  size_t deferred_interrupts;   /* interrupts for current batch */
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
+  bool interrupt;
   bool inactive;
   bool timer_expired;
   bool timer_cancelled;
@@ -375,6 +377,8 @@ struct pn_proactor_t {
   bool wakes_in_progress;
   pcontext_t *wake_list_first;
   pcontext_t *wake_list_last;
+  // Interrupts have a dedicated eventfd because they must be async-signal safe.
+  int interruptfd;
 };
 
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
@@ -1470,6 +1474,16 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
 // proactor
 // ========================================================================
 
+/* Set up an epoll_extended_t to be used for wakeup or interrupts */
+static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
+  ee->psocket = NULL;
+  ee->fd = eventfd;
+  ee->type = WAKE;
+  ee->wanted = EPOLLIN;
+  ee->polling = false;
+  start_polling(ee, epollfd);  // TODO: check for error
+}
+
 pn_proactor_t *pn_proactor() {
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
   if (!p) return NULL;
@@ -1478,26 +1492,24 @@ pn_proactor_t *pn_proactor() {
   pmutex_init(&p->eventfd_mutex);
   ptimer_init(&p->timer, 0);
 
-  if ((p->epollfd = epoll_create(1)) >= 0)
+  if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
-      if (p->timer.timerfd >= 0)
-        if ((p->collector = pn_collector()) != NULL) {
-          p->batch.next_event = &proactor_batch_next;
-          start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for error
-          p->timer_armed = true;
-
-          p->epoll_wake.psocket = NULL;
-          p->epoll_wake.fd = p->eventfd;
-          p->epoll_wake.type = WAKE;
-          p->epoll_wake.wanted = EPOLLIN;
-          p->epoll_wake.polling = false;
-          start_polling(&p->epoll_wake, p->epollfd);  // TODO: check for error
-          return p;
-        }
+      if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
+        if (p->timer.timerfd >= 0)
+          if ((p->collector = pn_collector()) != NULL) {
+            p->batch.next_event = &proactor_batch_next;
+            start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for
error
+            p->timer_armed = true;
+            epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd);
+            epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd);
+            return p;
+          }
+      }
     }
-
+  }
   if (p->epollfd >= 0) close(p->epollfd);
   if (p->eventfd >= 0) close(p->eventfd);
+  if (p->interruptfd >= 0) close(p->eventfd);
   ptimer_finalize(&p->timer);
   if (p->collector) pn_free(p->collector);
   free (p);
@@ -1510,6 +1522,8 @@ void pn_proactor_free(pn_proactor_t *p) {
   p->epollfd = -1;
   close(p->eventfd);
   p->eventfd = -1;
+  close(p->interruptfd);
+  p->interruptfd = -1;
   ptimer_finalize(&p->timer);
   while (p->contexts) {
     pcontext_t *ctx = p->contexts;
@@ -1551,34 +1565,23 @@ static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t)
{
 static bool proactor_update_batch(pn_proactor_t *p) {
   if (proactor_has_event(p))
     return true;
-  if (p->deferred_interrupts > 0) {
-    // drain these first
-    --p->deferred_interrupts;
-    --p->interrupts;
-    proactor_add_event(p, PN_PROACTOR_INTERRUPT);
-    return true;
-  }
 
   if (p->timer_expired) {
     p->timer_expired = false;
     proactor_add_event(p, PN_PROACTOR_TIMEOUT);
     return true;
   }
-
-  int ec = 0;
-  if (p->interrupts > 0) {
-    --p->interrupts;
+  if (p->interrupt) {
+    p->interrupt = false;
     proactor_add_event(p, PN_PROACTOR_INTERRUPT);
-    ec++;
-    if (p->interrupts > 0)
-      p->deferred_interrupts = p->interrupts;
+    return true;
   }
-  if (p->inactive && ec == 0) {
+  if (p->inactive) {
     p->inactive = false;
-    ec++;
     proactor_add_event(p, PN_PROACTOR_INACTIVE);
+    return true;
   }
-  return ec > 0;
+  return false;
 }
 
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
@@ -1590,10 +1593,12 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
   return log_event(p, e);
 }
 
-static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
-  bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t event) {
+  bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer)
!= 0;
   lock(&p->context.mutex);
-  if (timeout) {
+  if (event == PN_PROACTOR_INTERRUPT) {
+    p->interrupt = true;
+  } else if (event == PN_PROACTOR_TIMEOUT) {
     p->timer_armed = false;
     if (timer_fired && !p->timer_cancelled)
       p->timer_expired = true;
@@ -1667,17 +1672,20 @@ static bool proactor_remove(pcontext_t *ctx) {
   return can_free;
 }
 
-static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
+static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
+  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd
*/
+    return proactor_process(p, PN_PROACTOR_INTERRUPT);
+  }
   pcontext_t *ctx = wake_pop_front(p);
   if (ctx) {
     switch (ctx->type) {
-    case PROACTOR:
-      return proactor_process(p, false);
-    case PCONNECTION:
+     case PROACTOR:
+      return proactor_process(p, PN_EVENT_NONE);
+     case PCONNECTION:
       return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
-    case LISTENER:
-     return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
-    default:
+     case LISTENER:
+      return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
+     default:
       assert(ctx->type == WAKEABLE); // TODO: implement or remove
     }
   }
@@ -1710,9 +1718,9 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p,
bool can_blo
     epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
 
     if (ee->type == WAKE) {
-      batch = process_inbound_wake(p);
+      batch = process_inbound_wake(p, ee);
     } else if (ee->type == PROACTOR_TIMER) {
-      batch = proactor_process(p, true);
+      batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
     } else {
       pconnection_t *pc = psocket_pconnection(ee->psocket);
       if (pc) {
@@ -1772,11 +1780,11 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
 }
 
 void pn_proactor_interrupt(pn_proactor_t *p) {
-  lock(&p->context.mutex);
-  ++p->interrupts;
-  bool notify = wake(&p->context);
-  unlock(&p->context.mutex);
-  if (notify) wake_notify(&p->context);
+  if (p->interruptfd == -1)
+    return;
+  uint64_t increment = 1;
+  if (write(p->interruptfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t))
+    EPOLL_FATAL("setting eventfd", errno);
 }
 
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index cf7a31b..8cd6dd7 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -52,7 +52,7 @@
   libuv functions are thread unsafe, we use a"leader-worker-follower" model as follows:
 
   - At most one thread at a time is the "leader". The leader runs the UV loop till there
-  are events to process and then becomes a "worker"n
+  are events to process and then becomes a "worker"
 
   - Concurrent "worker" threads process events for separate connections or listeners.
   When they run out of work they become "followers"
@@ -227,10 +227,13 @@ struct pn_listener_t {
 typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t;
 
 struct pn_proactor_t {
+  /* Notification */
+  uv_async_t notify;
+  uv_async_t interrupt;
+
   /* Leader thread  */
   uv_cond_t cond;
   uv_loop_t loop;
-  uv_async_t async;
   uv_timer_t timer;
 
   /* Owner thread: proactor collector and batch can belong to leader or a worker */
@@ -241,7 +244,6 @@ struct pn_proactor_t {
   uv_mutex_t lock;
   work_queue_t worker_q;      /* ready for work, to be returned via pn_proactor_wait()  */
   work_queue_t leader_q;      /* waiting for attention by the leader thread */
-  size_t interrupt;           /* pending interrupts */
   timeout_state_t timeout_state;
   pn_millis_t timeout;
   size_t count;               /* connection/listener count for INACTIVE events */
@@ -250,12 +252,21 @@ struct pn_proactor_t {
   bool inactive;
   bool has_leader;
   bool batch_working;         /* batch is being processed in a worker thread */
+  bool need_interrupt;        /* Need a PN_PROACTOR_INTERRUPT event */
 };
 
 
 /* Notify the leader thread that there is something to do outside of uv_run() */
 static inline void notify(pn_proactor_t* p) {
-  uv_async_send(&p->async);
+  uv_async_send(&p->notify);
+}
+
+/* Set the interrupt flag in the leader thread to avoid race conditions. */
+void on_interrupt(uv_async_t *async) {
+  if (async->data) {
+    pn_proactor_t *p = (pn_proactor_t*)async->data;
+    p->need_interrupt = true;
+  }
 }
 
 /* Notify that this work item needs attention from the leader at the next opportunity */
@@ -814,8 +825,8 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
       p->inactive = false;
       return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
     }
-    if (p->interrupt > 0) {
-      --p->interrupt;
+    if (p->need_interrupt) {
+      p->need_interrupt = false;
       return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
     }
     if (p->timeout_state == TM_FIRED) {
@@ -1072,10 +1083,12 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
 }
 
 void pn_proactor_interrupt(pn_proactor_t *p) {
-  uv_mutex_lock(&p->lock);
-  ++p->interrupt;
-  uv_mutex_unlock(&p->lock);
-  notify(p);
+  /* NOTE: pn_proactor_interrupt must be async-signal-safe so we cannot use
+     locks to update shared proactor state here. Instead we use a dedicated
+     uv_async, the on_interrupt() callback will set the interrupt flag in the
+     safety of the leader thread.
+   */
+  uv_async_send(&p->interrupt);
 }
 
 void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
@@ -1155,7 +1168,9 @@ pn_proactor_t *pn_proactor() {
   uv_loop_init(&p->loop);
   uv_mutex_init(&p->lock);
   uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL);
+  uv_async_init(&p->loop, &p->notify, NULL);
+  uv_async_init(&p->loop, &p->interrupt, on_interrupt);
+  p->interrupt.data = p;
   uv_timer_init(&p->loop, &p->timer);
   p->timer.data = p;
   p->disconnect_cond = pn_condition();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message