qpid-commits mailing list archives

From cliffjan...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-1842: epoll proactor - add secondary/chained epollfd to maintain 1-1 count between epoll registrations and eventual callbacks on pconnections
Date Wed, 15 Aug 2018 06:17:26 GMT
PROTON-1842: epoll proactor - add secondary/chained epollfd to maintain 1-1 count between epoll registrations and eventual callbacks on pconnections


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/803a47ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/803a47ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/803a47ed

Branch: refs/heads/master
Commit: 803a47ed386497f6467dd4f5fb4f9f57d545695d
Parents: c601b98
Author: Clifford Jansen <cliffjansen@apache.org>
Authored: Tue Aug 14 23:14:47 2018 -0700
Committer: Clifford Jansen <cliffjansen@apache.org>
Committed: Tue Aug 14 23:14:47 2018 -0700

----------------------------------------------------------------------
 c/src/proactor/epoll.c | 189 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 159 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
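The core idea of the patch: a second epoll fd (epollfd_2) is itself registered
inside the main epollfd, so readiness in the chained set surfaces as a single
EPOLLIN event in the main set.  A minimal standalone sketch of that chaining
(not code from the patch; all names here are illustrative):

    /* Chain one epoll fd inside another: readiness of any fd registered in
       "inner" shows up as EPOLLIN on "outer". */
    #include <stdint.h>
    #include <stdio.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <unistd.h>

    int main(void) {
      int outer = epoll_create(1);
      int inner = epoll_create(1);

      struct epoll_event ev = {0};
      ev.events = EPOLLIN;
      ev.data.fd = inner;
      epoll_ctl(outer, EPOLL_CTL_ADD, inner, &ev);  /* register inner in outer */

      int efd = eventfd(0, EFD_NONBLOCK);           /* some fd in the inner set */
      ev.data.fd = efd;
      epoll_ctl(inner, EPOLL_CTL_ADD, efd, &ev);
      uint64_t one = 1;
      write(efd, &one, sizeof(one));                /* make it ready */

      struct epoll_event got = {0};
      printf("outer: %d ready\n", epoll_wait(outer, &got, 1, 0));  /* 1: inner fd */
      printf("inner: %d ready\n", epoll_wait(inner, &got, 1, 0));  /* 1: efd */
      return 0;
    }

As the patch's own comment block notes, an event sitting in the chained set
still needs a separate epoll_wait() (and a rearm of the outer registration) to
be extracted.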


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/803a47ed/c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 209c0fe..9ea839f 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -117,8 +117,10 @@ typedef struct acceptor_t acceptor_t;
 typedef enum {
   WAKE,   /* see if any work to do in proactor/psocket context */
   PCONNECTION_IO,
+  PCONNECTION_IO_2,
   PCONNECTION_TIMER,
   LISTENER_IO,
+  CHAINED_EPOLL,
   PROACTOR_TIMER } epoll_type_t;
 
 // Data to use with epoll.
@@ -299,6 +301,7 @@ static bool start_polling(epoll_extended_t *ee, int epollfd) {
 
 static void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
+  // TODO: is EPOLL_CTL_DEL ever needed beyond auto de-register when ee->fd is closed?
   if (ee->fd == -1 || !ee->polling || epollfd == -1)
     return;
   struct epoll_event ev = {0};
@@ -327,14 +330,37 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
  * thread.  Conversely, a thread must never stop working without
  * checking if it has newly arrived work.
  *
- * External wake operations, like pn_connection_wake() and are built on top of
- * the internal wake mechanism.  The former coalesces multiple wakes until event
- * delivery, the latter does not.  The WAKEABLE implementation can be modeled on
- * whichever is more suited.
+ * External wake operations, like pn_connection_wake() are built on top of
+ * the internal wake mechanism.
  *
  * pn_proactor_interrupt() must be async-signal-safe so it has a dedicated
  * eventfd to allow a lock-free pn_proactor_interrupt() implementation.
  */
+
+/*
+ * **** epollfd and epollfd_2 ****
+ *
+ * This implementation allows multiple threads to call epoll_wait()
+ * concurrently (as opposed to having a single thread call
+ * epoll_wait() and feed work to helper threads).  Unfortunately
+ * with this approach, it is not possible to change the event
+ * mask in one thread and be certain if zero or one callbacks occurred
+ * on the previous event mask.  This can greatly complicate ordered
+ * shutdown.  (See PROTON-1842)
+ *
+ * Currently, only pconnection sockets change between EPOLLIN,
+ * EPOLLOUT, or both.  The rest use a constant EPOLLIN event mask.
+ * Instead of trying to change the event mask for pconnection sockets,
+ * if there is a missing attribute, it is added (EPOLLIN or EPOLLOUT)
+ * as an event mask on the secondary or chained epollfd_2.  epollfd_2
+ * is part of the epollfd fd set, so active events in epollfd_2 are
+ * also seen in epollfd (but require a separate epoll_wait() and
+ * rearm() to extract).
+ *
+ * Using this method and EPOLLONESHOT, it is possible to wait for all
+ * outstanding armings on a socket to "resolve" via epoll_wait()
+ * callbacks before freeing resources.
+ */
 typedef enum {
   PROACTOR,
   PCONNECTION,
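The shutdown guarantee described in the comment above leans on EPOLLONESHOT
semantics: once an armed fd fires, the kernel disables it until an explicit
EPOLL_CTL_MOD, so each arming accounts for at most one future callback.  A
standalone sketch of that idiom (illustrative only, not the patch's code):

    /* Each EPOLLONESHOT arming yields at most one epoll_wait() report,
       which makes outstanding callbacks countable. */
    #include <stdio.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>

    int main(void) {
      int ep = epoll_create(1);
      int efd = eventfd(1, EFD_NONBLOCK);          /* starts readable */

      struct epoll_event ev = {0};
      ev.events = EPOLLIN | EPOLLONESHOT;
      ev.data.fd = efd;
      epoll_ctl(ep, EPOLL_CTL_ADD, efd, &ev);      /* arm #1 */

      struct epoll_event got;
      printf("wait 1: %d\n", epoll_wait(ep, &got, 1, 0));  /* 1 */
      printf("wait 2: %d\n", epoll_wait(ep, &got, 1, 0));  /* 0: disabled */

      epoll_ctl(ep, EPOLL_CTL_MOD, efd, &ev);      /* arm #2: explicit rearm */
      printf("wait 3: %d\n", epoll_wait(ep, &got, 1, 0));  /* 1 again */
      return 0;
    }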
@@ -383,11 +409,13 @@ typedef struct psocket_t {
 struct pn_proactor_t {
   pcontext_t context;
   int epollfd;
+  int epollfd_2;
   ptimer_t timer;
   pn_collector_t *collector;
   pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
   epoll_extended_t epoll_wake;
   epoll_extended_t epoll_interrupt;
+  epoll_extended_t epoll_secondary;
   pn_event_batch_t batch;
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
   // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
@@ -508,6 +536,7 @@ typedef struct pconnection_t {
   psocket_t psocket;
   pcontext_t context;
   uint32_t new_events;
+  uint32_t new_events_2;
   int wake_count;
   bool server;                /* accept, not connect */
   bool tick_pending;
@@ -517,6 +546,7 @@ typedef struct pconnection_t {
   ptimer_t timer;  // TODO: review one timerfd per connection
   // Following values only changed by (sole) working context:
   uint32_t current_arm;  // active epoll io events
+  uint32_t current_arm_2;  // secondary active epoll io events
   bool connected;
   bool read_blocked;
   bool write_blocked;
@@ -528,6 +558,8 @@ typedef struct pconnection_t {
   struct addrinfo *addrinfo;         /* Resolved address list */
   struct addrinfo *ai;               /* Current connect address */
   pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
+  epoll_extended_t epoll_io_2;
+  epoll_extended_t *rearm_target;    /* main or secondary epollfd */
 } pconnection_t;
 
 /* Protects read/update of pn_connnection_t pointer to it's pconnection_t
@@ -591,7 +623,7 @@ struct pn_listener_t {
   pmutex rearm_mutex;             /* orders rearms/disarms, nothing else */
 };
 
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup, bool is_io_2);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
@@ -689,6 +721,24 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
+// Only used by pconnection_t if two separate epoll interests in play
+static void rearm_2(pn_proactor_t *p, epoll_extended_t *ee) {
+  // Delay registration until first use.  It's not OK to register or arm
+  // with an event mask of 0 (documented below).  It is OK to leave a
+  // disabled event registered until the next EPOLLONESHOT.
+  if (!ee->polling) {
+    ee->fd = ee->psocket->sockfd;
+    start_polling(ee, p->epollfd_2);
+  } else {
+    struct epoll_event ev = {0};
+    ev.data.ptr = ee;
+    ev.events = ee->wanted | EPOLLONESHOT;
+    memory_barrier(ee);
+    if (epoll_ctl(p->epollfd_2, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
+      EPOLL_FATAL("arming polled file descriptor (secondary)", errno);
+  }
+}
+
 static void listener_list_append(acceptor_t **start, acceptor_t *item) {
   assert(item->next == NULL);
   if (*start) {
@@ -776,6 +826,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL, addr);
   pc->new_events = 0;
+  pc->new_events_2 = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
   pc->timer_armed = false;
@@ -783,6 +834,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   pc->disconnect_condition = NULL;
 
   pc->current_arm = 0;
+  pc->current_arm_2 = 0;
   pc->connected = false;
   pc->read_blocked = true;
   pc->write_blocked = true;
@@ -800,6 +852,13 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   }
   pmutex_init(&pc->rearm_mutex);
 
+  epoll_extended_t *ee = &pc->epoll_io_2;
+  ee->psocket = &pc->psocket;
+  ee->fd = -1;
+  ee->type = PCONNECTION_IO_2;
+  ee->wanted = 0;
+  ee->polling = false;
+
   /* Set the pconnection_t backpointer last.
      Connections that were released by pn_proactor_release_connection() must not reveal themselves
      to be re-associated with a proactor till setup is complete.
@@ -812,10 +871,13 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
 // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
 // Return true when all possible outstanding epoll events associated with this pconnection have been processed.
 static inline bool pconnection_is_final(pconnection_t *pc) {
-  return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops;
+  return !pc->current_arm && !pc->current_arm_2 && !pc->timer_armed && !pc->context.wake_ops;
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
+  // Ensure any lingering pconnection_rearm is all done.
+  lock(&pc->rearm_mutex);  unlock(&pc->rearm_mutex);
+
   if (pc->driver.connection) {
     set_pconnection(pc->driver.connection, NULL);
   }
@@ -846,12 +908,10 @@ static void pconnection_cleanup(pconnection_t *pc) {
 
 // Call with lock held or from forced_shutdown
 static void pconnection_begin_close(pconnection_t *pc) {
-  lock(&pc->rearm_mutex);
-  unlock(&pc->rearm_mutex);  // Allow parallel pconnection_rearm() to complete
   if (!pc->context.closing) {
     pc->context.closing = true;
-    if (pc->current_arm != 0 && !pc->new_events) {
-      // Force io callback via an EPOLLHUP
+    if (pc->current_arm || pc->current_arm_2) {
+      // Force EPOLLHUP callback(s)
       shutdown(pc->psocket.sockfd, SHUT_RDWR);
     }
 
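pconnection_begin_close() above relies on shutdown(SHUT_RDWR) to make any
still-armed registration fire with EPOLLHUP, so outstanding armings resolve
through normal epoll_wait() callbacks instead of being leaked.  A small sketch
of that effect, assuming a connected AF_UNIX socketpair (illustrative only):

    #include <stdio.h>
    #include <sys/epoll.h>
    #include <sys/socket.h>

    int main(void) {
      int sv[2];
      socketpair(AF_UNIX, SOCK_STREAM, 0, sv);

      int ep = epoll_create(1);
      struct epoll_event ev = {0};
      ev.events = EPOLLIN | EPOLLONESHOT;     /* armed, but no data will arrive */
      ev.data.fd = sv[0];
      epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev);

      shutdown(sv[0], SHUT_RDWR);             /* force the arming to resolve */

      struct epoll_event got = {0};
      int n = epoll_wait(ep, &got, 1, 0);
      printf("n=%d hup=%d\n", n, !!(got.events & EPOLLHUP));  /* n=1, hup=1 */
      return 0;
    }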
@@ -870,6 +930,8 @@ static void pconnection_forced_shutdown(pconnection_t *pc) {
   // Called by proactor_free, no competing threads, no epoll activity.
   pc->current_arm = 0;
   pc->new_events = 0;
+  pc->current_arm_2 = 0;
+  pc->new_events_2 = 0;
   pconnection_begin_close(pc);
   // pconnection_process will never be called again.  Zero everything.
   pc->timer_armed = false;
@@ -888,7 +950,7 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
     write_flush(pc);  // May generate transport event
     e = pn_connection_driver_next_event(&pc->driver);
     if (!e && pc->hog_count < HOG_MAX) {
-      if (pconnection_process(pc, 0, false, true)) {
+      if (pconnection_process(pc, 0, false, true, false)) {
         e = pn_connection_driver_next_event(&pc->driver);
       }
     }
@@ -915,6 +977,7 @@ static inline bool pconnection_wclosed(pconnection_t  *pc) {
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
 */
 static bool pconnection_rearm_check(pconnection_t *pc) {
+  if (pc->current_arm && pc->current_arm_2) return false;  // Maxed out
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
     return false;
   }
@@ -928,22 +991,38 @@ static bool pconnection_rearm_check(pconnection_t *pc) {
         wanted_now |= EPOLLOUT;
     }
   }
-  if (!wanted_now || pc->current_arm == wanted_now) return false;
+  if (!wanted_now) return false;
+
+  uint32_t have_now = pc->current_arm ?  pc->current_arm : pc->current_arm_2;
+  uint32_t needed = wanted_now & ~have_now;
+  if (!needed) return false;
 
   lock(&pc->rearm_mutex);      /* unlocked in pconnection_rearm... */
-  pc->psocket.epoll_io.wanted = wanted_now;
-  pc->current_arm = wanted_now;
+  // Always favour main epollfd
+  if (!pc->current_arm) {
+    pc->current_arm = pc->psocket.epoll_io.wanted = needed;
+    pc->rearm_target = &pc->psocket.epoll_io;
+  } else {
+    pc->current_arm_2 = pc->epoll_io_2.wanted = needed;
+    pc->rearm_target = &pc->epoll_io_2;
+  }
   return true;                     /* ... so caller MUST call pconnection_rearm */
 }
 
 /* Call without lock */
 static inline void pconnection_rearm(pconnection_t *pc) {
-  rearm(pc->psocket.proactor, &pc->psocket.epoll_io);
+  if (pc->rearm_target == &pc->psocket.epoll_io) {
+    rearm(pc->psocket.proactor, pc->rearm_target);
+  } else {
+    rearm_2(pc->psocket.proactor, pc->rearm_target);
+  }
+  pc->rearm_target = NULL;
   unlock(&pc->rearm_mutex);
+  // Return immediately.  pc may have just been freed by another thread.
 }
 
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+  if (pc->new_events || pc->new_events_2 || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
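The splitting rule in pconnection_rearm_check() above reads as plain bit
arithmetic: arm only what is wanted but not already armed, and favour the main
epollfd when it is idle.  A tiny worked example (illustrative, not patch code):

    #include <stdint.h>
    #include <stdio.h>
    #include <sys/epoll.h>

    int main(void) {
      uint32_t current_arm = EPOLLIN;           /* main epollfd holds a read arm */
      uint32_t current_arm_2 = 0;               /* secondary is idle */
      uint32_t wanted_now = EPOLLIN | EPOLLOUT; /* connection now wants both */

      uint32_t have_now = current_arm ? current_arm : current_arm_2;
      uint32_t needed = wanted_now & ~have_now; /* == EPOLLOUT */

      if (needed) {
        if (!current_arm)
          current_arm = needed;                 /* main fd free: use it */
        else
          current_arm_2 = needed;               /* main busy: use epollfd_2 */
      }
      printf("main=0x%x secondary=0x%x\n", current_arm, current_arm_2);
      return 0;
    }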
@@ -969,8 +1048,9 @@ static void pconnection_done(pconnection_t *pc) {
   }
   bool rearm = pconnection_rearm_check(pc);
   unlock(&pc->context.mutex);
-  if (rearm) pconnection_rearm(pc);
   if (notify) wake_notify(&pc->context);
+  if (rearm) pconnection_rearm(pc);  // May free pc on another thread.  Return.
+  return;
 }
 
 // Return true unless error
@@ -1011,11 +1091,12 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc);
  * May be called concurrently from multiple threads:
  *   pn_event_batch_t loop (topup is true)
  *   timer (timeout is true)
- *   socket io (events != 0)
+ *   socket io (events != 0) from PCONNECTION_IO
+ *      and PCONNECTION_IO_2 event masks (possibly simultaneously)
  *   one or more wake()
  * Only one thread becomes (or always was) the working thread.
  */
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup) {
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup, bool is_io_2) {
   bool inbound_wake = !(events | timeout | topup);
   bool rearm_timer = false;
   bool timer_fired = false;
@@ -1031,7 +1112,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   lock(&pc->context.mutex);
 
   if (events) {
-    pc->new_events = events;
+    if (is_io_2)
+      pc->new_events_2 = events;
+    else
+      pc->new_events = events;
     events = 0;
   }
   else if (timer_fired) {
@@ -1088,19 +1172,28 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     tick_required = !closed;
   }
 
+  uint32_t update_events = 0;
   if (pc->new_events) {
+    update_events = pc->new_events;
     pc->current_arm = 0;
+    pc->new_events = 0;
+  }
+  if (pc->new_events_2) {
+    update_events |= pc->new_events_2;
+    pc->current_arm_2 = 0;
+    pc->new_events_2 = 0;
+  }
+  if (update_events) {
     if (!pc->context.closing) {
-      if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
+      if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
         pconnection_maybe_connect_lh(pc);
       else
         pconnection_connected_lh(pc); /* Non error event means we are connected */
-      if (pc->new_events & EPOLLOUT)
+      if (update_events & EPOLLOUT)
         pc->write_blocked = false;
-      if (pc->new_events & EPOLLIN)
+      if (update_events & EPOLLIN)
         pc->read_blocked = false;
     }
-    pc->new_events = 0;
   }
 
   if (pc->context.closing && pconnection_is_final(pc)) {
@@ -1189,7 +1282,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   bool rearm_pc = pconnection_rearm_check(pc);  // holds rearm_mutex until pconnection_rearm() below
 
   unlock(&pc->context.mutex);
-  if (rearm_pc) pconnection_rearm(pc);
+  if (rearm_pc) pconnection_rearm(pc);  // May free pc on another thread.  Return right away.
   return NULL;
 }
 
@@ -1765,6 +1858,16 @@ static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
   start_polling(ee, epollfd);  // TODO: check for error
 }
 
+/* Set up the epoll_extended_t to be used for secondary socket events */
+static void epoll_secondary_init(epoll_extended_t *ee, int epoll_fd_2, int epollfd) {
+  ee->psocket = NULL;
+  ee->fd = epoll_fd_2;
+  ee->type = CHAINED_EPOLL;
+  ee->wanted = EPOLLIN;
+  ee->polling = false;
+  start_polling(ee, epollfd);  // TODO: check for error
+}
+
 pn_proactor_t *pn_proactor() {
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
   if (!p) return NULL;
@@ -1773,7 +1876,7 @@ pn_proactor_t *pn_proactor() {
   pmutex_init(&p->eventfd_mutex);
   ptimer_init(&p->timer, 0);
 
-  if ((p->epollfd = epoll_create(1)) >= 0) {
+  if ((p->epollfd = epoll_create(1)) >= 0 && (p->epollfd_2 = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
       if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
         if (p->timer.timerfd >= 0)
@@ -1783,12 +1886,14 @@ pn_proactor_t *pn_proactor() {
             p->timer_armed = true;
             epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd);
             epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd);
+            epoll_secondary_init(&p->epoll_secondary, p->epollfd_2, p->epollfd);
             return p;
           }
       }
     }
   }
   if (p->epollfd >= 0) close(p->epollfd);
+  if (p->epollfd_2 >= 0) close(p->epollfd_2);
   if (p->eventfd >= 0) close(p->eventfd);
   if (p->interruptfd >= 0) close(p->interruptfd);
   ptimer_finalize(&p->timer);
@@ -1802,6 +1907,8 @@ void pn_proactor_free(pn_proactor_t *p) {
   p->shutting_down = true;
   close(p->epollfd);
   p->epollfd = -1;
+  close(p->epollfd_2);
+  p->epollfd_2 = -1;
   close(p->eventfd);
   p->eventfd = -1;
   close(p->interruptfd);
@@ -1906,6 +2013,26 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t even
   return NULL;
 }
 
+static pn_event_batch_t *proactor_chained_epoll_wait(pn_proactor_t *p) {
+  // process one ready pconnection socket event from the secondary/chained epollfd_2
+  struct epoll_event ev = {0};
+  int n = epoll_wait(p->epollfd_2, &ev, 1, 0);
+  if (n < 0) {
+    if (errno != EINTR)
+      perror("epoll_wait"); // TODO: proper log
+  } else if (n > 0) {
+    assert(n == 1);
+    rearm(p, &p->epoll_secondary);
+    epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
+    memory_barrier(ee);
+    assert(ee->type == PCONNECTION_IO_2);
+    pconnection_t *pc = psocket_pconnection(ee->psocket);
+    return pconnection_process(pc, ev.events, false, false, true);
+  }
+  rearm(p, &p->epoll_secondary);
+  return NULL;
+}
+
 static void proactor_add(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
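The zero timeout in proactor_chained_epoll_wait() above is the key detail: the
outer epollfd only says "something is ready inside epollfd_2", so the inner
poll must tolerate finding nothing (another thread may already have consumed
it).  A reduced sketch of that non-blocking drain (names are illustrative):

    #include <stdio.h>
    #include <sys/epoll.h>

    static void handle(struct epoll_event *ev) { (void)ev; /* dispatch here */ }

    static void drain_one(int inner) {
      struct epoll_event ev = {0};
      int n = epoll_wait(inner, &ev, 1, 0);  /* 0 timeout: never block here */
      if (n == 1)
        handle(&ev);
      else if (n < 0)
        perror("epoll_wait");
      /* n == 0 is fine: the readiness was consumed elsewhere */
    }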
@@ -1963,7 +2090,7 @@ static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t
      case PROACTOR:
       return proactor_process(p, PN_EVENT_NONE);
      case PCONNECTION:
-      return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
+      return pconnection_process((pconnection_t *) ctx->owner, 0, false, false, false);
      case LISTENER:
      return listener_process(&((pn_listener_t *) ctx->owner)->acceptors[0].psocket, 0);
      default:
@@ -1977,7 +2104,7 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
   int timeout = can_block ? -1 : 0;
   while(true) {
     pn_event_batch_t *batch = NULL;
-    struct epoll_event ev;
+    struct epoll_event ev = {0};
     int n = epoll_wait(p->epollfd, &ev, 1, timeout);
 
     if (n < 0) {
@@ -2003,14 +2130,16 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
       batch = process_inbound_wake(p, ee);
     } else if (ee->type == PROACTOR_TIMER) {
       batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
+    } else if (ee->type == CHAINED_EPOLL) {
+      batch = proactor_chained_epoll_wait(p);  // expect a PCONNECTION_IO_2
     } else {
       pconnection_t *pc = psocket_pconnection(ee->psocket);
       if (pc) {
         if (ee->type == PCONNECTION_IO) {
-          batch = pconnection_process(pc, ev.events, false, false);
+          batch = pconnection_process(pc, ev.events, false, false, false);
         } else {
           assert(ee->type == PCONNECTION_TIMER);
-          batch = pconnection_process(pc, 0, true, false);
+          batch = pconnection_process(pc, 0, true, false, false);
         }
       }
       else {

