qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-1512: aborted messages: handle credit and empty messages.
Date Tue, 03 Oct 2017 19:37:44 GMT
PROTON-1512: aborted messages: handle credit and empty messages.

The AMQP spec says: "The delivery-count is initialized by the sender when a link
endpoint is created, and is incremented whenever a message is sent."
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control

We interpret this to mean that the delivery-count is incremented when the
*first* frame of a message is sent/received. So if any frames of an aborted
message are sent, it consumes a credit and increases the delivery count exactly
like a successful message.

However, if a proton sender calls pn_delivery_abort() *before* any frames have
been sent, the delivery is dropped locally: no frames are sent and no link-state
updates are made. This means pn_delivery_abort() will either cancel a local,
partially-constructed message with no side effects, or send an aborted=true
frame as required. Proton will never send a message consisting of a single
aborted frame.

A proton receiver will handle a message consisting of a single aborted frame
correctly - i.e. do credit calculations but consider the delivery aborted.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b36b70c2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b36b70c2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b36b70c2

Branch: refs/heads/master
Commit: b36b70c2ab9ce19b7d56b2e6c9ca9b2383e369bf
Parents: c015ceb
Author: Alan Conway <aconway@redhat.com>
Authored: Fri Sep 29 18:07:01 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Tue Oct 3 14:56:58 2017 -0400

----------------------------------------------------------------------
 examples/c/broker.c                    |  19 +--
 examples/c/direct.c                    |  15 +-
 examples/c/example_test.py             |  21 +--
 examples/c/receive.c                   |  13 +-
 proton-c/include/proton/delivery.h     |  30 ++--
 proton-c/src/core/engine.c             |  51 +++----
 proton-c/src/core/transport.c          |  42 +++---
 proton-c/src/tests/CMakeLists.txt      |   1 -
 proton-c/src/tests/connection_driver.c | 206 +++++++++++++++++++++-------
 proton-c/src/tests/test_handler.h      |   9 +-
 proton-c/src/tests/test_tools.h        |   3 +
 11 files changed, 265 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index b509910..85bebf8 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -262,7 +262,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   }
 }
 
-const int WINDOW=10;            /* Incoming credit window */
+const int WINDOW=5; /* Very small incoming credit window, to show flow control in action
*/
 
 static void handle(broker_t* b, pn_event_t* e) {
   pn_connection_t *c = pn_event_connection(e);
@@ -338,14 +338,17 @@ static void handle(broker_t* b, pn_event_t* e) {
        pn_link_t *l = pn_delivery_link(d);
        size_t size = pn_delivery_pending(d);
        pn_rwbytes_t* m = message_buffer(l); /* Append data to incoming message buffer */
-       int err;
        m->size += size;
        m->start = (char*)realloc(m->start, m->size);
-       err = pn_link_recv(l, m->start, m->size);
-       if (err < 0 && err != PN_EOS) {
-         fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+       int recv = pn_link_recv(l, m->start, m->size);
+       if (recv == PN_ABORTED) { /*  */
+         fprintf(stderr, "Message aborted\n");
+         m->size = 0;           /* Forget the data we accumulated */
          pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
-         m->size = 0;           /* forget the data we accumulated */
+         pn_link_flow(l, WINDOW - pn_link_credit(l)); /* Replace credit for the aborted message
*/
+       } else if (recv < 0 && recv != PN_EOS) {        /* Unexpected error */
+         pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv));
+         pn_link_close(l);               /* Unexpected error, close the link */
        } else if (!pn_delivery_partial(d)) { /* Message is complete */
          const char *qname = pn_terminus_get_address(pn_link_target(l));
          queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
@@ -387,9 +390,7 @@ static void handle(broker_t* b, pn_event_t* e) {
     broker_stop(b);
     break;
 
- break;
-
-   case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
+    case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
     broker_stop(b);
     break;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
index 5f72ba7..52ca1e0 100644
--- a/examples/c/direct.c
+++ b/examples/c/direct.c
@@ -129,7 +129,7 @@ static void decode_message(pn_rwbytes_t data) {
 }
 
 /* This function handles events when we are acting as the receiver */
-static void handle_receive(app_data_t* app, pn_event_t* event) {
+static void handle_receive(app_data_t *app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_LINK_REMOTE_OPEN: {
@@ -144,14 +144,17 @@ static void handle_receive(app_data_t* app, pn_event_t* event) {
        pn_link_t *l = pn_delivery_link(d);
        size_t size = pn_delivery_pending(d);
        pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
-       int err;
        m->size += size;
        m->start = (char*)realloc(m->start, m->size);
-       err = pn_link_recv(l, m->start, m->size);
-       if (err < 0 && err != PN_EOS) {
-         fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+       int recv = pn_link_recv(l, m->start, m->size);
+       if (recv == PN_ABORTED) {
+         fprintf(stderr, "Message aborted\n");
+         m->size = 0;           /* Forget the data we accumulated */
          pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
-         m->size = 0;           /* forget the data we accumulated */
+         pn_link_flow(l, 1);    /* Replace credit for aborted message */
+       } else if (recv < 0 && recv != PN_EOS) {        /* Unexpected error */
+         pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv));
+         pn_link_close(l);               /* Unexpected error, close the link */
        } else if (!pn_delivery_partial(d)) { /* Message is complete */
          decode_message(*m);
          *m = pn_rwbytes_null;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
index ee3e4e4..9cd0b13 100644
--- a/examples/c/example_test.py
+++ b/examples/c/example_test.py
@@ -26,13 +26,14 @@ def python_cmd(name):
     dir = os.path.dirname(__file__)
     return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
 
-def receive_expect_messages(n=10): return ''.join(['{"sequence"=%s}\n'%i for i in xrange(1,
n+1)])
-def receive_expect_total(n=10): return "%s messages received\n"%n
-def receive_expect(n=10): return receive_expect_messages(n)+receive_expect_total(n)
+MESSAGES=10
 
-def send_expect(n=10): return "%s messages sent and acknowledged\n" % n
-def send_abort_expect(n=10): return "%s messages started and aborted\n" % n
+def receive_expect_messages(n=MESSAGES): return ''.join(['{"sequence"=%s}\n'%i for i in xrange(1,
n+1)])
+def receive_expect_total(n=MESSAGES): return "%s messages received\n"%n
+def receive_expect(n=MESSAGES): return receive_expect_messages(n)+receive_expect_total(n)
 
+def send_expect(n=MESSAGES): return "%s messages sent and acknowledged\n" % n
+def send_abort_expect(n=MESSAGES): return "%s messages started and aborted\n" % n
 
 class Broker(object):
     def __init__(self, test):
@@ -56,9 +57,9 @@ class Broker(object):
 
 class CExampleTest(ProcTestCase):
 
-    def runex(self, name, port, *args):
+    def runex(self, name, port, messages=MESSAGES):
         """Run an example with standard arugments, return output"""
-        return self.proc([name, "", port, "examples"] + list(args)).wait_exit()
+        return self.proc([name, "", str(port), "xtest", str(messages)]).wait_exit()
 
     def test_send_receive(self):
         """Send first then receive"""
@@ -93,9 +94,9 @@ class CExampleTest(ProcTestCase):
         with Broker(self) as b:
             self.assertEqual(send_expect(), self.runex("send", b.port))
             self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port))
-            b.proc.wait_re("PN_DELIVERY error: PN_ABORTED\n"*10)
+            b.proc.wait_re("Message aborted\n"*MESSAGES)
             self.assertEqual(send_expect(), self.runex("send", b.port))
-            expect = receive_expect_messages(10)+receive_expect_messages(10)+receive_expect_total(20)
+            expect = receive_expect_messages(MESSAGES)+receive_expect_messages(MESSAGES)+receive_expect_total(20)
             self.assertMultiLineEqual(expect, self.runex("receive", b.port, "20"))
 
     def test_send_abort_direct(self):
@@ -108,7 +109,7 @@ class CExampleTest(ProcTestCase):
             expect += receive_expect_messages()
             d.wait_re(expect)
             self.assertEqual(send_abort_expect(), self.runex("send-abort", tp.port))
-            expect += "PN_DELIVERY error: PN_ABORTED\n"*10
+            expect += "Message aborted\n"*MESSAGES
             d.wait_re(expect)
             self.assertEqual(send_expect(), self.runex("send", tp.port))
             expect += receive_expect_messages()+receive_expect_total(20)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
index 16f1a66..bf0a32f 100644
--- a/examples/c/receive.c
+++ b/examples/c/receive.c
@@ -95,18 +95,21 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    case PN_DELIVERY: {
      /* A message has been received */
      pn_delivery_t *d = pn_event_delivery(event);
-     int err;
      if (pn_delivery_readable(d)) {
        pn_link_t *l = pn_delivery_link(d);
        size_t size = pn_delivery_pending(d);
        pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
        m->size += size;
        m->start = (char*)realloc(m->start, m->size);
-       err = pn_link_recv(l, m->start, m->size);
-       if (err < 0 && err != PN_EOS) {
-         fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+       int recv = pn_link_recv(l, m->start, m->size);
+       if (recv == PN_ABORTED) {
+         fprintf(stderr, "Message aborted\n");
+         m->size = 0;           /* Forget the data we accumulated */
          pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
-         m->size = 0;           /* forget the data we accumulated */
+         pn_link_flow(l, 1);    /* Replace credit for aborted message */
+       } else if (recv < 0 && recv != PN_EOS) {        /* Unexpected error */
+         pn_condition_format(pn_link_condition(l), "broker", "PN_DELIVERY error: %s", pn_code(recv));
+         pn_link_close(l);               /* Unexpected error, close the link */
        } else if (!pn_delivery_partial(d)) { /* Message is complete */
          decode_message(*m);
          *m = pn_rwbytes_null;  /* Reset the buffer for the next message*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/include/proton/delivery.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/delivery.h b/proton-c/include/proton/delivery.h
index 966e750..00ca11d 100644
--- a/proton-c/include/proton/delivery.h
+++ b/proton-c/include/proton/delivery.h
@@ -185,23 +185,16 @@ PN_EXTERN bool pn_delivery_partial(pn_delivery_t *delivery);
 /**
  * Check if a received delivery has been aborted.
  *
- * An aborted delivery means the sender cannot complete the message and the
- * receiver should discard any data already received. There is nothing further
- * to be done with the delivery: it is pn_delivery_settled() and pn_link_recv()
- * returns ::PN_ABORTED.
+ * An aborted delivery means the sender cannot complete this message and the
+ * receiver should discard any data already received. The link remains open for
+ * future messages.
  *
- * For backward compatibility with code that does not check pn_delivery_aborted(),
- * the following are true of an aborted delivery:
+ * You must still call pn_delivery_settle() to free local resources. An aborted
+ * delivery consumes a credit, use pn_link_flow() to issue new credit as for a
+ * successful delivery.
  *
- * - pn_delivery_partial() is false - old code will not wait forever for more data
- *
- * - pn_delivery_pending() returns 1, not 0 - old code that checks for completion with
- *
- *       if (pn_delivery_pending(d)==0 && !pn_delivery_partial(d))
- *
- *   will not mistake this for successful completion, and will call pn_link_recv()
- *
- * - pn_link_recv() returns ::PN_ABORTED - old code will know there is some kind of error.
+ * Calling pn_link_recv() when the current delivery is aborted returns
+ * ::PN_ABORTED.
  *
  * @see pn_delivery_abort()
  * @param[in] delivery a delivery object
@@ -274,12 +267,13 @@ PN_EXTERN bool pn_delivery_current(pn_delivery_t *delivery);
 /**
  * Abort a delivery being sent.
  *
- * Aborting means the sender cannot complete the delivery. It will not send any
- * more data and all data received so far should be discarded by the receiver.
+ * Aborting means the sender cannot complete this message. It will not send any
+ * more data, and data sent so far should be discarded by the receiver.  The
+ * link remains open for future messages.
  *
  * If some data has already been sent on the network, an AMQP "aborted" frame
  * will be sent to inform the peer. If no data has yet been sent, the delivery
- * will simply be dropped.
+ * will simply be forgotten.
  *
  * The delivery will be freed, and cannot be used after the call.
  *

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c
index cd929f4..eae5ebf 100644
--- a/proton-c/src/core/engine.c
+++ b/proton-c/src/core/engine.c
@@ -1545,7 +1545,8 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
 
   // begin delivery state
   delivery->state.init = false;
-  delivery->state.sent = false;
+  delivery->state.sending = false; /* True if we have sent at least 1 frame */
+  delivery->state.sent = false;    /* True if we have sent the entire delivery */
   // end delivery state
 
   if (!link->current)
@@ -1728,9 +1729,16 @@ pn_delivery_t *pn_link_current(pn_link_t *link)
 static void pni_advance_sender(pn_link_t *link)
 {
   link->current->done = true;
-  link->queued++;
-  link->credit--;
-  link->session->outgoing_deliveries++;
+  /* Skip accounting if the link is aborted and has not sent any frames.
+     A delivery that was aborted before sending the first frame was not accounted
+     for in pni_process_tpwork_sender() so we don't need to account for it being sent here.
+  */
+  bool skip = link->current->aborted && !link->current->state.sending;
+  if (!skip) {
+    link->queued++;
+    link->credit--;
+    link->session->outgoing_deliveries++;
+  }
   pni_add_tpwork(link->current);
   link->current = link->current->unsettled_next;
 }
@@ -1888,25 +1896,23 @@ int pn_link_drained(pn_link_t *link)
 ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
 {
   if (!receiver) return PN_ARG_ERR;
-
   pn_delivery_t *delivery = receiver->current;
-  if (delivery && !delivery->aborted) {
-    size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
-    pn_buffer_trim(delivery->bytes, size, 0);
-    if (size) {
-      receiver->session->incoming_bytes -= size;
-      if (!receiver->session->state.incoming_window) {
-        pni_add_tpwork(delivery);
-      }
-      return size;
-    } else {
-      return delivery->done ? PN_EOS : 0;
+  if (!delivery) return PN_STATE_ERR;
+  if (delivery->aborted) return PN_ABORTED;
+  size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
+  pn_buffer_trim(delivery->bytes, size, 0);
+  if (size) {
+    receiver->session->incoming_bytes -= size;
+    if (!receiver->session->state.incoming_window) {
+      pni_add_tpwork(delivery);
     }
+    return size;
   } else {
-    return delivery ? PN_ABORTED : PN_STATE_ERR;
+    return delivery->done ? PN_EOS : 0;
   }
 }
 
+
 void pn_link_flow(pn_link_t *receiver, int credit)
 {
   assert(receiver);
@@ -2032,7 +2038,7 @@ bool pn_delivery_readable(pn_delivery_t *delivery)
 
 size_t pn_delivery_pending(pn_delivery_t *delivery)
 {
-  /* Aborted deliveries: for old clients that don't check pn_delivery_aborted(),
+  /* Aborted deliveries: for clients that don't check pn_delivery_aborted(),
      return 1 rather than 0. This will force them to call pn_link_recv() and get
      the PN_ABORTED error return code.
   */
@@ -2046,11 +2052,10 @@ bool pn_delivery_partial(pn_delivery_t *delivery)
 }
 
 void pn_delivery_abort(pn_delivery_t *delivery) {
-  delivery->aborted = true;
-  /* Discard any data, aborted frames are always empty */
-  delivery->link->session->outgoing_bytes -= pn_buffer_size(delivery->bytes);
-  pn_buffer_clear(delivery->bytes);
-  pn_delivery_settle(delivery);
+  if (!delivery->local.settled) { /* Can't abort a settled delivery */
+    delivery->aborted = true;
+    pn_delivery_settle(delivery);
+  }
 }
 
 bool pn_delivery_aborted(pn_delivery_t *delivery) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/src/core/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/transport.c b/proton-c/src/core/transport.c
index c825136..3538e93 100644
--- a/proton-c/src/core/transport.c
+++ b/proton-c/src/core/transport.c
@@ -113,6 +113,7 @@ void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
   if (delivery->state.init) {
     delivery->state.init = false;
+    delivery->state.sending = false;
     delivery->state.sent = false;
     pn_hash_del(db->deliveries, pni_sequence_make_hash(delivery->state.id) );
   }
@@ -1541,25 +1542,24 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type,
uint16_t chann
       pn_work_update(transport->connection, delivery);
     }
   }
-  delivery->aborted = aborted;
-  if (aborted) {
+
+  pn_buffer_append(delivery->bytes, payload->start, payload->size);
+  ssn->incoming_bytes += payload->size;
+  delivery->done = !more;
+
+  ssn->state.incoming_transfer_count++;
+  ssn->state.incoming_window--;
+
+  // XXX: need better policy for when to refresh window
+  if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >=
0) {
+    pni_post_flow(transport, ssn, link);
+  }
+
+  if ((delivery->aborted = aborted)) {
     delivery->remote.settled = true;
     delivery->done = true;
     delivery->updated = true;
-    pn_buffer_clear(delivery->bytes);
     pn_work_update(transport->connection, delivery);
-  } else {
-    pn_buffer_append(delivery->bytes, payload->start, payload->size);
-    ssn->incoming_bytes += payload->size;
-    delivery->done = !more;
-
-    ssn->state.incoming_transfer_count++;
-    ssn->state.incoming_window--;
-
-    // XXX: need better policy for when to refresh window
-    if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >=
0) {
-      pni_post_flow(transport, ssn, link);
-    }
   }
   pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
   return 0;
@@ -2163,18 +2163,20 @@ static int pni_post_disp(pn_transport_t *transport, pn_delivery_t
*delivery)
 
 static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery,
bool *settle)
 {
+  pn_link_t *link = delivery->link;
+  pn_delivery_state_t *state = &delivery->state;
   if (delivery->aborted && !delivery->state.sending) {
-    // Aborted delivery with no data yet sent, drop it.
+    // Aborted delivery with no data yet sent, drop it and issue a FLOW as we may have credit.
     *settle = true;
+    state->sent = true;
+    pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_FLOW);
     return 0;
   }
   *settle = false;
-  pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = &link->session->state;
   pn_link_state_t *link_state = &link->state;
   bool xfr_posted = false;
   if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle
>= 0) {
-    pn_delivery_state_t *state = &delivery->state;
     if (!state->sent && (delivery->done || pn_buffer_size(delivery->bytes)
> 0) &&
         ssn_state->remote_incoming_window > 0 && link_state->link_credit
> 0) {
       if (!state->init) {
@@ -2201,7 +2203,7 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t
*d
                                                false /* Batchable */
       );
       if (count < 0) return count;
-      if (count > 0) delivery->state.sending = true;
+      state->sending = true;
       xfr_posted = true;
       ssn_state->outgoing_transfer_count += count;
       ssn_state->remote_incoming_window -= count;
@@ -2221,7 +2223,7 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t
*d
     }
   }
 
-  pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
+  if (!state->init) state = NULL;
   if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote.settled
       && state && state->sent && !xfr_posted) {
     int err = pni_post_disp(transport, delivery);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index ce7d25e..6572f32 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -49,7 +49,6 @@ pn_add_c_test (c-message-tests message.c)
 pn_add_c_test (c-engine-tests engine.c)
 pn_add_c_test (c-parse-url-tests parse-url.c)
 pn_add_c_test (c-refcount-tests refcount.c)
-pn_add_c_test (c-reactor-tests reactor.c)
 pn_add_c_test (c-event-tests event.c)
 pn_add_c_test (c-data-tests data.c)
 pn_add_c_test (c-condition-tests condition.c)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/src/tests/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/connection_driver.c b/proton-c/src/tests/connection_driver.c
index b16efba..fbcefc3 100644
--- a/proton-c/src/tests/connection_driver.c
+++ b/proton-c/src/tests/connection_driver.c
@@ -27,39 +27,32 @@
 #include <proton/session.h>
 #include <proton/link.h>
 
-/* Place for handlers to save link and delivery pointers */
-struct context {
-  pn_link_t *link;
-  pn_delivery_t *delivery;
-};
-
-/* Handler that replies to REMOTE_OPEN */
+/* Handler that replies to REMOTE_OPEN, stores the opened object on the handler */
 static pn_event_type_t open_handler(test_handler_t *th, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_CONNECTION_REMOTE_OPEN:
-    pn_connection_open(pn_event_connection(e));
+    th->connection = pn_event_connection(e);
+    pn_connection_open(th->connection);
     break;
    case PN_SESSION_REMOTE_OPEN:
-    pn_session_open(pn_event_session(e));
+    th->session =  pn_event_session(e);
+    pn_session_open(th->session);
     break;
-   case PN_LINK_REMOTE_OPEN: {
-    pn_link_open(pn_event_link(e));
-    struct context *ctx = (struct context*) th->context;
-    if (ctx) ctx->link = pn_event_link(e);
+   case PN_LINK_REMOTE_OPEN:
+    th->link = pn_event_link(e);
+    pn_link_open(th->link);
     break;
-   }
    default:
     break;
   }
   return PN_EVENT_NONE;
 }
 
-/* Handler that returns control on PN_DELIVERY */
+/* Handler that returns control on PN_DELIVERY and stores the delivery on the handler */
 static pn_event_type_t delivery_handler(test_handler_t *th, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_DELIVERY: {
-    struct context *ctx = (struct context*)th->context;
-    if (ctx) ctx->delivery = pn_event_delivery(e);
+     th->delivery = pn_event_delivery(e);
     return PN_DELIVERY;
    }
    default:
@@ -70,9 +63,8 @@ static pn_event_type_t delivery_handler(test_handler_t *th, pn_event_t *e)
{
 /* Blow-by-blow event verification of a single message transfer */
 static void test_message_transfer(test_t *t) {
   test_connection_driver_t client, server;
-  struct context server_ctx = {0};
   test_connection_driver_init(&client, t, open_handler, NULL);
-  test_connection_driver_init(&server, t, delivery_handler, &server_ctx);
+  test_connection_driver_init(&server, t, delivery_handler, NULL);
   pn_transport_set_server(server.driver.transport);
 
   pn_connection_open(client.driver.connection);
@@ -100,7 +92,7 @@ static void test_message_transfer(test_t *t) {
     PN_LINK_LOCAL_OPEN, PN_TRANSPORT,
     0);
 
-  pn_link_t *rcv = server_ctx.link;
+  pn_link_t *rcv = server.handler.link;
   TEST_CHECK(t, rcv);
   TEST_CHECK(t, pn_link_is_receiver(rcv));
   pn_link_flow(rcv, 1);
@@ -113,14 +105,14 @@ static void test_message_transfer(test_t *t) {
   pn_rwbytes_t buf = { 0 };
   ssize_t size = message_encode(m, &buf);
   pn_message_free(m);
-  pn_delivery(snd, pn_dtag("x", 1));
+  pn_delivery(snd, PN_BYTES_LITERAL(x));
   TEST_INT_EQUAL(t, size, pn_link_send(snd, buf.start, size));
   TEST_CHECK(t, pn_link_advance(snd));
   test_connection_drivers_run(&client, &server);
   TEST_HANDLER_EXPECT(&server.handler, PN_TRANSPORT, PN_DELIVERY, 0);
 
   /* Receive and decode the message */
-  pn_delivery_t *dlv = server_ctx.delivery;
+  pn_delivery_t *dlv = server.handler.delivery;
   TEST_ASSERT(dlv);
   pn_message_t *m2 = pn_message();
   pn_rwbytes_t buf2 = { 0 };
@@ -151,8 +143,7 @@ pn_event_type_t send_client_handler(test_handler_t *th, pn_event_t *e)
{
     break;
    }
    case PN_LINK_REMOTE_OPEN: {
-    struct context *ctx = (struct context*) th->context;
-    if (ctx) ctx->link = pn_event_link(e);
+    th->link = pn_event_link(e);
     return PN_LINK_REMOTE_OPEN;
    }
    default:
@@ -165,15 +156,14 @@ pn_event_type_t send_client_handler(test_handler_t *th, pn_event_t *e)
{
 static void test_message_stream(test_t *t) {
   /* Set up the link, give credit, start the delivery */
   test_connection_driver_t client, server;
-  struct context server_ctx = {0}, client_ctx = {0};
-  test_connection_driver_init(&client, t, send_client_handler, &client_ctx);
-  test_connection_driver_init(&server, t, delivery_handler, &server_ctx);
+  test_connection_driver_init(&client, t, send_client_handler, NULL);
+  test_connection_driver_init(&server, t, delivery_handler, NULL);
   pn_transport_set_server(server.driver.transport);
 
   pn_connection_open(client.driver.connection);
   test_connection_drivers_run(&client, &server);
-  pn_link_t *rcv = server_ctx.link;
-  pn_link_t *snd = client_ctx.link;
+  pn_link_t *rcv = server.handler.link;
+  pn_link_t *snd = client.handler.link;
   pn_link_flow(rcv, 1);
   test_connection_drivers_run(&client, &server);
   TEST_HANDLER_EXPECT_LAST(&client.handler, PN_LINK_FLOW);
@@ -188,7 +178,7 @@ static void test_message_stream(test_t *t) {
 
   /* Send and receive the message in chunks */
   static const ssize_t CHUNK = 100;
-  pn_delivery(snd, pn_dtag("x", 1));
+  pn_delivery(snd, PN_BYTES_LITERAL(x));
   pn_rwbytes_t buf2 = { 0 };
   ssize_t received = 0;
   for (ssize_t i = 0; i < size; i += CHUNK) {
@@ -198,7 +188,7 @@ static void test_message_stream(test_t *t) {
     test_connection_drivers_run(&client, &server);
     TEST_HANDLER_EXPECT_LAST(&server.handler, PN_DELIVERY);
     /* Receive a chunk */
-    pn_delivery_t *dlv = server_ctx.delivery;
+    pn_delivery_t *dlv = server.handler.delivery;
     pn_link_t *l = pn_delivery_link(dlv);
     ssize_t dsize = pn_delivery_pending(dlv);
     rwbytes_ensure(&buf2, received+dsize);
@@ -220,63 +210,173 @@ static void test_message_stream(test_t *t) {
 static void test_message_abort(test_t *t) {
   /* Set up the link, give credit, start the delivery */
   test_connection_driver_t client, server;
-  struct context server_ctx = {0}, client_ctx = {0};
-  test_connection_driver_init(&client, t, send_client_handler, &client_ctx);
-  test_connection_driver_init(&server, t, delivery_handler, &server_ctx);
+  test_connection_driver_init(&client, t, send_client_handler, NULL);
+  test_connection_driver_init(&server, t, delivery_handler, NULL);
   pn_transport_set_server(server.driver.transport);
   pn_connection_open(client.driver.connection);
 
   test_connection_drivers_run(&client, &server);
-  pn_link_t *rcv = server_ctx.link;
-  pn_link_t *snd = client_ctx.link;
+  pn_link_t *rcv = server.handler.link;
+  pn_link_t *snd = client.handler.link;
   char data[100] = {0};          /* Dummy data to send. */
   char rbuf[sizeof(data)] = {0}; /* Read buffer for incoming data. */
 
   /* Send 2 frames with data */
   pn_link_flow(rcv, 1);
+  TEST_INT_EQUAL(t, 1, pn_link_credit(rcv));
   test_connection_drivers_run(&client, &server);
-  pn_delivery_t *sd = pn_delivery(snd, pn_dtag("1", 1)); /* Sender delivery */
+  TEST_INT_EQUAL(t, 1, pn_link_credit(snd));
+  pn_delivery_t *sd = pn_delivery(snd, PN_BYTES_LITERAL(1)); /* Sender delivery */
   for (size_t i = 0; i < 2; ++i) {
     TEST_INT_EQUAL(t, sizeof(data), pn_link_send(snd, data, sizeof(data)));
     test_connection_drivers_run(&client, &server);
     TEST_HANDLER_EXPECT_LAST(&server.handler, PN_DELIVERY);
-    pn_delivery_t *rd = server_ctx.delivery;
+    pn_delivery_t *rd = server.handler.delivery;
     TEST_CHECK(t, !pn_delivery_aborted(rd));
     TEST_CHECK(t, pn_delivery_partial(rd));
+    TEST_INT_EQUAL(t, 1, pn_link_credit(rcv));
     TEST_INT_EQUAL(t, sizeof(data), pn_delivery_pending(rd));
     TEST_INT_EQUAL(t, sizeof(rbuf), pn_link_recv(pn_delivery_link(rd), rbuf, sizeof(rbuf)));
     TEST_INT_EQUAL(t, 0, pn_link_recv(pn_delivery_link(rd), rbuf, sizeof(rbuf)));
+    TEST_INT_EQUAL(t, 1, pn_link_credit(rcv));
   }
+  TEST_INT_EQUAL(t, 1, pn_link_credit(snd));
   /* Abort the delivery */
   pn_delivery_abort(sd);
+  TEST_INT_EQUAL(t, 0, pn_link_credit(snd));
   TEST_CHECK(t, pn_link_current(snd) != sd); /* Settled */
   test_connection_drivers_run(&client, &server);
   TEST_HANDLER_EXPECT_LAST(&server.handler, PN_DELIVERY);
-  /* Receive the aborted frame, should be empty */
-  pn_delivery_t *rd = server_ctx.delivery;
+  TEST_INT_EQUAL(t, 0, pn_link_credit(snd));
+
+  /* Receive the aborted=true frame, should be empty */
+  pn_delivery_t *rd = server.handler.delivery;
   TEST_CHECK(t, pn_delivery_aborted(rd));
   TEST_CHECK(t, !pn_delivery_partial(rd)); /* Aborted deliveries are never partial */
-  TEST_CHECK(t, pn_delivery_settled(rd)); /* Aborted deliveries are always settled */
+  TEST_CHECK(t, pn_delivery_settled(rd)); /* Aborted deliveries are remote settled */
   TEST_INT_EQUAL(t, 1, pn_delivery_pending(rd));
   TEST_INT_EQUAL(t, PN_ABORTED, pn_link_recv(pn_delivery_link(rd), rbuf, sizeof(rbuf)));
+  pn_delivery_settle(rd);       /* Must be settled locally to free it */
+
+  TEST_INT_EQUAL(t, 0, pn_link_credit(snd));
+  TEST_INT_EQUAL(t, 0, pn_link_credit(rcv));
 
-  /* Send a single aborted frame, with data. */
+  /* Abort a delivery before any data has been framed, should be dropped. */
   pn_link_flow(rcv, 1);
+  TEST_INT_EQUAL(t, 1, pn_link_credit(rcv));
   test_connection_drivers_run(&client, &server);
-  sd = pn_delivery(snd, pn_dtag("x", 1));
+  test_handler_clear(&client.handler, 0);
+  test_handler_clear(&server.handler, 0);
+
+  sd = pn_delivery(snd, PN_BYTES_LITERAL(x));
   TEST_INT_EQUAL(t, sizeof(data), pn_link_send(snd, data, sizeof(data)));
-  pn_delivery_abort(sd);                     /* Abort after send creates an aborted frame with data */
-  TEST_CHECK(t, pn_link_current(snd) != sd); /* Settled */
+  pn_delivery_abort(sd);
+  TEST_CHECK(t, pn_link_current(snd) != sd); /* Settled, possibly freed */
+  test_connection_drivers_run(&client, &server);
+  TEST_HANDLER_EXPECT(&server.handler, 0); /* Expect no delivery at the server */
+  /* Client gets transport/flow after abort to ensure other messages are sent */
+  TEST_HANDLER_EXPECT(&client.handler, PN_TRANSPORT, PN_LINK_FLOW, 0);
+  /* Aborted delivery consumes no credit */
+  TEST_INT_EQUAL(t, 1, pn_link_credit(rcv));
+  TEST_INT_EQUAL(t, 1, pn_link_credit(snd));
+
+  test_connection_driver_destroy(&client);
+  test_connection_driver_destroy(&server);
+}
+
+
+int send_receive_message(test_t *t, const char* tag,
+                         test_connection_driver_t *src, test_connection_driver_t *dst)
+{
+  int errors = t->errors;
+  char data[100] = {0};          /* Dummy data to send. */
+  strncpy(data, tag, sizeof(data));
+
+  if (!TEST_CHECK(t, pn_link_credit(src->handler.link))) return 1;
+
+  pn_delivery_t *sd = pn_delivery(src->handler.link, pn_dtag(tag, strlen(tag)));
+  dst->handler.delivery = NULL;
+  TEST_CHECK(t, pn_delivery_current(sd));
+  TEST_INT_EQUAL(t, sizeof(data), pn_link_send(src->handler.link, data, sizeof(data)));
+  pn_delivery_settle(sd);
+  test_connection_drivers_run(src, dst);
+  pn_delivery_t *rd = dst->handler.delivery;
+  dst->handler.delivery = NULL;
+  if (!TEST_CHECK(t, rd)) return 1;
+
+  TEST_CHECK(t, pn_delivery_current(rd));
+  char rbuf[sizeof(data)] = {0}; /* Read buffer for incoming data. */
+  TEST_INT_EQUAL(t, sizeof(rbuf), pn_link_recv(pn_delivery_link(rd), rbuf, sizeof(rbuf)));
+  TEST_STR_EQUAL(t, data, rbuf);
+  pn_delivery_settle(rd);
+  return t->errors > errors;
+}
+
+#define SEND_RECEIVE_MESSAGE(T, TAG, SRC, DST)                  \
+  TEST_INT_EQUAL(T, 0, send_receive_message(T, TAG, SRC, DST))
+
+// Test mixing aborted and good deliveries, make sure credit is correct.
+static void test_message_abort_mixed(test_t *t) {
+  /* Set up the link, give credit, start the delivery */
+  test_connection_driver_t client, server;
+  test_connection_driver_init(&client, t, send_client_handler, NULL);
+  test_connection_driver_init(&server, t, delivery_handler, NULL);
+  pn_transport_set_server(server.driver.transport);
+  pn_connection_open(client.driver.connection);
+
+  test_connection_drivers_run(&client, &server);
+  pn_link_t *rcv = server.handler.link;
+  pn_link_t *snd = client.handler.link;
+  char data[100] = {0};          /* Dummy data to send. */
+  char rbuf[sizeof(data)] = {0}; /* Read buffer for incoming data. */
+
+  /* We will send 3 good messages, interleaved with aborted ones */
+  pn_link_flow(rcv, 5);
+  test_connection_drivers_run(&client, &server);
+  SEND_RECEIVE_MESSAGE(t, "one", &client, &server);
+  TEST_INT_EQUAL(t, 4, pn_link_credit(snd));
+  TEST_INT_EQUAL(t, 4, pn_link_credit(rcv));
+  pn_delivery_t *sd, *rd;
+
+  /* Send a frame, then an abort */
+  sd = pn_delivery(snd, PN_BYTES_LITERAL("x1"));
+  server.handler.delivery = NULL;
+  TEST_INT_EQUAL(t, sizeof(data), pn_link_send(snd, data, sizeof(data)));
+  TEST_INT_EQUAL(t, 4, pn_link_credit(snd)); /* Nothing sent yet */
+  test_connection_drivers_run(&client, &server);
+  rd = server.handler.delivery;
+  if (!TEST_CHECK(t, rd)) goto cleanup;
+  TEST_INT_EQUAL(t, sizeof(rbuf), pn_link_recv(pn_delivery_link(rd), rbuf, sizeof(rbuf)));
+
+  pn_delivery_abort(sd);
   test_connection_drivers_run(&client, &server);
-  TEST_HANDLER_EXPECT_LAST(&server.handler, PN_DELIVERY);
-  /* Receive the aborted frame */
-  rd = server_ctx.delivery;
   TEST_CHECK(t, pn_delivery_aborted(rd));
-  TEST_CHECK(t, !pn_delivery_partial(rd)); /* Aborted deliveries are never partial */
-  TEST_CHECK(t, pn_delivery_settled(rd)); /* Aborted deliveries are always settled */
-  TEST_INT_EQUAL(t, 1, pn_delivery_pending(rd)); /* Has no data */
-  TEST_INT_EQUAL(t, PN_ABORTED, pn_link_recv(pn_delivery_link(rd), rbuf, sizeof(rbuf)));
+  pn_delivery_settle(rd);
+  /* Abort after sending data consumes credit */
+  TEST_INT_EQUAL(t, 3, pn_link_credit(snd));
+  TEST_INT_EQUAL(t, 3, pn_link_credit(rcv));
+
+  SEND_RECEIVE_MESSAGE(t, "two", &client, &server);
+  TEST_INT_EQUAL(t, 2, pn_link_credit(snd));
+  TEST_INT_EQUAL(t, 2, pn_link_credit(rcv));
 
+  /* Abort a delivery before any data has been framed, should be dropped. */
+  test_handler_clear(&server.handler, 0);
+  sd = pn_delivery(snd, PN_BYTES_LITERAL(4));
+  TEST_INT_EQUAL(t, sizeof(data), pn_link_send(snd, data, sizeof(data)));
+  pn_delivery_abort(sd);
+  TEST_CHECK(t, pn_link_current(snd) != sd); /* Advanced */
+  test_connection_drivers_run(&client, &server);
+  TEST_HANDLER_EXPECT(&server.handler, PN_TRANSPORT, 0);
+  /* Aborting with no frames sent should leave credit untouched */
+  TEST_INT_EQUAL(t, 2, pn_link_credit(snd));
+  TEST_INT_EQUAL(t, 2, pn_link_credit(rcv));
+
+  SEND_RECEIVE_MESSAGE(t, "three", &client, &server);
+  TEST_INT_EQUAL(t, 1, pn_link_credit(rcv));
+  TEST_INT_EQUAL(t, 1, pn_link_credit(snd));
+
+ cleanup:
   test_connection_driver_destroy(&client);
   test_connection_driver_destroy(&server);
 }
@@ -287,5 +387,7 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_message_transfer(&t));
   RUN_ARGV_TEST(failed, t, test_message_stream(&t));
   RUN_ARGV_TEST(failed, t, test_message_abort(&t));
+  RUN_ARGV_TEST(failed, t, test_message_abort_mixed(&t));
   return failed;
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/src/tests/test_handler.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_handler.h b/proton-c/src/tests/test_handler.h
index 65fcfe7..6eb9079 100644
--- a/proton-c/src/tests/test_handler.h
+++ b/proton-c/src/tests/test_handler.h
@@ -37,7 +37,14 @@ typedef struct test_handler_t {
   test_handler_fn f;
   pn_event_type_t log[MAX_EVENT_LOG]; /* Log of event types */
   size_t log_size;                    /* Number of events in the log */
-  void *context;                      /* Test-specific context */
+  void *context;                      /* Generic test context */
+  /* Specific context slots for proton objects commonly used by handlers  */
+  pn_connection_t *connection;
+  pn_session_t *session;
+  pn_link_t *link;
+  pn_link_t *sender;
+  pn_link_t *receiver;
+  pn_delivery_t *delivery;
 } test_handler_t;
 
 void test_handler_init(test_handler_t *th, test_t *t, test_handler_fn f) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b36b70c2/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 58d3ce0..ed832c6 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -36,6 +36,9 @@
 #include <stdlib.h>
 #include <string.h>
 
+/* Handy way to make pn_bytes: PN_BYTES_LITERAL(FOO) => pn_bytes(3, "FOO") */
+#define PN_BYTES_LITERAL(X) (pn_bytes(sizeof(#X)-1, #X))
+
 /* A struct to collect the results of a test, created by RUN_TEST macro. */
 typedef struct test_t {
   const char* name;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message