qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [5/5] qpid-proton git commit: fixed handler pattern to use simple type aliasing; added a transfer test; use 127.0.0.1 instead of localhost
Date Tue, 06 Jan 2015 19:28:38 GMT
fixed handler pattern to use simple type aliasing; added a transfer test; use 127.0.0.1 instead
of localhost


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e5e8df4f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e5e8df4f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e5e8df4f

Branch: refs/heads/master
Commit: e5e8df4f99870fd83bf74069c2b80dfaf9ee6b62
Parents: e12a2ab
Author: Rafael Schloming <rhs@alum.mit.edu>
Authored: Tue Jan 6 14:26:49 2015 -0500
Committer: Rafael Schloming <rhs@alum.mit.edu>
Committed: Tue Jan 6 14:27:51 2015 -0500

----------------------------------------------------------------------
 proton-c/include/proton/handlers.h     |   7 +-
 proton-c/include/proton/reactor.h      |   1 -
 proton-c/src/handlers/flowcontroller.c |  25 ++---
 proton-c/src/handlers/handshaker.c     |  20 ++--
 proton-c/src/reactor/connection.c      |   5 +
 proton-c/src/reactor/handler.c         |   6 +-
 proton-c/src/reactor/reactor.c         |  42 ++++-----
 proton-c/src/tests/reactor.c           | 137 ++++++++++++++++++++++++----
 8 files changed, 170 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/include/proton/handlers.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/handlers.h b/proton-c/include/proton/handlers.h
index d2d732b..99242d1 100644
--- a/proton-c/include/proton/handlers.h
+++ b/proton-c/include/proton/handlers.h
@@ -40,14 +40,11 @@ extern "C" {
  * @{
  */
 
-typedef struct pn_handshaker_t pn_handshaker_t;
-typedef struct pn_flowcontroller_t pn_flowcontroller_t;
+typedef pn_handler_t pn_handshaker_t;
+typedef pn_handler_t pn_flowcontroller_t;
 
 PN_EXTERN pn_handshaker_t *pn_handshaker(void);
-PN_EXTERN pn_handler_t *pn_handshaker_handler(pn_handshaker_t *handshaker);
-
 PN_EXTERN pn_flowcontroller_t *pn_flowcontroller(int window);
-PN_EXTERN pn_handler_t *pn_flowcontroller_handler(pn_flowcontroller_t *flowcontroller);
 
 /** @}
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index 6ede7e9..f75e0aa 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -50,7 +50,6 @@ PN_EXTERN pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *,
pn_event
                                        void (*finalize)(pn_handler_t *));
 PN_EXTERN void pn_handler_free(pn_handler_t *handler);
 PN_EXTERN void *pn_handler_mem(pn_handler_t *handler);
-PN_EXTERN pn_handler_t *pn_handler_cast(void *mem);
 PN_EXTERN void pn_handler_add(pn_handler_t *handler, pn_handler_t *child);
 PN_EXTERN void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/src/handlers/flowcontroller.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/flowcontroller.c b/proton-c/src/handlers/flowcontroller.c
index 2a164fe..903dfe4 100644
--- a/proton-c/src/handlers/flowcontroller.c
+++ b/proton-c/src/handlers/flowcontroller.c
@@ -23,9 +23,13 @@
 #include <proton/handlers.h>
 #include <assert.h>
 
-struct pn_flowcontroller_t {
+typedef struct {
   int window;
-};
+} pni_flowcontroller_t;
+
+pni_flowcontroller_t *pni_flowcontroller(pn_handler_t *handler) {
+  return (pni_flowcontroller_t *) pn_handler_mem(handler);
+}
 
 static void pni_topup(pn_link_t *link, int window) {
   int delta = window - pn_link_credit(link);
@@ -33,7 +37,7 @@ static void pni_topup(pn_link_t *link, int window) {
 }
 
 static void pn_flowcontroller_dispatch(pn_handler_t *handler, pn_event_t *event) {
-  pn_flowcontroller_t *fc = (pn_flowcontroller_t *) pn_handler_mem(handler);
+  pni_flowcontroller_t *fc = pni_flowcontroller(handler);
   int window = fc->window;
 
   switch (pn_event_type(event)) {
@@ -54,12 +58,11 @@ static void pn_flowcontroller_dispatch(pn_handler_t *handler, pn_event_t
*event)
 }
 
 pn_flowcontroller_t *pn_flowcontroller(int window) {
-  pn_handler_t *handler = pn_handler_new(pn_flowcontroller_dispatch, sizeof(pn_flowcontroller_t),
NULL);
-  pn_flowcontroller_t *flowcontroller = (pn_flowcontroller_t *) pn_handler_mem(handler);
-  flowcontroller->window = window;
-  return flowcontroller;
-}
-
-pn_handler_t *pn_flowcontroller_handler(pn_flowcontroller_t *flowcontroller) {
-  return pn_handler_cast(flowcontroller);
+  // XXX: a window of 1 doesn't work because we won't necessarily get
+  // notified when the one allowed delivery is settled
+  assert(window > 1);
+  pn_flowcontroller_t *handler = pn_handler_new(pn_flowcontroller_dispatch, sizeof(pni_flowcontroller_t),
NULL);
+  pni_flowcontroller_t *fc = pni_flowcontroller(handler);
+  fc->window = window;
+  return handler;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/src/handlers/handshaker.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/handshaker.c b/proton-c/src/handlers/handshaker.c
index 647867a..2c5cd12 100644
--- a/proton-c/src/handlers/handshaker.c
+++ b/proton-c/src/handlers/handshaker.c
@@ -25,12 +25,16 @@
 #include <proton/handlers.h>
 #include <assert.h>
 
-struct pn_handshaker_t {
+typedef struct {
   pn_map_t *handlers;
-};
+} pni_handshaker_t;
+
+pni_handshaker_t *pni_handshaker(pn_handler_t *handler) {
+  return (pni_handshaker_t *) pn_handler_mem(handler);
+}
 
 static void pn_handshaker_finalize(pn_handler_t *handler) {
-  pn_handshaker_t *handshaker = (pn_handshaker_t *) pn_handler_mem(handler);
+  pni_handshaker_t *handshaker = pni_handshaker(handler);
   pn_free(handshaker->handlers);
 }
 
@@ -92,12 +96,8 @@ static void pn_handshaker_dispatch(pn_handler_t *handler, pn_event_t *event)
{
 }
 
 pn_handshaker_t *pn_handshaker(void) {
-  pn_handler_t *handler = pn_handler_new(pn_handshaker_dispatch, sizeof(pn_handshaker_t),
pn_handshaker_finalize);
-  pn_handshaker_t *handshaker = (pn_handshaker_t *) pn_handler_mem(handler);
+  pn_handler_t *handler = pn_handler_new(pn_handshaker_dispatch, sizeof(pni_handshaker_t),
pn_handshaker_finalize);
+  pni_handshaker_t *handshaker = pni_handshaker(handler);
   handshaker->handlers = NULL;
-  return handshaker;
-}
-
-pn_handler_t *pn_handshaker_handler(pn_handshaker_t *handshaker) {
-  return pn_handler_cast(handshaker);
+  return handler;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
index b917946..7bce15a 100644
--- a/proton-c/src/reactor/connection.c
+++ b/proton-c/src/reactor/connection.c
@@ -48,7 +48,12 @@ pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor,
pn_socke
 void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) {
   assert(reactor);
   assert(event);
+
   pn_connection_t *conn = pn_event_connection(event);
+  if (!(pn_connection_state(conn) & PN_REMOTE_UNINIT)) {
+    return;
+  }
+
   pn_transport_t *transport = pn_transport();
   pn_sasl_t *sasl = pn_sasl(transport);
   pn_sasl_mechanisms(sasl, "ANONYMOUS");

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/src/reactor/handler.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/handler.c b/proton-c/src/reactor/handler.c
index cbbbb5d..d35a5ed 100644
--- a/proton-c/src/reactor/handler.c
+++ b/proton-c/src/reactor/handler.c
@@ -22,6 +22,7 @@
 #include <proton/object.h>
 #include <proton/reactor.h>
 #include <proton/event.h>
+#include <string.h>
 #include <assert.h>
 
 struct pn_handler_t {
@@ -57,6 +58,7 @@ pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t
*), siz
   pn_handler_t *handler = (pn_handler_t *) pn_class_new(&clazz, sizeof(pn_handler_t)
+ size);
   handler->dispatch = dispatch;
   handler->finalize = finalize;
+  memset(pn_handler_mem(handler), 0, size);
   return handler;
 }
 
@@ -78,10 +80,6 @@ void *pn_handler_mem(pn_handler_t *handler) {
   return (void *) (handler + 1);
 }
 
-pn_handler_t *pn_handler_cast(void *mem) {
-  return ((pn_handler_t *) mem) - 1;
-}
-
 void pn_handler_add(pn_handler_t *handler, pn_handler_t *child) {
   if (!handler->children) {
     handler->children = pn_list(PN_OBJECT, 0);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 46fbca9..141c2a7 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -154,36 +154,35 @@ static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event)
{
   }
 }
 
-pn_record_t *pni_attachments(const pn_class_t *clazz, void *instance) {
-  switch (pn_class_id(clazz)) {
-  case CID_pn_connection:
-    return pn_connection_attachments((pn_connection_t *) instance);
-  case CID_pn_session:
-    return pn_session_attachments((pn_session_t *) instance);
-  case CID_pn_link:
-    return pn_link_attachments((pn_link_t *) instance);
-  default:
-    return NULL;
-  }
+pn_handler_t *pni_record_get_handler(pn_record_t *record) {
+  return (pn_handler_t *) pn_record_get(record, PN_HANDLER);
 }
 
-pn_handler_t *pn_event_handler(pn_event_t *event) {
-  pn_record_t *record = pni_attachments(pn_event_class(event), pn_event_context(event));
-  if (record) {
-    return (pn_handler_t *) pn_record_get(record, PN_HANDLER);
-  } else {
-    return NULL;
+pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) {
+  pn_handler_t *handler = NULL;
+  pn_link_t *link = pn_event_link(event);
+  if (link) {
+    handler = pni_record_get_handler(pn_link_attachments(link));
+    if (handler) { return handler; }
+  }
+  pn_session_t *session = pn_event_session(event);
+  if (session) {
+    handler = pni_record_get_handler(pn_session_attachments(session));
+    if (handler) { return handler; }
   }
+  pn_connection_t *connection = pn_event_connection(event);
+  if (connection) {
+    handler = pni_record_get_handler(pn_connection_attachments(connection));
+    if (handler) { return handler; }
+  }
+  return default_handler;
 }
 
 void pn_reactor_process(pn_reactor_t *reactor) {
   assert(reactor);
   pn_event_t *event;
   while ((event = pn_collector_peek(reactor->collector))) {
-    pn_handler_t *handler = pn_event_handler(event);
-    if (!handler) {
-      handler = reactor->handler;
-    }
+    pn_handler_t *handler = pn_event_handler(event, reactor->handler);
     pn_handler_dispatch(handler, event);
     pni_reactor_dispatch(reactor, event);
     pn_collector_pop(reactor->collector);
@@ -237,4 +236,3 @@ void pn_reactor_run(pn_reactor_t *reactor) {
   while (pn_reactor_work(reactor, 1000)) {}
   pn_reactor_stop(reactor);
 }
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e5e8df4f/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index bb93d42..3d932ea 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -21,8 +21,9 @@
 
 #include <proton/reactor.h>
 #include <proton/handlers.h>
-#include <proton/connection.h>
 #include <proton/event.h>
+#include <proton/connection.h>
+#include <proton/session.h>
 #include <proton/link.h>
 #include <proton/delivery.h>
 #include <stdlib.h>
@@ -209,11 +210,16 @@ static server_t *smem(pn_handler_t *handler) {
   return (server_t *) pn_handler_mem(handler);
 }
 
+#include <stdio.h>
+
 static void server_dispatch(pn_handler_t *handler, pn_event_t *event) {
   server_t *srv = smem(handler);
   pn_list_add(srv->events, (void *) pn_event_type(event));
   switch (pn_event_type(event)) {
   case PN_CONNECTION_REMOTE_OPEN:
+    pn_connection_open(pn_event_connection(event));
+    break;
+  case PN_CONNECTION_REMOTE_CLOSE:
     pn_acceptor_close(srv->reactor, srv->acceptor);
     pn_connection_close(pn_event_connection(event));
     pn_connection_release(pn_event_connection(event));
@@ -237,11 +243,13 @@ static void client_dispatch(pn_handler_t *handler, pn_event_t *event)
{
   pn_connection_t *conn = pn_event_connection(event);
   switch (pn_event_type(event)) {
   case PN_CONNECTION_INIT:
-    pn_connection_set_hostname(conn, "localhost:5672");
+    pn_connection_set_hostname(conn, "127.0.0.1:5672");
     pn_connection_open(conn);
     break;
-  case PN_CONNECTION_REMOTE_CLOSE:
+  case PN_CONNECTION_REMOTE_OPEN:
     pn_connection_close(conn);
+    break;
+  case PN_CONNECTION_REMOTE_CLOSE:
     pn_connection_release(conn);
     break;
   default:
@@ -264,45 +272,129 @@ static void test_reactor_connect(void) {
   pn_reactor_connection(reactor, ch);
   pn_reactor_run(reactor);
   expect(srv->events, PN_CONNECTION_INIT, PN_CONNECTION_BOUND,
-         PN_CONNECTION_REMOTE_OPEN, PN_CONNECTION_LOCAL_CLOSE,
-         PN_CONNECTION_REMOTE_CLOSE, PN_CONNECTION_UNBOUND,
-         PN_CONNECTION_FINAL, END);
+         PN_TRANSPORT, PN_CONNECTION_REMOTE_OPEN,
+         PN_CONNECTION_LOCAL_OPEN, PN_TRANSPORT,
+         PN_CONNECTION_REMOTE_CLOSE, PN_TRANSPORT_TAIL_CLOSED,
+         PN_CONNECTION_LOCAL_CLOSE, PN_TRANSPORT,
+         PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
+         PN_CONNECTION_UNBOUND, PN_CONNECTION_FINAL, END);
   pn_free(srv->events);
   expect(cli->events, PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN,
-         PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN,
-         PN_CONNECTION_REMOTE_CLOSE, PN_CONNECTION_LOCAL_CLOSE,
-         PN_CONNECTION_UNBOUND, PN_CONNECTION_FINAL, END);
+         PN_CONNECTION_BOUND, PN_TRANSPORT, PN_TRANSPORT,
+         PN_CONNECTION_REMOTE_OPEN, PN_CONNECTION_LOCAL_CLOSE,
+         PN_TRANSPORT, PN_TRANSPORT_HEAD_CLOSED,
+         PN_CONNECTION_REMOTE_CLOSE, PN_TRANSPORT_TAIL_CLOSED,
+         PN_TRANSPORT_CLOSED, PN_CONNECTION_UNBOUND,
+         PN_CONNECTION_FINAL, END);
   pn_free(cli->events);
   pn_decref(ch);
   pn_reactor_free(reactor);
 }
 
-/*
-void dispatch(pn_handler_t *handler, pn_event_t *event) {
+typedef struct {
+  int received;
+} sink_t;
+
+static sink_t *sink(pn_handler_t *handler) {
+  return (sink_t *) pn_handler_mem(handler);
+}
+
+void sink_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  sink_t *snk = sink(handler);
   pn_delivery_t *dlv = pn_event_delivery(event);
   switch (pn_event_type(event)) {
   case PN_DELIVERY:
     if (!pn_delivery_partial(dlv)) {
       pn_delivery_settle(dlv);
+      snk->received++;
+    }
+    break;
+  default:
+    break;
+  }
+}
+
+typedef struct {
+  int remaining;
+} source_t;
+
+static source_t *source(pn_handler_t *handler) {
+  return (source_t *) pn_handler_mem(handler);
+}
+
+
+void source_dispatch(pn_handler_t *handler, pn_event_t *event) {
+  source_t *src = source(handler);
+  pn_connection_t *conn = pn_event_connection(event);
+  switch (pn_event_type(event)) {
+  case PN_CONNECTION_INIT:
+    {
+      pn_connection_set_hostname(conn, "127.0.0.1:5678");
+      pn_session_t *ssn = pn_session(conn);
+      pn_link_t *snd = pn_sender(ssn, "sender");
+      pn_connection_open(conn);
+      pn_session_open(ssn);
+      pn_link_open(snd);
+      // XXX: these keep the connection alive even when we release it
+      pn_decref(ssn);
+      pn_decref(snd);
     }
     break;
+  case PN_LINK_FLOW:
+    {
+      pn_link_t *link = pn_event_link(event);
+      while (pn_link_credit(link) > 0 && src->remaining > 0) {
+        pn_delivery_t *dlv = pn_delivery(link, pn_dtag("", 0));
+        assert(dlv);
+        pn_delivery_settle(dlv);
+        src->remaining--;
+      }
+
+      if (!src->remaining) {
+        pn_connection_close(conn);
+      }
+    }
+    break;
+  case PN_CONNECTION_REMOTE_CLOSE:
+    printf("rc=%i\n", pn_refcount(conn));
+    pn_connection_release(conn);
+    break;
   default:
     break;
   }
 }
 
-static void test_reactor_flow(void) {
+static void test_reactor_transfer(int count, int window) {
   pn_reactor_t *reactor = pn_reactor();
-  assert(reactor);
-  pn_handler_t *root = pn_reactor_handler(reactor);
-  assert(root);
-  pn_handler_add(root, pn_handler(dispatch));
-  pn_handler_add(root, pn_handler_cast(pn_handshaker()));
-  pn_handler_add(root, pn_handler_cast(pn_flowcontroller(4*1024)));
-  pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL);
+
+  pn_handler_t *sh = pn_handler_new(server_dispatch, sizeof(server_t), NULL);
+  server_t *srv = smem(sh);
+  pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5678", sh);
+  srv->reactor = reactor;
+  srv->acceptor = acceptor;
+  srv->events = pn_list(PN_VOID, 0);
+  pn_handler_add(sh, pn_handshaker());
+  // XXX: a window of 1 doesn't work unless the flowcontroller is
+  // added after the thing that settles the delivery
+  pn_handler_add(sh, pn_flowcontroller(window));
+  pn_handler_t *snk = pn_handler_new(sink_dispatch, sizeof(sink_t), NULL);
+  sink(snk)->received = 0;
+  pn_handler_add(sh, snk);
+
+  pn_handler_t *ch = pn_handler_new(source_dispatch, sizeof(source_t), NULL);
+  source_t *src = source(ch);
+  src->remaining = count;
+  pn_reactor_connection(reactor, ch);
+
   pn_reactor_run(reactor);
+
+  assert(sink(snk)->received == count);
+
+  pn_free(srv->events);
   pn_reactor_free(reactor);
-  }*/
+  pn_handler_free(sh);
+  pn_handler_free(ch);
+}
 
 int main(int argc, char **argv)
 {
@@ -316,5 +408,10 @@ int main(int argc, char **argv)
   test_reactor_acceptor();
   test_reactor_acceptor_run();
   test_reactor_connect();
+  for (int i = 0; i < 64; i++) {
+    test_reactor_transfer(i, 2);
+  }
+  test_reactor_transfer(1024, 64);
+  test_reactor_transfer(4*1024, 1024);
   return 0;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message