qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [3/3] qpid-proton git commit: PROTON-1452: Add pn_proactor_disconnect
Date Wed, 29 Mar 2017 22:22:54 GMT
PROTON-1452: Add pn_proactor_disconnect

/**
 * Disconnect all connections and listeners currently active in the proactor.
 *
 * PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
 * If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
 * will be generated when all connections and listeners are disconnected.
 *
 * Note the proactor remains active, connections and listeners created after a call to
 * pn_proactor_disconnect() are not affected by it.
 *
 * @param condition if not NULL the condition data is copied to the transports and listeners.
 */
PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/edebc4ec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/edebc4ec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/edebc4ec

Branch: refs/heads/master
Commit: edebc4ecf693514fe3becd64bbeb1c8361b31e62
Parents: 893cb00
Author: Alan Conway <aconway@redhat.com>
Authored: Wed Mar 29 17:58:00 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed Mar 29 18:15:58 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/proactor.h |  14 ++++
 proton-c/src/proactor/libuv.c      |  87 ++++++++++++++++----
 proton-c/src/tests/proactor.c      | 137 +++++++++++++++++++++++++-------
 3 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index b77feff..d13c6d6 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -60,6 +60,20 @@ typedef struct pn_proactor_addr_t pn_proactor_addr_t;
 PNP_EXTERN pn_proactor_t *pn_proactor(void);
 
 /**
+ * Disconnect all connections and listeners currently active in the proactor.
+ *
+ * PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
+ * If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
+ * will be generated when all connections and listeners are disconnected.
+ *
+ * Note the proactor remains active, connections and listeners created after a call to
+ * pn_proactor_disconnect() are not affected by it.
+ *
+ * @param condition if not NULL the condition data is copied to the transports and listeners.
+ */
+PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);
+
+/**
  * Free the proactor. Abort any open network connections and clean up all
  * associated resources.
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 266db98..2f5e369 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -240,6 +240,8 @@ struct pn_proactor_t {
   timeout_state_t timeout_state;
   pn_millis_t timeout;
   size_t count;               /* connection/listener count for INACTIVE events */
+  pn_condition_t *disconnect_cond; /* disconnect condition */
+  bool disconnect;            /* disconnect requested */
   bool inactive;
   bool has_leader;
   bool batch_working;         /* batch is being processed in a worker thread */
@@ -906,6 +908,34 @@ void pconnection_detach(pconnection_t *pc) {
   }
 }
 
+static void on_proactor_disconnect(uv_handle_t* h, void* v) {
+  if (h->type == UV_TCP) {
+    switch (*(struct_type*)h->data) {
+     case T_CONNECTION: {
+       pconnection_t *pc = (pconnection_t*)h->data;
+       pn_condition_t *cond = pc->work.proactor->disconnect_cond;
+       if (cond) {
+         pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+       }
+       pn_connection_driver_close(&pc->driver);
+       work_notify(&pc->work);
+       break;
+     }
+     case T_LSOCKET: {
+       pn_listener_t *l = ((lsocket_t*)h->data)->parent;
+       pn_condition_t *cond = l->work.proactor->disconnect_cond;
+       if (cond) {
+         pn_condition_copy(pn_listener_condition(l), cond);
+       }
+       pn_listener_close(l);
+       break;
+     }
+     default:
+      break;
+    }
+  }
+}
+
 /* Process the leader_q and the UV loop, in the leader thread */
 static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
   /* Set timeout timer if there was a request, let it count down while we process work */
@@ -914,6 +944,13 @@ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode
mode) {
     uv_timer_stop(&p->timer);
     uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
   }
+  /* If disconnect was requested, walk the socket list */
+  if (p->disconnect) {
+    p->disconnect = false;
+    uv_mutex_unlock(&p->lock);
+    uv_walk(&p->loop, on_proactor_disconnect, NULL);
+    uv_mutex_lock(&p->lock);
+  }
   pn_event_batch_t *batch = NULL;
   for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) {
     assert(!w->working);
@@ -1035,6 +1072,20 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
   notify(p);
 }
 
+void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
+  uv_mutex_lock(&p->lock);
+  if (!p->disconnect) {
+    p->disconnect = true;
+    if (cond) {
+      pn_condition_copy(p->disconnect_cond, cond);
+    } else {
+      pn_condition_clear(p->disconnect_cond);
+    }
+    notify(p);
+  }
+  uv_mutex_unlock(&p->lock);
+}
+
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
@@ -1045,9 +1096,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 
 void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
-  p->timeout_state = TM_NONE;
+  if (p->timeout_state != TM_NONE) {
+    p->timeout_state = TM_NONE;
+    notify(p);
+  }
   uv_mutex_unlock(&p->lock);
-  notify(p);
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
@@ -1071,20 +1124,6 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char
*addr, int
   return 0;
 }
 
-pn_proactor_t *pn_proactor() {
-  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
-  p->collector = pn_collector();
-  p->batch.next_event = &proactor_batch_next;
-  if (!p->collector) return NULL;
-  uv_loop_init(&p->loop);
-  uv_mutex_init(&p->lock);
-  uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL);
-  uv_timer_init(&p->loop, &p->timer);
-  p->timer.data = p;
-  return p;
-}
-
 static void on_proactor_free(uv_handle_t* h, void* v) {
   uv_safe_close(h, NULL);       /* Close the handle */
   if (h->type == UV_TCP) {      /* Put the corresponding work item on the leader_q for
cleanup */
@@ -1108,6 +1147,21 @@ static void work_free(work_t *w) {
   }
 }
 
+pn_proactor_t *pn_proactor() {
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
+  p->collector = pn_collector();
+  p->batch.next_event = &proactor_batch_next;
+  if (!p->collector) return NULL;
+  uv_loop_init(&p->loop);
+  uv_mutex_init(&p->lock);
+  uv_cond_init(&p->cond);
+  uv_async_init(&p->loop, &p->async, NULL);
+  uv_timer_init(&p->loop, &p->timer);
+  p->timer.data = p;
+  p->disconnect_cond = pn_condition();
+  return p;
+}
+
 void pn_proactor_free(pn_proactor_t *p) {
   /* Close all open handles */
   uv_walk(&p->loop, on_proactor_free, NULL);
@@ -1125,6 +1179,7 @@ void pn_proactor_free(pn_proactor_t *p) {
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
   pn_collector_free(p->collector);
+  pn_condition_free(p->disconnect_cond);
   free(p);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index e87774e..d22958c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,13 +32,10 @@
 
 static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
 
-static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+static const char *localhost = ""; /* host for connect/listen */
 
 typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
 
-/* Save the last condition description of a handled event here  */
-char last_condition[1024] = {0};
-
 /* Proactor and handler that take part in a test */
 typedef struct proactor_test_t {
   test_handler_fn handler;
@@ -66,18 +63,22 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 
 #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
 
+/* Set this to a pn_condition() to save condition data */
+pn_condition_t *last_condition = NULL;
+
 static void save_condition(pn_event_t *e) {
-  /* TODO aconway 2017-03-23: extend pn_event_condition to include listener */
-  last_condition[0] = '\0';
-  pn_condition_t *cond = NULL;
-  if (pn_event_listener(e)) {
-    cond = pn_listener_condition(pn_event_listener(e));
-  } else {
-    cond = pn_event_condition(e);
-  }
-  if (cond && pn_condition_is_set(cond)) {
-    const char *desc = pn_condition_get_description(cond);
-    strncpy(last_condition, desc, sizeof(last_condition));
+  if (last_condition) {
+    pn_condition_t *cond = NULL;
+    if (pn_event_listener(e)) {
+      cond = pn_listener_condition(pn_event_listener(e));
+    } else {
+      cond = pn_event_condition(e);
+    }
+    if (cond) {
+      pn_condition_copy(last_condition, cond);
+    } else {
+      pn_condition_clear(last_condition);
+    }
   }
 }
 
@@ -85,6 +86,7 @@ static void save_condition(pn_event_t *e) {
  * all proactors return NULL
  */
 static pn_event_type_t proactor_test_get(proactor_test_t *pts, size_t n) {
+  if (last_condition) pn_condition_clear(last_condition);
   while (true) {
     bool busy = false;
     for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
@@ -343,14 +345,14 @@ static void test_errors(test_t *t) {
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, "127.0.0.1:xxx");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "xxx", last_condition);
+  TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "xxx", last_condition);
+  TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
   pn_listener_free(l);
 
@@ -358,7 +360,7 @@ static void test_errors(test_t *t) {
   c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
   if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
-    TEST_STR_IN(t, "connection refused", last_condition);
+    TEST_STR_IN(t, "connection refused", pn_condition_get_description(last_condition));
     TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
     sock_close(port.sock);
     PROACTOR_TEST_FREE(pts);
@@ -380,7 +382,7 @@ static void test_ipv4_ipv6(test_t *t) {
   pn_event_type_t e = PROACTOR_TEST_GET(pts);
   bool has_ipv6 = (e != PN_LISTENER_CLOSE);
   if (!has_ipv6) {
-    TEST_LOGF(t, "skip IPv6 tests: %s", last_condition);
+    TEST_LOGF(t, "skip IPv6 tests: %s", pn_condition_get_description(last_condition));
   }
   PROACTOR_TEST_DRAIN(pts);
 
@@ -390,7 +392,7 @@ static void test_ipv4_ipv6(test_t *t) {
   pn_proactor_listen(server, l4, port4.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port4.sock);
-  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  pn_condition_get_description(last_condition));
   PROACTOR_TEST_DRAIN(pts);
 
   /* Empty address listens on both IPv4 and IPv6 on all interfaces */
@@ -400,20 +402,20 @@ static void test_ipv4_ipv6(test_t *t) {
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
   e = PROACTOR_TEST_GET(pts);
-  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  pn_condition_get_description(last_condition));
   PROACTOR_TEST_DRAIN(pts);
 
 #define EXPECT_CONNECT(TP, HOST) do {                                   \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST)));
\
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));    \
-    TEST_STR_EQUAL(t, "", last_condition);                              \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));   \
+    TEST_CHECK(t, !pn_condition_is_set(last_condition));                 \
     PROACTOR_TEST_DRAIN(pts);                                           \
   } while(0)
 
 #define EXPECT_FAIL(TP, HOST) do {                                      \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST)));
\
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));    \
-    TEST_STR_IN(t, "refused", last_condition);                          \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));   \
+    TEST_STR_IN(t, "refused", pn_condition_get_description(last_condition)); \
     PROACTOR_TEST_DRAIN(pts);                                           \
   } while(0)
 
@@ -529,9 +531,9 @@ static void test_ssl(test_t *t) {
   pn_proactor_connect(client, c, port.host_port);
   /* Open ok at both ends */
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
-  TEST_STR_EQUAL(t,"", last_condition);
+  TEST_CHECK(t, !pn_condition_is_set(last_condition));
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
-  TEST_STR_EQUAL(t, "", last_condition);
+  TEST_CHECK(t, !pn_condition_is_set(last_condition));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
 
@@ -585,8 +587,87 @@ static void test_addr(test_t *t) {
   pn_connection_free(s);
 }
 
+/* Test simple client/server connection with 2 proactors */
+static void test_disconnect(test_t *t) {
+  proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+
+  test_port_t port = test_port(localhost);
+  pn_listener_t* l = pn_listener();
+  pn_proactor_listen(server, l, port.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port.sock);
+
+  test_port_t port2 = test_port(localhost);
+  pn_listener_t* l2 = pn_listener();
+  pn_proactor_listen(server, l2, port2.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port2.sock);
+
+  /* We will disconnect one connection after it is remote-open */
+  pn_proactor_connect(client, pn_connection(), port.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_proactor_connect(client, pn_connection(), port2.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+
+  pn_condition_t *cond = pn_condition();
+  pn_condition_set_name(cond, "test-name");
+  pn_condition_set_description(cond, "test-description");
+
+  pn_proactor_disconnect(client, cond);
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+  /* Note: pn_transport adds "(connection aborted)" on client side if transport closed early.
*/
+  TEST_STR_EQUAL(t, "test-description (connection aborted)", pn_condition_get_description(last_condition));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  pn_proactor_disconnect(server, cond);
+  int expect_tclose = 2, expect_lclose = 2;
+  while (expect_tclose || expect_lclose) {
+    pn_event_type_t et = PROACTOR_TEST_RUN(pts);
+    switch (et) {
+     case PN_TRANSPORT_CLOSED:
+      TEST_CHECK(t, --expect_tclose >= 0);
+        TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+        TEST_STR_EQUAL(t, "test-description", pn_condition_get_description(last_condition));
+      break;
+     case PN_LISTENER_CLOSE:
+      TEST_CHECK(t, --expect_lclose >= 0);
+      TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+      TEST_STR_EQUAL(t, "test-description", pn_condition_get_description(last_condition));
+      break;
+     default:
+      TEST_ERRORF(t, "%s unexpected: want %d TRANSPORT_CLOSED, %d LISTENER_CLOSE",
+                  pn_event_type_name(et), expect_tclose, expect_lclose);
+      expect_lclose = expect_tclose = 0;
+      continue;
+    }
+  }
+
+  pn_condition_free(cond);
+  pn_listener_free(l);
+  pn_listener_free(l2);
+
+  /* Make sure the proactors are still functional */
+  test_port_t port3 = test_port(localhost);
+  pn_listener_t* l3 = pn_listener();
+  pn_proactor_listen(server, l3, port3.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port3.sock);
+  pn_proactor_connect(client, pn_connection(), port3.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_proactor_disconnect(client, NULL);
+
+  PROACTOR_TEST_DRAIN(pts);     /* Drain will  */
+  PROACTOR_TEST_FREE(pts);
+  pn_listener_free(l3);
+}
+
 int main(int argc, char **argv) {
   int failed = 0;
+  last_condition = pn_condition();
   RUN_ARGV_TEST(failed, t, test_inactive(&t));
   RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
   RUN_ARGV_TEST(failed, t, test_errors(&t));
@@ -596,5 +677,7 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
   RUN_ARGV_TEST(failed, t, test_ssl(&t));
   RUN_ARGV_TEST(failed, t, test_addr(&t));
+  RUN_ARGV_TEST(failed, t, test_disconnect(&t));
+  pn_condition_free(last_condition);
   return failed;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message