qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/4] qpid-proton git commit: PROTON-1460: epoll proactor fix hang in test-refuse
Date Mon, 08 May 2017 19:03:59 GMT
PROTON-1460: epoll proactor fix hang in test-refuse

Removed the separate bool read_closed/write_closed flags in pconnection_t; always
call pn_connection_driver_read_closed()/pn_connection_driver_write_closed() to
check status instead.  These functions are (nowadays!) simple bool checks, so
there is no extra cost, and this avoids bugs where the pconnection_t flags get
out of sync with the transport - which was the cause of the problem in test_refuse.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd84aa28
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd84aa28
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd84aa28

Branch: refs/heads/master
Commit: cd84aa28dee04b381f266c99ed2d8cdcddb8c688
Parents: c8ec3bb
Author: Alan Conway <aconway@redhat.com>
Authored: Mon May 8 11:08:49 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Mon May 8 12:32:24 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 56 ++++++++++++++++----------------------
 1 file changed, 23 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd84aa28/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 42682e3..2545045 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -430,8 +430,6 @@ typedef struct pconnection_t {
   bool connected;
   bool read_blocked;
   bool write_blocked;
-  bool read_closed;
-  bool write_closed;
   bool disconnected;
   int hog_count; // thread hogging limiter
   pn_event_t *cached_event;
@@ -588,8 +586,6 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t
*c, bo
   pc->connected = false;
   pc->read_blocked = true;
   pc->write_blocked = true;
-  pc->read_closed = true;
-  pc->write_closed = true;
   pc->disconnected = false;
   pc->hog_count = 0;;
   pc->cached_event = NULL;
@@ -639,7 +635,6 @@ static void pconnection_cleanup(pconnection_t *pc) {
 void pconnection_begin_close(pconnection_t *pc) {
   if (!pc->context.closing) {
     pc->context.closing = true;
-    pc->read_closed = pc->write_closed = true;
     stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
     pc->current_arm = 0;
     pn_connection_driver_close(&pc->driver);
@@ -678,6 +673,15 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
   return e;
 }
 
+/* Shortcuts: ask the connection driver directly whether the read/write side is closed */
+static inline bool pconnection_rclosed(pconnection_t  *pc) {
+  return pn_connection_driver_read_closed(&pc->driver);
+}
+
+static inline bool pconnection_wclosed(pconnection_t  *pc) {
+  return pn_connection_driver_write_closed(&pc->driver);
+}
+
 /* Call only from working context (no competitor for pc->current_arm or
    connection driver).  If true returned, caller must do
    pconnection_rearm().
@@ -688,10 +692,11 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
  */
 static bool pconnection_rearm_check(pconnection_t *pc) {
-  if (pc->read_closed && pc->write_closed) return false;
-
-  uint32_t wanted_now = (pc->read_blocked && !pc->read_closed) ? EPOLLIN :
0;
-  if (!pc->write_closed) {
+  if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
+    return false;
+  }
+  uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN
: 0;
+  if (!pconnection_wclosed(pc)) {
     if (pc->write_blocked)
       wanted_now |= EPOLLOUT;
     else {
@@ -715,7 +720,7 @@ static inline void pconnection_rearm(pconnection_t *pc) {
 static inline bool pconnection_work_pending(pconnection_t *pc) {
   if (pc->new_events || pc->wake_count || pc->tick_pending)
     return true;
-  if (!pc->read_blocked && !pc->read_closed)
+  if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
   pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
   return (wbuf.size > 0 && !pc->write_blocked);
@@ -729,7 +734,7 @@ static void pconnection_done(pconnection_t *pc) {
   pc->hog_count = 0;
   if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
     notify = wake(&pc->context);
-  } else if (pc->read_closed && pn_connection_driver_finished(&pc->driver))
{
+  } else if (pconnection_rclosed(pc) && pn_connection_driver_finished(&pc->driver))
{
     pconnection_begin_close(pc);
     if (pconnection_is_final(pc)) {
       unlock(&pc->context.mutex);
@@ -840,12 +845,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t
events,
   }
   if (pc->tick_pending) {
     pc->tick_pending = false;
-    if (!(pc->read_closed && pc->write_closed))
+    if (!(pconnection_rclosed(pc) && pconnection_wclosed(pc)))
       tick_required = true;
   }
 
   if (pc->new_events) {
-    if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pc->read_closed &&
!pc->write_closed)
+    if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc)
&& !pconnection_wclosed(pc))
       pconnection_maybe_connect_lh(pc);
     else
       pconnection_connected_lh(pc); /* Non error event means we are connected */
@@ -878,35 +883,25 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t
events,
   // read... tick... write
   // perhaps should be: write_if_recent_EPOLLOUT... read... tick... write
 
-  if (!pc->read_closed) {
+  if (!pconnection_rclosed(pc)) {
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-    if (rbuf.size == 0) {
-      if (pn_connection_driver_read_closed(&pc->driver))
-        pc->read_closed = true;
-    }
-    else if (!pc->read_blocked) {
+    if (rbuf.size >= 0 && !pc->read_blocked) {
       ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
 
       if (n > 0) {
         pn_connection_driver_read_done(&pc->driver, n);
         pconnection_tick(pc);         /* check for tick changes. */
         tick_required = false;
-        if (pn_connection_driver_read_closed(&pc->driver)) {
-          // No more blocks on read in case peer doesn't send shutdown.
-          pc->read_closed = true;
-        }
-        else if ((size_t) n < rbuf.size)
+        if (!pn_connection_driver_read_closed(&pc->driver) && (size_t)n <
rbuf.size)
           pc->read_blocked = true;
       }
       else if (n == 0) {
         pn_connection_driver_read_close(&pc->driver);
-        pc->read_closed = true;
       }
       else if (errno == EWOULDBLOCK)
         pc->read_blocked = true;
       else if (!(errno == EAGAIN || errno == EINTR)) {
         psocket_error(&pc->psocket, errno, pc->disconnected ? "Disconnected" :
"on read from");
-        pc->read_closed = pc->write_closed = true;
       }
     }
   }
@@ -916,18 +911,16 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t
events,
     tick_required = false;
   }
 
-  while (!pc->write_blocked && !pc->write_closed) {
+  while (!pc->write_blocked && !pconnection_wclosed(pc)) {
     pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
     if (wbuf.size > 0) {
       if (!pconnection_write(pc, wbuf)) {
         psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" :
"on write to");
-        pc->read_closed = pc->write_closed = true;
       }
     }
     else {
       if (pn_connection_driver_write_closed(&pc->driver)) {
         shutdown(pc->psocket.sockfd, SHUT_WR);
-        pc->write_closed = true;
         pc->write_blocked = true;
       }
       else
@@ -961,7 +954,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t
events,
 
   pc->context.working = false;
   pc->hog_count = 0;
-  if (pc->read_closed && pn_connection_driver_finished(&pc->driver)) {
+  if (pconnection_rclosed(pc) && pn_connection_driver_finished(&pc->driver))
{
     pconnection_begin_close(pc);
     if (pconnection_is_final(pc)) {
       unlock(&pc->context.mutex);
@@ -1008,8 +1001,6 @@ static void pconnection_start(pconnection_t *pc) {
   getpeername(fd, (struct sockaddr*)&pc->remote.ss, &len);
 
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
-  pc->read_closed = false;
-  pc->write_closed = false;
   epoll_extended_t *ee = &pc->psocket.epoll_io;
   ee->fd = pc->psocket.sockfd;
   ee->wanted = EPOLLIN | EPOLLOUT;
@@ -1768,7 +1759,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond)
{
           pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
         }
         pn_connection_driver_close(&pc->driver);
-        pc->read_closed = true;
       }
     } else {
       pn_listener_t *l = pcontext_listener(ctx);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message