qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-dispatch git commit: DISPATCH-774: modify HTTP code to work with proton 0.17
Date Thu, 25 May 2017 18:02:50 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/0.8.x d4a6cc538 -> 3d2ea0ef5


DISPATCH-774: modify HTTP code to work with proton 0.17


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3d2ea0ef
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3d2ea0ef
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3d2ea0ef

Branch: refs/heads/0.8.x
Commit: 3d2ea0ef5accb69393aed2f6afb15ba73a5dc9a6
Parents: d4a6cc5
Author: Alan Conway <aconway@redhat.com>
Authored: Thu May 25 13:59:36 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Thu May 25 13:59:36 2017 -0400

----------------------------------------------------------------------
 src/http-libwebsockets.c | 70 +++++++++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3d2ea0ef/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index dc8ff58..96734f8 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -22,7 +22,9 @@
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/timer.h>
 
-#include <proton/connection_driver.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/transport.h>
 
 #include <libwebsockets.h>
 
@@ -90,8 +92,8 @@ static void buffer_set_size(buffer_t *buf, size_t size) {
 
 /* AMQPWS connection: set as lws user data and qd_conn->context */
 struct qd_http_connection_t {
-    pn_connection_driver_t driver;
-    qd_connection_t* qd_conn;
+    qd_connection_t *qd_conn;
+    pn_transport_t *transport;
     buffer_t wbuf;   /* LWS requires allocated header space at start of buffer */
     struct lws *wsi;
     char name[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port */
@@ -150,10 +152,11 @@ static int handle_events(qd_http_connection_t* c) {
         return unexpected_close(c->wsi, "not-established");
     }
     qd_connection_process(c->qd_conn);
-    if (pn_connection_driver_write_buffer(&c->driver).size) {
-        lws_callback_on_writable(c->wsi);
+    if (pn_transport_pending(c->transport) > 0) {
+         lws_callback_on_writable(c->wsi);
     }
-    if (pn_connection_driver_finished(&c->driver)) {
+    bool has_event = pn_collector_peek(pn_connection_collector(c->qd_conn->pn_conn));
+    if (pn_transport_closed(c->transport) && !has_event) {
         lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
         c->closed = true;
         qd_connection_process(c->qd_conn);
@@ -395,10 +398,18 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons
reason,
         c->qd_conn->listener = hl->listener;
         lws_get_peer_simple(wsi, c->hostip, sizeof(c->hostip));
         strncpy(c->name, c->hostip, sizeof(c->name));
-        int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn,
NULL);
-        if (err) {
-            return unexpected_close(c->wsi, pn_code(err));
+
+        c->qd_conn->pn_conn = pn_connection();
+        c->transport = pn_transport();
+        c->qd_conn->collector = pn_collector();
+        if (!c->qd_conn->pn_conn || !c->transport || !c->qd_conn->collector)
{
+            if (c->qd_conn->pn_conn) pn_connection_free(c->qd_conn->pn_conn);
+            if (c->transport) pn_transport_free(c->transport);
+            if (c->qd_conn->collector) pn_collector_free(c->qd_conn->collector);
+            return unexpected_close(c->wsi, "out of memory");
         }
+        pn_connection_collect(c->qd_conn->pn_conn, c->qd_conn->collector);
+
         c->qd_conn->http = c;
         c->qd_conn->server        = hs->server;
         c->qd_conn->connection_id = qd_server_connection_id(c->qd_conn->server);
@@ -406,24 +417,23 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons
reason,
         c->qd_conn->policy_counted = false;
         const qd_server_config_t *config = hl->listener->config;
         c->qd_conn->role = strdup(config->role);
-        c->qd_conn->pn_conn = c->driver.connection;
         pn_connection_set_context(c->qd_conn->pn_conn, ctx);
-        c->qd_conn->collector = c->driver.collector;
         qd_server_decorate_connection(c->qd_conn->server, c->qd_conn->pn_conn,
config);
 
         qd_log(hs->log, QD_LOG_DEBUG,
                "[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
                qd_connection_connection_id(c->qd_conn), c->hostip);
-        pn_connection_driver_bind(&c->driver);
+        pn_transport_bind(c->transport, c->qd_conn->pn_conn);
         return handle_events(c);
     }
 
     case LWS_CALLBACK_SERVER_WRITEABLE: {
         if (handle_events(c)) return -1;
-        pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
-        if (dbuf.size) {
+        ssize_t pending = pn_transport_pending(c->transport);
+        if (pending > 0) {
+            pn_bytes_t dbuf = pn_bytes(pending, pn_transport_head(c->transport));
             /* lws_write() demands LWS_PRE bytes of free space before the data,
-             * so we must copy from the driver's buffer to larger temporary wbuf
+             * so we must copy from the transport buffer to larger temporary wbuf
              */
             buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
             if (c->wbuf.start == NULL) {
@@ -433,10 +443,10 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons
reason,
             memcpy(buf, dbuf.start, dbuf.size);
             ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
             if (wrote < 0) {
-                pn_connection_driver_write_close(&c->driver);
+                pn_transport_close_head(c->transport);
                 return unexpected_close(c->wsi, "write-error");
             } else {
-                pn_connection_driver_write_done(&c->driver, wrote);
+                pn_transport_pop(c->transport, wrote);
             }
         }
         return handle_events(c);
@@ -445,13 +455,14 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons
reason,
     case LWS_CALLBACK_RECEIVE: {
         while (len > 0) {
             if (handle_events(c)) return -1;
-            pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
-            if (dbuf.size == 0) {
-                return unexpected_close(c->wsi, "unexpected-data");
+            ssize_t cap = pn_transport_capacity(c->transport);
+            if (cap <= 0) {
+                return unexpected_close(c->wsi, "unexpected-close");
             }
+            pn_rwbytes_t dbuf = pn_rwbytes(cap, pn_transport_tail(c->transport));
             size_t copy = (len < dbuf.size) ? len : dbuf.size;
             memcpy(dbuf.start, in, copy);
-            pn_connection_driver_read_done(&c->driver, copy);
+            pn_transport_process(c->transport, copy);
             len -= copy;
             in = (char*)in + copy;
         }
@@ -459,7 +470,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons
reason,
     }
 
     case LWS_CALLBACK_USER: {
-        pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+        pn_timestamp_t next_tick = pn_transport_tick(c->transport, hs->now);
         if (next_tick && next_tick > hs->now && next_tick < hs->next_tick)
{
             hs->next_tick = next_tick;
         }
@@ -467,20 +478,21 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons
reason,
     }
 
     case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
-        pn_connection_driver_read_close(&c->driver);
+        pn_transport_close_tail(c->transport);
         return handle_events(c);
     }
 
     case LWS_CALLBACK_CLOSED: {
         qd_log(wsi_log(wsi), QD_LOG_DEBUG, "HTTP connection closed to %s from %s",
                wsi_listener(wsi)->host_port, c->name);
-        if (c->driver.transport) {
-            pn_connection_driver_close(&c->driver);
+        if (c->transport) {
+            pn_transport_close_tail(c->transport);
+            pn_transport_close_head(c->transport);
             handle_events(c);
         }
-        pn_connection_driver_destroy(&c->driver);
-        c->qd_conn->pn_conn = NULL;
-        c->qd_conn->collector = NULL;
+        pn_transport_free(c->transport);
+        pn_connection_free(c->qd_conn->pn_conn);
+        pn_collector_free(c->qd_conn->collector);
         qd_connection_free(c->qd_conn);
         free(c->wbuf.start);
         return -1;
@@ -539,7 +551,7 @@ static void* http_thread_run(void* v) {
                 break;
             case W_WAKE: {
                 qd_http_connection_t *c = w.value;
-                pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+                pn_collector_put(c->qd_conn->collector, PN_OBJECT, c->qd_conn->pn_conn,
                                  PN_CONNECTION_WAKE);
                 handle_events(c);
                 break;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message