qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cliffjan...@apache.org
Subject svn commit: r1512104 - /qpid/proton/trunk/proton-c/src/windows/driver.c
Date Thu, 08 Aug 2013 23:50:35 GMT
Author: cliffjansen
Date: Thu Aug  8 23:50:35 2013
New Revision: 1512104

URL: http://svn.apache.org/r1512104
Log:
proton-372, 362, 349, 314, 294 - windows driver.c catchup

Modified:
    qpid/proton/trunk/proton-c/src/windows/driver.c

Modified: qpid/proton/trunk/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/windows/driver.c?rev=1512104&r1=1512103&r2=1512104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/windows/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/windows/driver.c Thu Aug  8 23:50:35 2013
@@ -127,6 +127,7 @@ struct pn_driver_t {
   size_t closed_count;
   fd_set readfds;
   fd_set writefds;
+  fd_set exceptfds;
   // int max_fds;
   bool overflow;
   pn_socket_t ctrl[2]; //pipe for updating selectable status
@@ -296,6 +297,14 @@ static void pn_configure_sock(pn_socket_
   unsigned long arg = 1;
   if (ioctlsocket(sock, FIONBIO, &arg))
     perror("ioctlsocket");
+
+  //
+  // Disable the Nagle algorithm on TCP connections.
+  //
+  int flag = 1;
+  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
+    perror("setsockopt");
+  }
 }
 
 pn_connector_t *pn_listener_accept(pn_listener_t *l)
@@ -499,6 +508,12 @@ void pn_connector_set_context(pn_connect
   ctor->context = context;
 }
 
+const char *pn_connector_name(const pn_connector_t *ctor)
+{
+  if (!ctor) return 0;
+  return ctor->name;
+}
+
 pn_listener_t *pn_connector_listener(pn_connector_t *ctor)
 {
   return ctor ? ctor->listener : NULL;
@@ -644,7 +659,9 @@ void pn_connector_process(pn_connector_t
               c->status &= ~PN_SEL_WR;
           }
         }
-      } else if (pending < 0) {
+      } else if (pending == 0) {
+        c->status &= ~PN_SEL_WR;
+      } else {
         c->output_done = true;
         c->status &= ~PN_SEL_WR;
       }
@@ -757,6 +774,7 @@ static void pn_driver_rebuild(pn_driver_
   // d->max_fds = -1;
   FD_ZERO(&d->readfds);
   FD_ZERO(&d->writefds);
+  FD_ZERO(&d->exceptfds);
 
   FD_SET(d->ctrl[0], &d->readfds);
   // if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0];
@@ -779,27 +797,28 @@ static void pn_driver_rebuild(pn_driver_
   for (unsigned i = 0; i < d->connector_count; i++)
   {
     if (!c->closed) {
+      FD_SET(c->fd, &d->exceptfds);
       d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
       if (c->status & PN_SEL_RD) {
         if (r_avail) {
           FD_SET(c->fd, &d->readfds);
-	  r_avail--;
-	}
-	else {
+          r_avail--;
+        }
+        else {
           d->overflow = true;
-	  break;
-	}
+          break;
+        }
       }
       if (c->status & PN_SEL_WR) {
         if (w_avail) {
           FD_SET(c->fd, &d->writefds);
-	  w_avail--;
-	}
-	else {
+          w_avail--;
+        }
+        else {
           d->overflow = true;
-	  break;
-	}
-      }	  
+          break;
+        }
+      }   
       // if (c->fd > d->max_fds) d->max_fds = c->fd;
     }
     c = c->connector_next;
@@ -833,10 +852,10 @@ int pn_driver_wait_2(pn_driver_t *d, int
       to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
     }
     else if (timeout < 0) {
-	to_arg = NULL;
+        to_arg = NULL;
     }
   }
-  int nfds = select(/* d->max_fds */ 0, &d->readfds, &d->writefds, NULL, to_arg);
+  int nfds = select(/* d->max_fds */ 0, &d->readfds, &d->writefds, &d->exceptfds, to_arg);
   if (nfds == SOCKET_ERROR) {
     errno = WSAGetLastError();
     pn_i_error_from_errno(d->error, "select");
@@ -845,9 +864,11 @@ int pn_driver_wait_2(pn_driver_t *d, int
   return 0;
 }
 
-void pn_driver_wait_3(pn_driver_t *d)
+int pn_driver_wait_3(pn_driver_t *d)
 {
+  bool woken = false;
   if (FD_ISSET(d->ctrl[0], &d->readfds)) {
+    woken = true;
     //clear the pipe
     char buffer[512];
     while (recv(d->ctrl[0], buffer, 512, 0) == 512);
@@ -870,16 +891,29 @@ void pn_driver_wait_3(pn_driver_t *d)
       c->pending_read = FD_ISSET(c->fd, &d->readfds);
       c->pending_write = FD_ISSET(c->fd, &d->writefds);
       c->pending_tick = (c->wakeup &&  c->wakeup <= now);
-// Query if need to set exceptfds as third fd_set for completeness on windows...
+// Unlike Posix no distinction of POLLERR and POLLHUP
 //      if (idx && d->fds[idx].revents & POLLERR)
 //          pn_connector_close(c);
-
+//      else if (idx && (d->fds[idx].revents & POLLHUP)) {
+//          [...]
+// Strategy, defer error to a recv or send if read or write pending.
+// Otherwise proclaim the connection dead.
+      if (!c->pending_read && !c->pending_write) {
+        if (FD_ISSET(c->fd, &d->exceptfds)) {
+          // can't defer error to a read or write, close now.
+          // How to get WSAlastError() equivalent info?
+          fprintf(stderr, "connector cleanup on unknown error %s\n", c->name);
+          pn_connector_close(c);
+        }
+      }
     }
     c = c->connector_next;
   }
 
   d->listener_next = d->listener_head;
   d->connector_next = d->connector_head;
+
+  return woken ? PN_INTR : 0;
 }
 
 //
@@ -898,8 +932,7 @@ int pn_driver_wait(pn_driver_t *d, int t
     int result = pn_driver_wait_2(d, timeout);
     if (result == -1)
         return pn_error_code(d->error);
-    pn_driver_wait_3(d);
-    return 0;
+    return pn_driver_wait_3(d);
 }
 
 pn_listener_t *pn_driver_listener(pn_driver_t *d) {
@@ -1002,4 +1035,3 @@ static int pn_socket_pair (SOCKET sv[2])
   closesocket(sock);
   return 0;
 }
-



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message