qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1333552 - in /qpid/proton/trunk/proton-c: bindings/php/examples/client.php bindings/php/examples/server.php include/proton/driver.h src/dispatcher/dispatcher.c src/dispatcher/dispatcher.h src/driver.c src/engine/engine.c src/sasl/sasl.c
Date Thu, 03 May 2012 17:06:48 GMT
Author: rhs
Date: Thu May  3 17:06:47 2012
New Revision: 1333552

URL: http://svn.apache.org/viewvc?rev=1333552&view=rev
Log:
added arg checking; fixed shutdown for sasl failure; removed pn_connector_eos; fixed driver
bug

Modified:
    qpid/proton/trunk/proton-c/bindings/php/examples/client.php
    qpid/proton/trunk/proton-c/bindings/php/examples/server.php
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/sasl/sasl.c

Modified: qpid/proton/trunk/proton-c/bindings/php/examples/client.php
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/examples/client.php?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/examples/client.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/examples/client.php Thu May  3 17:06:47 2012
@@ -21,14 +21,12 @@ $handler = function($c) {
   $sasl = pn_connector_sasl($c);
   switch (pn_sasl_state($sasl)) {
     case PN_SASL_CONF:
+    case PN_SASL_STEP:
+    case PN_SASL_IDLE:
     case PN_SASL_FAIL:
-      pn_connector_eos($c);
       return;
     case PN_SASL_PASS:
       break;
-    case PN_SASL_STEP:
-    case PN_SASL_IDLE:
-      return;
   }
 
   global $count, $counter, $sent, $rcvd;
@@ -88,7 +86,9 @@ $handler = function($c) {
 
     if (pn_updated($delivery)) {
       // the disposition was updated, let's report it and settle the delivery
-      //print("disposition for $tag: " . pn_remote_disp($delivery) . "\n");
+      print "disposition for $tag: " .
+        pn_local_disp($delivery) . " " .
+        pn_remote_disp($delivery) . "\n";
       // we could clear the updated flag if we didn't want to settle
       // pn_clear($delivery);
       pn_settle($delivery);

Modified: qpid/proton/trunk/proton-c/bindings/php/examples/server.php
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/examples/server.php?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/examples/server.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/examples/server.php Thu May  3 17:06:47 2012
@@ -37,12 +37,10 @@ $handler = function($c) {
       }
       break;
     case PN_SASL_FAIL:
-      pn_connector_eos($c);
+    case PN_SASL_IDLE:
       return;
     case PN_SASL_PASS:
       break;
-    case PN_SASL_IDLE:
-      return;
     }
   }
 
@@ -173,7 +171,6 @@ while (TRUE) {
   // cycle through all listeners with I/O activity
   while ($l = pn_driver_listener($driver)) {
     $c = pn_listener_accept($l);
-    print("listener $l -> $c\n");
     pn_connector_set_context($c, $handler);
   }
 

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Thu May  3 17:06:47 2012
@@ -57,7 +57,6 @@ pn_sasl_t *pn_connector_sasl(pn_connecto
 pn_connection_t *pn_connector_connection(pn_connector_t *ctor);
 void *pn_connector_context(pn_connector_t *ctor);
 void pn_connector_set_context(pn_connector_t *ctor, void *context);
-void pn_connector_eos(pn_connector_t *ctor);
 void pn_connector_close(pn_connector_t *ctor);
 bool pn_connector_closed(pn_connector_t *ctor);
 void pn_connector_destroy(pn_connector_t *ctor);

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Thu May  3 17:06:47 2012
@@ -48,6 +48,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
   disp->available = 0;
 
   disp->halt = false;
+  disp->batch = true;
 
   return disp;
 }
@@ -127,7 +128,7 @@ ssize_t pn_dispatcher_input(pn_dispatche
       available -= n;
       read += n;
 
-      if (disp->halt) break;
+      if (!disp->batch) break;
     } else {
       break;
     }

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Thu May  3 17:06:47 2012
@@ -50,6 +50,7 @@ struct pn_dispatcher_t {
   char *output;
   void *context;
   bool halt;
+  bool batch;
   char scratch[SCRATCH];
 };
 

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Thu May  3 17:06:47 2012
@@ -61,6 +61,7 @@ struct pn_listener_t {
   pn_listener_t *next;
   pn_listener_t *prev;
   int idx;
+  bool pending;
   int fd;
   void *context;
 };
@@ -72,6 +73,9 @@ struct pn_connector_t {
   pn_connector_t *next;
   pn_connector_t *prev;
   int idx;
+  bool pending_tick;
+  bool pending_read;
+  bool pending_write;
   int fd;
   int status;
   bool closed;
@@ -101,6 +105,7 @@ struct pn_connector_t {
 
 static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l)
 {
+  if (!l->driver) return;
   LL_ADD(d->listener_head, d->listener_tail, l);
   l->driver = d;
   d->listener_count++;
@@ -108,6 +113,8 @@ static void pn_driver_add_listener(pn_dr
 
 static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l)
 {
+  if (!l->driver) return;
+
   if (l == d->listener_next) {
     d->listener_next = l->next;
   }
@@ -155,14 +162,17 @@ pn_listener_t *pn_listener(pn_driver_t *
 
 pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context)
 {
+  if (!driver) return NULL;
+
   pn_listener_t *l = malloc(sizeof(pn_listener_t));
   if (!l) return NULL;
   l->driver = driver;
   l->next = NULL;
   l->prev = NULL;
+  l->idx = 0;
+  l->pending = false;
   l->fd = fd;
   l->context = context;
-  l->idx = 0;
 
   pn_driver_add_listener(driver, l);
   return l;
@@ -188,9 +198,7 @@ static void pn_configure_sock(int sock) 
 
 pn_connector_t *pn_listener_accept(pn_listener_t *l)
 {
-  if (!(l->idx && l->driver && l->driver->fds[l->idx].revents
& POLLIN)) {
-    return NULL;
-  }
+  if (!l || !l->pending) return NULL;
 
   struct sockaddr_in addr = {0};
   addr.sin_family = AF_INET;
@@ -237,6 +245,7 @@ void pn_listener_destroy(pn_listener_t *
 
 static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c)
 {
+  if (!c->driver) return;
   LL_ADD(d->connector_head, d->connector_tail, c);
   c->driver = d;
   d->connector_count++;
@@ -244,6 +253,8 @@ static void pn_driver_add_connector(pn_d
 
 static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c)
 {
+  if (!c->driver) return;
+
   if (c == d->connector_next) {
     d->connector_next = c->next;
   }
@@ -256,6 +267,8 @@ static void pn_driver_remove_connector(p
 pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
                              const char *port, void *context)
 {
+  if (!driver) return NULL;
+
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
   if (code) {
@@ -293,15 +306,20 @@ static ssize_t pn_connector_write_sasl_h
 static ssize_t pn_connector_write_sasl(pn_connector_t *ctor);
 static ssize_t pn_connector_write_amqp_header(pn_connector_t *ctor);
 static ssize_t pn_connector_write_amqp(pn_connector_t *ctor);
-static ssize_t pn_connector_write_eos(pn_connector_t *ctor);
 
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context)
 {
+  if (!driver) return NULL;
+
   pn_connector_t *c = malloc(sizeof(pn_connector_t));
   if (!c) return NULL;
   c->driver = driver;
   c->next = NULL;
   c->prev = NULL;
+  c->pending_tick = false;
+  c->pending_read = false;
+  c->pending_write = false;
+  c->idx = 0;
   c->fd = fd;
   c->status = PN_SEL_RD | PN_SEL_WR;
   c->closed = false;
@@ -321,7 +339,6 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->output_done = false;
   c->context = context;
   c->listener = NULL;
-  c->idx = 0;
 
   pn_connector_trace(c, driver->trace);
 
@@ -361,13 +378,6 @@ pn_listener_t *pn_connector_listener(pn_
   return ctor ? ctor->listener : NULL;
 }
 
-void pn_connector_eos(pn_connector_t *ctor)
-{
-  if (!ctor) return;
-
-  ctor->process_input = pn_connector_write_eos;
-}
-
 void pn_connector_close(pn_connector_t *ctor)
 {
   // XXX: should probably signal engine and callback here
@@ -585,11 +595,6 @@ static ssize_t pn_connector_write_amqp(p
   return pn_output(transport, pn_connector_output(ctor), pn_connector_available(ctor));
 }
 
-static ssize_t pn_connector_write_eos(pn_connector_t *ctor)
-{
-  return PN_EOS;
-}
-
 static time_t pn_connector_tick(pn_connector_t *ctor, time_t now)
 {
   // XXX: should probably have a function pointer for this and switch it with different layers
@@ -601,18 +606,21 @@ static time_t pn_connector_tick(pn_conne
 
 void pn_connector_process(pn_connector_t *c) {
   if (c) {
-    int idx = c->idx;
-    if (!idx) return;
-    pn_driver_t *d = c->driver;
-    if (d->fds[idx].revents & POLLIN) {
+    if (c->pending_tick) {
+      // XXX: should handle timing also
+      c->tick(c, 0);
+      c->pending_tick = false;
+    }
+
+    if (c->pending_read) {
       c->read(c);
-      d->fds[idx].revents &= ~POLLIN;
+      c->pending_read = false;
     }
     pn_connector_process_input(c);
     pn_connector_process_output(c);
-    if (d->fds[idx].revents & POLLOUT) {
+    if (c->pending_write) {
       c->write(c);
-      d->fds[idx].revents &= ~POLLOUT;
+      c->pending_write = false;
     }
     if (c->output_size == 0 && c->input_done && c->output_done)
{
       fprintf(stderr, "closed\n");
@@ -672,7 +680,9 @@ void pn_driver_destroy(pn_driver_t *d)
 
 void pn_driver_wakeup(pn_driver_t *d)
 {
-  write(d->ctrl[1], "x", 1);
+  if (d) {
+    write(d->ctrl[1], "x", 1);
+  }
 }
 
 static void pn_driver_rebuild(pn_driver_t *d)
@@ -713,18 +723,6 @@ static void pn_driver_rebuild(pn_driver_
 void pn_driver_wait(pn_driver_t *d, int timeout) {
   pn_driver_rebuild(d);
 
-  pn_connector_t *c = d->connector_head;
-  while (c) {
-    // XXX: should do this in process
-    // XXX: should handle timing also
-    c->tick(c, 0);
-    c = c->next;
-  }
-
-  // XXX: double rebuild necessary now due to separating of read/write
-  // and processing
-  pn_driver_rebuild(d);
-
   DIE_IFE(poll(d->fds, 1 + d->listener_count + d->connector_count, timeout));
 
   if (d->fds[0].revents & POLLIN) {
@@ -740,21 +738,35 @@ void pn_driver_wait(pn_driver_t *d, int 
 pn_listener_t *pn_driver_listener(pn_driver_t *d) {
   if (!d) return NULL;
 
-  pn_listener_t *l = d->listener_next;
-  if (!l) return NULL;
+  while (d->listener_next) {
+    pn_listener_t *l = d->listener_next;
+    d->listener_next = l->next;
 
-  if (!(l->idx && d->fds[l->idx].revents & POLLIN)) {
-    return NULL;
+    l->pending = (l->idx && d->fds[l->idx].revents & POLLIN);
+
+    if (l->pending) {
+      return l;
+    }
   }
 
-  d->listener_next = l->next;
-  return l;
+  return NULL;
 }
 
 pn_connector_t *pn_driver_connector(pn_driver_t *d) {
   if (!d) return NULL;
 
-  pn_connector_t *c = d->connector_next;
-  if (c) { d->connector_next = c->next; }
-  return c;
+  while (d->connector_next) {
+    pn_connector_t *c = d->connector_next;
+    d->connector_next = c->next;
+
+    int idx = c->idx;
+    c->pending_read = (idx && d->fds[idx].revents & POLLIN);
+    c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
+
+    if (c->pending_read || c->pending_write || c->pending_tick) {
+      return c;
+    }
+  }
+
+  return NULL;
 }

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu May  3 17:06:47 2012
@@ -419,12 +419,14 @@ pn_error_t *pn_connection_error(pn_conne
 
 void pn_connection_set_container(pn_connection_t *connection, const char *container)
 {
+  if (!connection) return;
   if (connection->container) free(connection->container);
   connection->container = strdup(container);
 }
 
 void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname)
 {
+  if (!connection) return;
   if (connection->hostname) free(connection->hostname);
   connection->hostname = strdup(hostname);
 }

Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1333552&r1=1333551&r2=1333552&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Thu May  3 17:06:47 2012
@@ -58,6 +58,7 @@ pn_sasl_t *pn_sasl()
 {
   pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
   sasl->disp = pn_dispatcher(1, sasl);
+  sasl->disp->batch = false;
 
   pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
   pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
@@ -277,20 +278,28 @@ void pn_sasl_process(pn_sasl_t *sasl)
     pn_server_done(sasl);
     sasl->sent_done = true;
     sasl->rcvd_done = true;
+    sasl->disp->halt = true;
   }
 }
 
 ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
 {
+  ssize_t n = pn_dispatcher_input(sasl->disp, bytes, available);
+  if (n < 0) return n;
+
   if (sasl->rcvd_done) {
     if (pn_sasl_state(sasl) == PN_SASL_PASS) {
-      return PN_EOS;
+      if (n) {
+        return n;
+      } else {
+        return PN_EOS;
+      }
     } else {
       // XXX: should probably do something better here
       return PN_ERR;
     }
   } else {
-    return pn_dispatcher_input(sasl->disp, bytes, available);
+    return n;
   }
 }
 
@@ -341,4 +350,5 @@ void pn_do_outcome(pn_dispatcher_t *disp
   sasl->outcome = pn_to_uint8(pn_list_get(disp->args, SASL_OUTCOME_CODE));
   sasl->rcvd_done = true;
   sasl->sent_done = true;
+  disp->halt = true;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message