qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fad...@apache.org
Subject svn commit: r1553874 [2/2] - in /qpid/proton/branches/fadams-javascript-binding: examples/messenger/py/ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/ p...
Date Sat, 28 Dec 2013 16:58:27 GMT
Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/messenger/subscription.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/messenger/subscription.h?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/messenger/subscription.h (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/messenger/subscription.h Sat Dec 28 16:58:25 2013
@@ -24,7 +24,10 @@
 
 #include <proton/messenger.h>
 
-pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *scheme);
+pn_subscription_t *pn_subscription(pn_messenger_t *messenger,
+                                   const char *scheme, const char *host,
+                                   const char *port);
 const char *pn_subscription_scheme(pn_subscription_t *sub);
+int pni_subscription_set_address(pn_subscription_t *sub, const char *address);
 
 #endif /* subscription.h */

Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/posix/driver.c?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/posix/driver.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/posix/driver.c Sat Dec 28 16:58:25 2013
@@ -620,9 +620,6 @@ void pn_connector_process(pn_connector_t
             }
           } else if (n) {
             pn_transport_pop(transport, (size_t) n);
-            pending -= n;
-            if (pending == 0)
-              c->status &= ~PN_SEL_WR;
           }
         }
       } else if (pending == 0) {

Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/sasl/sasl.c?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/sasl/sasl.c Sat Dec 28 16:58:25 2013
@@ -67,7 +67,7 @@ pn_sasl_t *pn_sasl(pn_transport_t *trans
 {
   if (!transport->sasl) {
     pn_sasl_t *sasl = (pn_sasl_t *) malloc(sizeof(pn_sasl_t));
-    sasl->disp = pn_dispatcher(1, sasl);
+    sasl->disp = pn_dispatcher(1, transport);
     sasl->disp->batch = false;
 
     pn_dispatcher_action(sasl->disp, SASL_INIT, pn_do_init);
@@ -356,7 +356,7 @@ ssize_t pn_sasl_output(pn_sasl_t *sasl, 
 
 int pn_do_init(pn_dispatcher_t *disp)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *) disp->context;
+  pn_sasl_t *sasl = disp->transport->sasl;
   pn_bytes_t mech;
   pn_bytes_t recv;
   int err = pn_scan_args(disp, "D.[sz]", &mech, &recv);
@@ -369,14 +369,14 @@ int pn_do_init(pn_dispatcher_t *disp)
 
 int pn_do_mechanisms(pn_dispatcher_t *disp)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *) disp->context;
+  pn_sasl_t *sasl = disp->transport->sasl;
   sasl->rcvd_init = true;
   return 0;
 }
 
 int pn_do_recv(pn_dispatcher_t *disp)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *) disp->context;
+  pn_sasl_t *sasl = disp->transport->sasl;
   pn_bytes_t recv;
   int err = pn_scan_args(disp, "D.[z]", &recv);
   if (err) return err;
@@ -396,7 +396,7 @@ int pn_do_response(pn_dispatcher_t *disp
 
 int pn_do_outcome(pn_dispatcher_t *disp)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *) disp->context;
+  pn_sasl_t *sasl = disp->transport->sasl;
   uint8_t outcome;
   int err = pn_scan_args(disp, "D.[B]", &outcome);
   if (err) return err;
@@ -425,7 +425,7 @@ static ssize_t pn_input_read_sasl_header
     if (sasl->header_count == SASL_HEADER_LEN) {
       sasl->io_layer->process_input = pn_input_read_sasl;
       if (sasl->disp->trace & PN_TRACE_FRM)
-        fprintf(stderr, "    <- %s\n", "SASL");
+        pn_transport_logf(sasl->transport, "  <- %s", "SASL");
     }
     return delta;
   }
@@ -447,7 +447,7 @@ static ssize_t pn_output_write_sasl_head
 {
   pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
   if (sasl->disp->trace & PN_TRACE_FRM)
-    fprintf(stderr, "    -> %s\n", "SASL");
+    pn_transport_logf(sasl->transport, "  -> %s", "SASL");
   if (size >= SASL_HEADER_LEN) {
     memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
     sasl->io_layer->process_output = pn_output_write_sasl;

Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/transport/transport.c Sat Dec 28 16:58:25 2013
@@ -104,11 +104,18 @@ static ssize_t pn_output_write_amqp_head
 static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available);
 static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now);
 
+static void pni_default_tracer(pn_transport_t *transport, const char *message)
+{
+  fprintf(stderr, "[%p]:%s\n", (void *) transport, message);
+}
+
 void pn_transport_init(pn_transport_t *transport)
 {
+  transport->tracer = pni_default_tracer;
   transport->header_count = 0;
   transport->sasl = NULL;
   transport->ssl = NULL;
+  transport->scratch = pn_string(NULL);
   transport->disp = pn_dispatcher(0, transport);
 
   pn_io_layer_t *io_layer = transport->io_layers;
@@ -233,6 +240,7 @@ void pn_transport_free(pn_transport_t *t
   pn_free(transport->remote_channels);
   free(transport->input_buf);
   free(transport->output_buf);
+  pn_free(transport->scratch);
   free(transport);
 }
 
@@ -377,7 +385,7 @@ int pn_do_error(pn_transport_t *transpor
     transport->close_sent = true;
   }
   transport->disp->halt = true;
-  fprintf(stderr, "ERROR %s %s\n", condition, pn_error_text(transport->error));
+  pn_transport_logf(transport, "ERROR %s %s", condition, pn_error_text(transport->error));
   return PN_ERR;
 }
 
@@ -388,7 +396,7 @@ static char *pn_bytes_strdup(pn_bytes_t 
 
 int pn_do_open(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   pn_connection_t *conn = transport->connection;
   bool container_q, hostname_q;
   pn_bytes_t remote_container, remote_hostname;
@@ -405,8 +413,8 @@ int pn_do_open(pn_dispatcher_t *disp)
   if (err) return err;
   if (transport->remote_max_frame > 0) {
     if (transport->remote_max_frame < AMQP_MIN_MAX_FRAME_SIZE) {
-      fprintf(stderr, "Peer advertised bad max-frame (%u), forcing to %u\n",
-              transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
+      pn_transport_logf(transport, "Peer advertised bad max-frame (%u), forcing to %u",
+                        transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
       transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE;
     }
     disp->remote_max_frame = transport->remote_max_frame;
@@ -436,7 +444,7 @@ int pn_do_open(pn_dispatcher_t *disp)
 
 int pn_do_begin(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   bool reply;
   uint16_t remote_channel;
   pn_sequence_t next;
@@ -524,7 +532,7 @@ int pn_terminus_set_address_bytes(pn_ter
 
 int pn_do_attach(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   pn_bytes_t name;
   uint32_t handle;
   bool is_sender;
@@ -641,7 +649,7 @@ void pn_full_settle(pn_delivery_map_t *d
 int pn_do_transfer(pn_dispatcher_t *disp)
 {
   // XXX: multi transfer
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   uint32_t handle;
   pn_bytes_t tag;
   bool id_present;
@@ -710,7 +718,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
 
 int pn_do_flow(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   pn_sequence_t onext, inext, delivery_count;
   uint32_t iwin, owin, link_credit;
   uint32_t handle;
@@ -777,7 +785,7 @@ static int pn_scan_error(pn_data_t *data
 
 int pn_do_disposition(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   bool role;
   pn_sequence_t first, last;
   uint64_t type = 0;
@@ -854,7 +862,7 @@ int pn_do_disposition(pn_dispatcher_t *d
 
 int pn_do_detach(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   uint32_t handle;
   bool closed;
   int err = pn_scan_args(disp, "D.[Io]", &handle, &closed);
@@ -883,7 +891,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
 
 int pn_do_end(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   pn_session_t *ssn = pn_channel_state(transport, disp->channel);
   int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
@@ -894,7 +902,7 @@ int pn_do_end(pn_dispatcher_t *disp)
 
 int pn_do_close(pn_dispatcher_t *disp)
 {
-  pn_transport_t *transport = (pn_transport_t *) disp->context;
+  pn_transport_t *transport = disp->transport;
   pn_connection_t *conn = transport->connection;
   int err = pn_scan_error(disp->args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
@@ -947,12 +955,12 @@ static ssize_t transport_consume(pn_tran
       break;
     } else {
       if (n != PN_EOS) {
-        pn_dispatcher_trace(transport->disp, 0, "ERROR[%i] %s\n",
-                            pn_error_code(transport->error),
-                            pn_error_text(transport->error));
+        pn_transport_logf(transport, "ERROR[%i] %s\n",
+                          pn_error_code(transport->error),
+                          pn_error_text(transport->error));
       }
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
-        pn_dispatcher_trace(transport->disp, 0, "<- EOS\n");
+        pn_transport_log(transport, "  <- EOS");
       transport->input_pending = 0;  // XXX ???
       return n;
     }
@@ -983,7 +991,7 @@ static ssize_t pn_input_read_header(pn_t
       transport->io_layers[PN_IO_AMQP].process_input = next;
 
       if (transport->disp->trace & PN_TRACE_FRM)
-        fprintf(stderr, "    <- %s\n", protocol);
+        pn_transport_logf(transport, "  <- %s", protocol);
     }
     return delta;
   }
@@ -1623,7 +1631,7 @@ static ssize_t pn_output_write_header(pn
                                       ssize_t (*next)(pn_io_layer_t *, char *, size_t))
 {
   if (transport->disp->trace & PN_TRACE_FRM)
-    fprintf(stderr, "    -> %s\n", protocol);
+    pn_transport_logf(transport, "  -> %s", protocol);
   if (size >= hdrsize) {
     memmove(bytes, header, hdrsize);
     transport->io_layers[PN_IO_AMQP].process_output = next;
@@ -1700,10 +1708,10 @@ static ssize_t transport_produce(pn_tran
         break;   // return what is available
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
         if (n == PN_EOS)
-          pn_dispatcher_trace(transport->disp, 0, "-> EOS\n");
+          pn_transport_log(transport, "  -> EOS");
         else
-          pn_dispatcher_trace(transport->disp, 0, "-> EOS (%" PN_ZI ") %s\n", n,
-                              pn_error_text(transport->error));
+          pn_transport_logf(transport, "  -> EOS (%" PN_ZI ") %s", n,
+                            pn_error_text(transport->error));
       }
       return n;
     }
@@ -1732,6 +1740,49 @@ void pn_transport_trace(pn_transport_t *
   transport->disp->trace = trace;
 }
 
+void pn_transport_set_tracer(pn_transport_t *transport, pn_tracer_t *tracer)
+{
+  assert(transport);
+  assert(tracer);
+
+  transport->tracer = tracer;
+}
+
+pn_tracer_t *pn_transport_get_tracer(pn_transport_t *transport)
+{
+  assert(transport);
+  return transport->tracer;
+}
+
+void pn_transport_set_context(pn_transport_t *transport, void *context)
+{
+  assert(transport);
+  transport->context = context;
+}
+
+void *pn_transport_get_context(pn_transport_t *transport)
+{
+  assert(transport);
+  return transport->context;
+}
+
+void pn_transport_log(pn_transport_t *transport, const char *message)
+{
+  assert(transport);
+  transport->tracer(transport, message);
+}
+
+void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...)
+{
+  va_list ap;
+
+  va_start(ap, fmt);
+  pn_string_vformat(transport->scratch, fmt, ap);
+  va_end(ap);
+
+  pn_transport_log(transport, pn_string_get(transport->scratch));
+}
+
 uint32_t pn_transport_get_max_frame(pn_transport_t *transport)
 {
   return transport->local_max_frame;

Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/driver.c?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/driver.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/driver.c Sat Dec 28 16:58:25 2013
@@ -143,6 +143,7 @@ struct pn_listener_t {
   int idx;
   bool pending;
   pn_socket_t fd;
+  bool closed;
   void *context;
 };
 
@@ -204,7 +205,7 @@ pn_listener_t *pn_listener(pn_driver_t *
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
   if (code) {
-    pn_error_format(driver->error, PN_ERR, "getaddrinfo: %s\n", gai_strerror(code));
+    pn_error_format(driver->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
     return NULL;
   }
 
@@ -255,6 +256,7 @@ pn_listener_t *pn_listener_fd(pn_driver_
   l->idx = 0;
   l->pending = false;
   l->fd = fd;
+  l->closed = false;
   l->context = context;
 
   pn_driver_add_listener(driver, l);
@@ -341,9 +343,11 @@ pn_connector_t *pn_listener_accept(pn_li
 void pn_listener_close(pn_listener_t *l)
 {
   if (!l) return;
+  if (l->closed) return;
 
   if (close(l->fd) == -1)
     perror("close");
+  l->closed = true;
 }
 
 void pn_listener_free(pn_listener_t *l)
@@ -391,7 +395,7 @@ pn_connector_t *pn_connector(pn_driver_t
   struct addrinfo *addr;
   int code = getaddrinfo(host, port, NULL, &addr);
   if (code) {
-    pn_error_format(driver->error, PN_ERR, "getaddrinfo: %s", gai_strerror(code));
+    pn_error_format(driver->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
     return NULL;
   }
 
@@ -623,7 +627,11 @@ void pn_connector_process(pn_connector_t
             }
           }
         }
-      } else if (capacity < 0) {
+      }
+
+      capacity = pn_transport_capacity(transport);
+
+      if (capacity < 0) {
         c->status &= ~PN_SEL_RD;
         c->input_done = true;
       }
@@ -654,9 +662,6 @@ void pn_connector_process(pn_connector_t
             }
           } else if (n) {
             pn_transport_pop(transport, (size_t) n);
-            pending -= n;
-            if (pending == 0)
-              c->status &= ~PN_SEL_WR;
           }
         }
       } else if (pending == 0) {
@@ -818,7 +823,7 @@ static void pn_driver_rebuild(pn_driver_
           d->overflow = true;
           break;
         }
-      }   
+      }
       // if (c->fd > d->max_fds) d->max_fds = c->fd;
     }
     c = c->connector_next;

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java Sat Dec 28 16:58:25 2013
@@ -55,7 +55,7 @@ public interface Driver
      *
      * Thread-safe.
      *
-     * @param timeout maximum time in milliseconds to wait. 0 means wait indefinitely.
+     * @param timeout maximum time in milliseconds to wait. -1 means wait indefinitely.
      *
      * @param returns true if woken up
      */

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Sat Dec 28 16:58:25 2013
@@ -104,4 +104,6 @@ public interface Delivery
 
     public int pending();
 
+    public boolean isBuffered();
+
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java Sat Dec 28 16:58:25 2013
@@ -182,4 +182,6 @@ public interface Link extends Endpoint
 
     public int drained();
 
+    public int getRemoteCredit();
+    public boolean getDrain();
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java Sat Dec 28 16:58:25 2013
@@ -71,4 +71,6 @@ public interface Receiver extends Link
 
     public boolean draining();
 
+    public void setDrain(boolean drain);
+
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Sat Dec 28 16:58:25 2013
@@ -118,6 +118,14 @@ public interface Messenger
      */
     void recv(int count) throws TimeoutException;
     /**
+     * Returns the capacity of the incoming message queue of
+     * messenger. Note this count does not include those messages
+     * already available on the incoming queue (see
+     * incoming()). Rather it returns the number of incoming queue
+     * entries available for receiving messages
+     */
+    int receiving();
+    /**
      * Returns the message from the head of the incoming message
      * queue.
      */
@@ -142,7 +150,12 @@ public interface Messenger
 
     boolean stopped();
 
-    boolean work(long timeout);
+    /** Sends or receives any outstanding messages queued for a
+     * messenger.  If timeout is zero, no blocking is done.  A timeout
+     * of -1 blocks forever, otherwise timeout is the maximum time (in
+     * millisecs) to block.  Returns True if work was performed.
+     */
+    boolean work(long timeout) throws TimeoutException;
 
     void interrupt();
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java Sat Dec 28 16:58:25 2013
@@ -26,5 +26,8 @@ public enum Status
     PENDING,
     ACCEPTED,
     REJECTED,
-    MODIFIED
+    RELEASED,
+    MODIFIED,
+    ABORTED,
+    SETTLED
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/resources/proton.py?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton-api/src/main/resources/proton.py Sat Dec 28 16:58:25 2013
@@ -25,6 +25,7 @@ except NameError:
 
 from org.apache.qpid.proton import Proton, ProtonUnsupportedOperationException
 from org.apache.qpid.proton import InterruptException as Interrupt
+from org.apache.qpid.proton import TimeoutException as Timeout
 from org.apache.qpid.proton.engine import \
     Transport as JTransport, Sender as JSender, Receiver as JReceiver, \
     Sasl, SslDomain as JSslDomain, \
@@ -42,7 +43,6 @@ from org.apache.qpid.proton.amqp import 
     Decimal32, Decimal64, Decimal128
 from jarray import zeros, array
 from java.util import EnumSet, UUID as JUUID, Date as JDate, HashMap
-from java.util.concurrent import TimeoutException as Timeout
 from java.nio import ByteBuffer
 from java.lang import Character as JCharacter, String as JString, Integer as JInteger
 from java.lang import NoClassDefFoundError
@@ -63,11 +63,15 @@ class Skipped(Exception):
 PENDING = "PENDING"
 ACCEPTED = "ACCEPTED"
 REJECTED = "REJECTED"
+RELEASED = "RELEASED"
+SETTLED = "SETTLED"
 
 STATUSES = {
   Status.ACCEPTED: ACCEPTED,
   Status.REJECTED: REJECTED,
   Status.PENDING: PENDING,
+  Status.RELEASED: RELEASED,
+  Status.SETTLED: SETTLED,
   Status.UNKNOWN: None
   }
 
@@ -377,6 +381,10 @@ class Link(Endpoint):
     return wrap_link(self.impl.next(*self._enums(mask)))
 
   @property
+  def name(self):
+      return self.impl.getName()
+
+  @property
   def remote_snd_settle_mode(self):
     return self.impl.getRemoteSenderSettleMode()
 
@@ -727,6 +735,22 @@ class Transport(object):
 
   def __init__(self):
     self.impl = Proton.transport()
+    self._ssl = None
+    self._sasl = None
+
+  def __del__(self):
+    if hasattr(self, ".impl") and self.impl:
+      pn_transport_free(self.impl)
+      if hasattr(self, "_sasl") and self._sasl:
+        # pn_transport_free deallocs the C sasl associated with the
+        # transport, so erase the reference if a SASL object was used.
+        self._sasl._sasl = None
+        self._sasl = None
+      if hasattr(self, "_ssl") and self._ssl:
+        # ditto the owned c SSL object
+        self._ssl._ssl = None
+        self._ssl = None
+      del self._trans
 
   def trace(self, mask):
     # XXX: self.impl.trace(mask)
@@ -830,6 +854,18 @@ The idle timeout of the connection (in m
     #return pn_transport_get_frames_input(self._trans)
     raise ProtonUnsupportedOperationException("Transport.frames_input")
 
+  def sasl(self):
+    # SASL factory (singleton for this transport)
+    if not self._sasl:
+      self._sasl = SASL(self)
+    return self._sasl
+
+  def ssl(self, domain=None, session_details=None):
+    # SSL factory (singleton for this transport)
+    if not self._ssl:
+      self._ssl = SSL(self, domain, session_details)
+    return self._ssl
+
 class UnmappedType:
 
   def __init__(self, msg):
@@ -1321,12 +1357,20 @@ class Messenger(object):
   def recv(self, n=-1):
     self.impl.recv(n)
 
+  @property
+  def receiving(self):
+    return self.impl.receiving()
+
   def work(self, timeout=None):
     if timeout is None:
       t = -1
     else:
       t = long(1000*timeout)
-    return self.impl.work(t)
+    try:
+        err = self.impl.work(t)
+    except Timeout, e:
+        return False
+    return err
 
   def interrupt(self):
     self.impl.interrupt()
@@ -1411,6 +1455,9 @@ class Messenger(object):
     raise Skipped()
   certificate = property(_get_certificate, _set_certificate)
 
+  def buffered(self, tracker):
+    raise Skipped()
+
 
 class Message(object):
 
@@ -1594,8 +1641,13 @@ class SASL(object):
   OK = Sasl.PN_SASL_OK
   AUTH = Sasl.PN_SASL_AUTH
 
-  def __init__(self,transport):
-    self._sasl = transport.impl.sasl()
+  def __new__(cls, transport):
+    """Enforce a singleton SASL object per Transport"""
+    if not transport._sasl:
+      obj = super(SASL, cls).__new__(cls)
+      obj._sasl = transport.impl.sasl()
+      transport._sasl = obj
+    return transport._sasl
 
   def mechanisms(self, mechanisms):
     self._sasl.setMechanisms(mechanisms.split())
@@ -1681,6 +1733,29 @@ class SSLSessionDetails(object):
 
 class SSL(object):
 
+  def __new__(cls, transport, domain, session_details=None):
+    """Enforce a singleton SSL object per Transport"""
+    if transport._ssl:
+      # unfortunately, we've combined the allocation and the configuration in a
+      # single step.  So catch any attempt by the application to provide what
+      # may be a different configuration than the original (hack)
+      ssl = transport._ssl
+      if (domain and (ssl._domain is not domain) or
+          session_details and (ssl._session_details is not session_details)):
+        raise SSLException("Cannot re-configure existing SSL object!")
+    else:
+      obj = super(SSL, cls).__new__(cls)
+      obj._domain = domain
+      obj._session_details = session_details
+
+      internal_session_details = None
+      if session_details:
+        internal_session_details = session_details._session_details
+
+      obj._ssl = transport.impl.ssl(domain._domain, internal_session_details)
+      transport._ssl = obj
+    return transport._ssl
+
   def __init__(self, transport, domain, session_details=None):
 
     internal_session_details = None
@@ -1771,6 +1846,8 @@ __all__ = [
            "MANUAL",
            "PENDING",
            "REJECTED",
+           "RELEASED",
+           "SETTLED",
            "char",
            "Condition",
            "Connection",

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Sat Dec 28 16:58:25 2013
@@ -192,6 +192,8 @@ public class DriverImpl implements Drive
         {
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
+            // Disable the Nagle algorithm on TCP connections.
+            channel.socket().setTcpNoDelay(true);
             channel.connect(new InetSocketAddress(host, port));
             return createConnector(channel, context);
         }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Sat Dec 28 16:58:25 2013
@@ -373,7 +373,7 @@ public class DeliveryImpl implements Del
         _updated = true;
     }
 
-    boolean isBuffered()
+    public boolean isBuffered()
     {
         if (_remoteSettled) return false;
         if (getLink() instanceof SenderImpl) {

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Sat Dec 28 16:58:25 2013
@@ -55,7 +55,6 @@ public abstract class LinkImpl extends E
     private final LinkNode<LinkImpl> _node;
     private boolean _drain;
 
-
     LinkImpl(SessionImpl session, String name)
     {
         _session = session;
@@ -296,7 +295,7 @@ public abstract class LinkImpl extends E
         _drain = drain;
     }
 
-    boolean getDrain()
+    public boolean getDrain()
     {
         return _drain;
     }
@@ -378,4 +377,8 @@ public abstract class LinkImpl extends E
         _drained = value;
     }
 
+    public int getRemoteCredit()
+    {
+        return _credit - _queued;
+    }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Sat Dec 28 16:58:25 2013
@@ -27,6 +27,7 @@ import org.apache.qpid.proton.engine.Rec
 
 public class ReceiverImpl extends LinkImpl implements Receiver
 {
+    private boolean _drainFlagMode = true;
 
     @Override
     public boolean advance()
@@ -61,10 +62,14 @@ public class ReceiverImpl extends LinkIm
 
     public void flow(final int credits)
     {
-        modified();
         addCredit(credits);
-        setDrain(false);
         _unsentCredits += credits;
+        modified();
+        if (!_drainFlagMode)
+        {
+            setDrain(false);
+            _drainFlagMode = false;
+        }
     }
 
     int clearUnsentCredits()
@@ -122,8 +127,9 @@ public class ReceiverImpl extends LinkIm
 
     public void drain(int credit)
     {
-        flow(credit);
         setDrain(true);
+        flow(credit);
+        _drainFlagMode = false;
     }
 
     public boolean draining()
@@ -131,4 +137,11 @@ public class ReceiverImpl extends LinkIm
         return getDrain() && (getCredit() > getQueued());
     }
 
+    @Override
+    public void setDrain(boolean drain)
+    {
+        super.setDrain(drain);
+        modified();
+        _drainFlagMode = true;
+    }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Sat Dec 28 16:58:25 2013
@@ -61,6 +61,12 @@ import org.apache.qpid.proton.amqp.Binar
 
 public class MessengerImpl implements Messenger
 {
+    private enum LinkCreditMode
+    {
+        // method for replenishing credit
+        LINK_CREDIT_EXPLICIT,   // recv(N)
+        LINK_CREDIT_AUTO;       // recv()
+    }
 
     private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
     private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
@@ -73,10 +79,15 @@ public class MessengerImpl implements Me
     private boolean _blocking = true;
     private long _nextTag = 1;
     private Driver _driver;
-    private int _receiving = 0;
-    private static final int _creditBatch = 1024;
-    private int _credit;
-    private int _distributed;
+    private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+    private final int _credit_batch = 1024;   // credit_mode == LINK_CREDIT_AUTO
+    private int _credit;        // available
+    private int _distributed;    // outstanding credit
+    private int _receivers;      // total # receiver Links
+    private int _draining;       // # Links in drain state
+    private List<Receiver> _credited = new ArrayList<Receiver>();
+    private List<Receiver> _blocked = new ArrayList<Receiver>();
+    private long _next_drain;
     private TrackerImpl _incomingTracker;
     private TrackerImpl _outgoingTracker;
     private Store _incomingStore = new Store();
@@ -165,7 +176,7 @@ public class MessengerImpl implements Me
         return _allClosed.test();
     }
 
-    public boolean work(long timeout)
+    public boolean work(long timeout) throws TimeoutException
     {
         if (_driver == null) { return false; }
         _worked = false;
@@ -285,10 +296,38 @@ public class MessengerImpl implements Me
         pumpOut(m.getAddress(), sender);
     }
 
+    private void reclaimLink(Link link)
+    {
+        if (link instanceof Receiver)
+        {
+            int credit = link.getCredit();
+            if (credit > 0)
+            {
+                _credit += credit;
+                _distributed -= credit;
+            }
+        }
+
+        Iterator<Delivery> dIter = link.unsettled();
+        while (dIter != null && dIter.hasNext())
+        {
+            Delivery delivery = (Delivery) dIter.next();
+            StoreEntry entry = (StoreEntry) delivery.getContext();
+            if (entry != null)
+            {
+                entry.setDelivery(null);
+                if (delivery.isBuffered())
+                    entry.setStatus(Status.ABORTED);
+            }
+        }
+        linkRemoved(link);
+    }
+
     private int pumpOut( String address, Sender sender )
     {
         StoreEntry entry = _outgoingStore.get( address );
         if (entry == null) {
+            sender.drained();
             return 0;
         }
 
@@ -342,12 +381,24 @@ public class MessengerImpl implements Me
             throw new IllegalStateException("cannot recv while messenger is stopped");
         }
 
-        if(_logger.isLoggable(Level.FINE))
+        if (_logger.isLoggable(Level.FINE) && n != -1)
         {
             _logger.fine(this + " about to wait for up to " + n + " messages to be received");
         }
 
-        _receiving = n;
+        if (n == -1)
+        {
+            _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
+        }
+        else
+        {
+            _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+            if (n > _distributed)
+                _credit = n - _distributed;
+            else        // cancel unallocated
+                _credit = 0;
+        }
+
         distributeCredit();
 
         waitUntil(_messageAvailable);
@@ -360,7 +411,7 @@ public class MessengerImpl implements Me
 
     public int receiving()
     {
-        return _receiving;
+        return _credit + _distributed;
     }
 
     public Message get()
@@ -375,10 +426,8 @@ public class MessengerImpl implements Me
                                                _incomingStore.trackEntry(entry));
 
             _incomingStore.freeEntry( entry );
-            _distributed--;
             return message;
         }
-
         return null;
     }
 
@@ -400,6 +449,36 @@ public class MessengerImpl implements Me
             }
             entry.setEncodedMsg( buffer, size );
             receiver.advance();
+
+            // account for the used credit, replenish if
+            // low (< 20% maximum per-link batch) and
+            // extra credit available
+            assert(_distributed > 0);
+            _distributed--;
+            if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
+            {
+                final int max = perLinkCredit();
+                final int lo_thresh = (int)(max * 0.2 + 0.5);
+                if (receiver.getRemoteCredit() < lo_thresh)
+                {
+                    final int more = Math.min(_credit, max - receiver.getRemoteCredit());
+                    _credit -= more;
+                    _distributed += more;
+                    receiver.flow(more);
+                }
+            }
+            // check if blocked
+            if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
+            {
+                _credited.remove(receiver);
+                if (receiver.getDrain())
+                {
+                    receiver.setDrain(false);
+                    assert( _draining > 0 );
+                    _draining--;
+                }
+                _blocked.add(receiver);
+            }
         }
         return 0;
     }
@@ -423,7 +502,8 @@ public class MessengerImpl implements Me
             {
                 _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
             }
-            _driver.createListener(hostName, port, null);
+            ListenerContext ctx = new ListenerContext( address.getScheme(), hostName, ports );
+            _driver.createListener(hostName, port, ctx);
         }
         else
         {
@@ -585,6 +665,8 @@ public class MessengerImpl implements Me
             Connector<?> c = l.accept();
             Connection connection = Proton.connection();
             connection.setContainer(_name);
+            ListenerContext ctx = (ListenerContext) l.getContext();
+            connection.setContext(new ConnectionContext(ctx.getService(), c));
             c.setConnection(connection);
             //TODO: SSL and full SASL
             Sasl sasl = c.sasl();
@@ -596,18 +678,10 @@ public class MessengerImpl implements Me
             }
             connection.open();
         }
-        //process active connectors, handling opened & closed connections as needed
+        // process connectors, reclaiming credit on closed connectors
         for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
         {
             _worked = true;
-            _logger.log(Level.FINE, "Processing active connector " + c);
-            try
-            {
-                c.process();
-            } catch (IOException e) {
-                _logger.log(Level.SEVERE, "Error processing connection", e);
-            }
-            processEndpoints(c);
             if (c.isClosed())
             {
                 _awaitingDestruction.add(c);
@@ -615,9 +689,12 @@ public class MessengerImpl implements Me
             }
             else
             {
+                _logger.log(Level.FINE, "Processing active connector " + c);
                 try
                 {
                     c.process();
+                    processEndpoints(c);
+                    c.process();
                 }
                 catch (IOException e)
                 {
@@ -672,6 +749,7 @@ public class MessengerImpl implements Me
             //TODO: the following is not correct; should only copy those properties that we understand
             link.setSource(link.getRemoteSource());
             link.setTarget(link.getRemoteTarget());
+            linkAdded(link);
             link.open();
             _logger.log(Level.FINE, "Opened link " + link);
         }
@@ -686,14 +764,23 @@ public class MessengerImpl implements Me
             }
         }
 
-        for (Link link : new Links(connection, ACTIVE, CLOSED))
-        {
-            link.close();
-        }
         for (Session session : new Sessions(connection, ACTIVE, CLOSED))
         {
             session.close();
         }
+
+        for (Link link : new Links(connection, ANY, CLOSED))
+        {
+            if (link.getLocalState() == EndpointState.ACTIVE)
+            {
+                link.close();
+            }
+            else
+            {
+                reclaimLink(link);
+            }
+        }
+
         if (connection.getRemoteState() == EndpointState.CLOSED)
         {
             if (connection.getLocalState() == EndpointState.ACTIVE)
@@ -729,7 +816,7 @@ public class MessengerImpl implements Me
 
         // wait until timeout expires or until test is true
         long now = System.currentTimeMillis();
-        long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
+        final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
         boolean done = false;
 
         while (true)
@@ -737,21 +824,31 @@ public class MessengerImpl implements Me
             done = condition.test();
             if (done) break;
 
-            boolean woken;
-            if ( timeout >= 0 ) {
-                long remaining = deadline - now;
+            long remaining;
+            if (timeout < 0)
+                remaining = -1;
+            else {
+                remaining = deadline - now;
                 if (remaining < 0) break;
-                woken = _driver.doWait(remaining);
-            } else {
-                woken = _driver.doWait(-1);
             }
+
+            // Update the credit scheduler. If the scheduler detects
+            // credit imbalance on the links, wake up in time to
+            // service credit drain
+            distributeCredit();
+            if (_next_drain != 0)
+            {
+                long wakeup = (_next_drain > now) ? _next_drain - now : 0;
+                remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
+            }
+
+            boolean woken;
+            woken = _driver.doWait(remaining);
             processActive();
             if (woken) {
                 throw new InterruptException();
             }
-            if (timeout >= 0) {
-                now = System.currentTimeMillis();
-            }
+            now = System.currentTimeMillis();
         }
 
         return done;
@@ -762,7 +859,8 @@ public class MessengerImpl implements Me
         for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
-            if (host.equals(connection.getRemoteContainer()) || service.equals(connection.getContext()))
+            ConnectionContext ctx = (ConnectionContext) connection.getContext();
+            if (host.equals(connection.getRemoteContainer()) || service.equals(ctx.getService()))
             {
                 return connection;
             }
@@ -774,62 +872,108 @@ public class MessengerImpl implements Me
     {
         for (Link link : new Links(connection, ANY, ANY))
         {
-            if (link instanceof Receiver && link.getCredit() > 0)
-            {
-                reclaimCredit(link.getCredit());
-            }
+            reclaimLink(link);
         }
     }
 
-    private void reclaimCredit(int credit)
-    {
-        _credit += credit;
-        _distributed -= credit;
-    }
-
     private void distributeCredit()
     {
-        int linkCt = 0;
-        // @todo track the number of opened receive links
-        for (Connector<?> c : _driver.connectors())
+        if (_receivers == 0) return;
+
+        if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
         {
-            if (c.isClosed()) continue;
-            Connection connection = c.getConnection();
-            for (Link link : new Links(connection, ACTIVE, ANY))
+            // replenish, but limit the max total messages buffered
+            final int max = _receivers * _credit_batch;
+            final int used = _distributed + incoming();
+            if (max > used)
+                _credit = max - used;
+        }
+
+        // reclaim any credit left over after draining links has completed
+        if (_draining > 0)
+        {
+            Iterator<Receiver> itr = _credited.iterator();
+            while (itr.hasNext())
             {
-                if (link instanceof Receiver) linkCt++;
+                Receiver link = (Receiver) itr.next();
+                if (link.getDrain())
+                {
+                    if (!link.draining())
+                    {
+                        // drain completed for this link
+                        int drained = link.drained();
+                        assert(_distributed >= drained);
+                        _distributed -= drained;
+                        _credit += drained;
+                        link.setDrain(false);
+                        _draining--;
+                        itr.remove();
+                        _blocked.add(link);
+                    }
+                }
             }
         }
 
-        if (linkCt == 0) return;
+        // distribute available credit to blocked links
+        final int batch = perLinkCredit();
+        while (_credit > 0 && !_blocked.isEmpty())
+        {
+            Receiver link = _blocked.get(0);
+            _blocked.remove(0);
+
+            final int more = Math.min(_credit, batch);
+            _distributed += more;
+            _credit -= more;
 
-        if (_receiving < 0)
-        {
-            _credit = linkCt * _creditBatch - incoming();
-        } else {
-            int total = _credit + _distributed;
-            if (_receiving > total)
-                _credit += _receiving - total;
+            link.flow(more);
+            _credited.add(link);
+
+            // flow changed, must process it
+            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+            try
+            {
+                ctx.getConnector().process();
+            } catch (IOException e) {
+                _logger.log(Level.SEVERE, "Error processing connection", e);
+            }
         }
 
-        int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
-        for (Connector<?> c : _driver.connectors())
+        if (_blocked.isEmpty())
         {
-            if (c.isClosed()) continue;
-            Connection connection = c.getConnection();
-            for (Link link : new Links(connection, ACTIVE, ANY))
+            _next_drain = 0;
+        }
+        else
+        {
+            // not enough credit for all links - start draining granted credit
+            if (_draining == 0)
             {
-                if (link instanceof Receiver)
+                // don't do it too often - pace ourselves (it's expensive)
+                if (_next_drain == 0)
                 {
-                    int have = ((Receiver) link).getCredit();
-                    if (have < batch)
+                    _next_drain = System.currentTimeMillis() + 250;
+                }
+                else if (_next_drain <= System.currentTimeMillis())
+                {
+                    // initiate drain, free up at most enough to satisfy blocked
+                    _next_drain = 0;
+                    int needed = _blocked.size() * batch;
+
+                    for (Receiver link : _credited)
                     {
-                        int need = batch - have;
-                        int amount = (_credit < need) ? _credit : need;
-                        ((Receiver) link).flow(amount);
-                        _credit -= amount;
-                        _distributed += amount;
-                        if (_credit == 0) return;
+                        if (!link.getDrain()) {
+                            link.setDrain(true);
+                            needed -= link.getRemoteCredit();
+                            _draining++;
+                            // drain requested on link, must process it
+                            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+                            try
+                            {
+                                ctx.getConnector().process();
+                            } catch (IOException e) {
+                                _logger.log(Level.SEVERE, "Error processing connection", e);
+                            }
+                            if (needed <= 0) break;
+                        }
                     }
                 }
             }
@@ -1058,7 +1202,7 @@ public class MessengerImpl implements Me
             connection = Proton.connection();
             connection.setContainer(_name);
             connection.setHostname(host);
-            connection.setContext(service);
+            connection.setContext(new ConnectionContext(service, connector));
             connector.setConnection(connection);
             Sasl sasl = connector.sasl();
             if (sasl != null)
@@ -1077,6 +1221,7 @@ public class MessengerImpl implements Me
         Session session = connection.session();
         session.open();
         C link = finder.create(session);
+        linkAdded(link);
         link.open();
         return link;
     }
@@ -1232,4 +1377,97 @@ public class MessengerImpl implements Me
         return builder.toString();
     }
 
+    // compute the maximum amount of credit each receiving link is
+    // entitled to.  The actual credit given to the link depends on
+    // what amount of credit is actually available.
+    private int perLinkCredit()
+    {
+        if (_receivers == 0) return 0;
+        int total = _credit + _distributed;
+        return Math.max(total/_receivers, 1);
+    }
+
+    // a new link has been created, account for it.
+    private void linkAdded(Link link)
+    {
+        if (link instanceof Receiver)
+        {
+            _receivers++;
+            _blocked.add((Receiver)link);
+        }
+    }
+
+    // a link is being removed, account for it.
+    private void linkRemoved(Link _link)
+    {
+        if (_link instanceof Receiver)
+        {
+            Receiver link = (Receiver)_link;
+            assert _receivers > 0;
+            _receivers--;
+            if (link.getDrain())
+            {
+                link.setDrain(false);
+                assert _draining > 0;
+                _draining--;
+            }
+            if (_blocked.contains(link))
+                _blocked.remove(link);
+            else if (_credited.contains(link))
+                _credited.remove(link);
+            else
+                assert(false);
+        }
+    }
+
+    private class ConnectionContext
+    {
+        private String _service;
+        private Connector _connector;
+
+        public ConnectionContext(String service, Connector connector)
+        {
+            _service = service;
+            _connector = connector;
+        }
+
+        public String getService()
+        {
+            return _service;
+        }
+
+        public Connector getConnector()
+        {
+            return _connector;
+        }
+    }
+
+    private class ListenerContext
+    {
+        private String _host;
+        private String _port;
+        private String _service;  // for now. move to subscription later
+
+        public ListenerContext(String service, String host, String port)
+        {
+            _service = service;
+            _host = host;
+            _port = port;
+        }
+
+        public String getService()
+        {
+            return _service;
+        }
+
+        public String getHost()
+        {
+            return _host;
+        }
+
+        public String getPort()
+        {
+            return _port;
+        }
+    }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java Sat Dec 28 16:58:25 2013
@@ -123,6 +123,10 @@ class StoreEntry
         return _status;
     }
 
+    public void setStatus(Status status)
+    {
+        _status = status;
+    }
 
     private static Status _disp2status(DeliveryState disp)
     {
@@ -135,7 +139,7 @@ class StoreEntry
         if (disp instanceof Rejected)
             return Status.REJECTED;
         if (disp instanceof Released)
-            return Status.PENDING;
+            return Status.RELEASED;
         if (disp instanceof Modified)
             return Status.MODIFIED;
         assert(false);
@@ -152,7 +156,12 @@ class StoreEntry
             }
             else if (_delivery.remotelySettled())
             {
-                _status = _disp2status(_delivery.getLocalState());
+                DeliveryState disp = _delivery.getLocalState();
+                if (disp == null) {
+                    _status = Status.SETTLED;
+                } else {
+                    _status = _disp2status(_delivery.getLocalState());
+                }
             }
             else
             {

Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/engine.py?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/engine.py Sat Dec 28 16:58:25 2013
@@ -68,6 +68,7 @@ class Test(common.Test):
       c1._transport.max_frame_size = max_frame[0]
       c2._transport.max_frame_size = max_frame[1]
     if idle_timeout:
+      # idle_timeout in seconds expressed as float
       c1._transport.idle_timeout = idle_timeout[0]
       c2._transport.idle_timeout = idle_timeout[1]
     c1.open()
@@ -387,6 +388,7 @@ class LinkTest(Test):
 
   def test_multiple(self):
     rcv = self.snd.session.receiver("second-rcv")
+    assert rcv.name == "second-rcv"
     self.snd.open()
     rcv.open()
     self.pump()
@@ -866,16 +868,16 @@ class IdleTimeoutTest(Test):
     Verify the configuration and negotiation of the idle timeout.
     """
 
-    self.snd, self.rcv = self.link("test-link", idle_timeout=[1000,2000])
+    self.snd, self.rcv = self.link("test-link", idle_timeout=[1.0,2.0])
     self.c1 = self.snd.session.connection
     self.c2 = self.rcv.session.connection
     self.snd.open()
     self.rcv.open()
     self.pump()
-    assert self.rcv.session.connection._transport.idle_timeout == 2000
-    assert self.rcv.session.connection._transport.remote_idle_timeout == 1000
-    assert self.snd.session.connection._transport.idle_timeout == 1000
-    assert self.snd.session.connection._transport.remote_idle_timeout == 2000
+    assert self.rcv.session.connection._transport.idle_timeout == 2.0
+    assert self.rcv.session.connection._transport.remote_idle_timeout == 1.0
+    assert self.snd.session.connection._transport.idle_timeout == 1.0
+    assert self.snd.session.connection._transport.remote_idle_timeout == 2.0
 
   def testTimeout(self):
     """
@@ -883,7 +885,7 @@ class IdleTimeoutTest(Test):
     """
 
     # snd will timeout the Connection if no frame is received within 1000 ticks
-    self.snd, self.rcv = self.link("test-link", idle_timeout=[1000,0])
+    self.snd, self.rcv = self.link("test-link", idle_timeout=[1.0,0])
     self.c1 = self.snd.session.connection
     self.c2 = self.rcv.session.connection
     self.snd.open()
@@ -892,32 +894,32 @@ class IdleTimeoutTest(Test):
 
     t_snd = self.snd.session.connection._transport
     t_rcv = self.rcv.session.connection._transport
-    assert t_rcv.idle_timeout == 0
-    assert t_rcv.remote_idle_timeout == 1000
-    assert t_snd.idle_timeout == 1000
-    assert t_snd.remote_idle_timeout == 0
+    assert t_rcv.idle_timeout == 0.0
+    assert t_rcv.remote_idle_timeout == 1.0
+    assert t_snd.idle_timeout == 1.0
+    assert t_snd.remote_idle_timeout == 0.0
 
     sndr_frames_in = t_snd.frames_input
     rcvr_frames_out = t_rcv.frames_output
 
-    # at t+1, nothing should happen:
-    clock = 1
-    assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
-    assert t_rcv.tick(clock) == 501,  "deadline to send keepalive"
+    # at t+1msec, nothing should happen:
+    clock = 0.001
+    assert t_snd.tick(clock) == 1.001, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 0.501,  "deadline to send keepalive"
     self.pump()
     assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
 
     # at one tick from expected idle frame send, nothing should happen:
-    clock = 500
-    assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
-    assert t_rcv.tick(clock) == 501,  "deadline to send keepalive"
+    clock = 0.500
+    assert t_snd.tick(clock) == 1.001, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 0.501,  "deadline to send keepalive"
     self.pump()
     assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
 
     # this should cause rcvr to expire and send a keepalive
-    clock = 502
-    assert t_snd.tick(clock) == 1001, "deadline for remote timeout"
-    assert t_rcv.tick(clock) == 1002, "deadline to send keepalive"
+    clock = 0.502
+    assert t_snd.tick(clock) == 1.001, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 1.002, "deadline to send keepalive"
     self.pump()
     sndr_frames_in += 1
     rcvr_frames_out += 1
@@ -926,14 +928,14 @@ class IdleTimeoutTest(Test):
 
     # since a keepalive was received, sndr will rebase its clock against this tick:
     # and the receiver should not change its deadline
-    clock = 503
-    assert t_snd.tick(clock) == 1503, "deadline for remote timeout"
-    assert t_rcv.tick(clock) == 1002, "deadline to send keepalive"
+    clock = 0.503
+    assert t_snd.tick(clock) == 1.503, "deadline for remote timeout"
+    assert t_rcv.tick(clock) == 1.002, "deadline to send keepalive"
     self.pump()
     assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
 
     # now expire sndr
-    clock = 1504
+    clock = 1.504
     t_snd.tick(clock)
     try:
       self.pump()
@@ -1642,7 +1644,7 @@ class ServerTest(Test):
     self.server.start()
     self.driver = Driver()
     self.cxtr = self.driver.connector(self.server.host, self.server.port)
-    self.cxtr.transport.idle_timeout = int(idle_timeout_secs * 1000)  #msecs
+    self.cxtr.transport.idle_timeout = idle_timeout_secs
     self.cxtr.sasl().mechanisms("ANONYMOUS")
     self.cxtr.sasl().client()
     self.conn = Connection()
@@ -1659,7 +1661,7 @@ class ServerTest(Test):
     while self.conn.state != (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE) \
           and time() <= deadline:
       self.cxtr.process()
-      self.driver.wait(1)
+      self.driver.wait(0.001)
       self.cxtr.process()
 
     assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection failed"
@@ -1670,7 +1672,7 @@ class ServerTest(Test):
     deadline = time() + duration
     while time() <= deadline:
       self.cxtr.process()
-      self.driver.wait(1)
+      self.driver.wait(0.001)
       self.cxtr.process()
 
     assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection terminated"
@@ -1683,7 +1685,7 @@ class ServerTest(Test):
     arrive in a timely manner.
     """
     idle_timeout_secs = self.delay
-    self.server = common.TestServerDrain(idle_timeout=int(idle_timeout_secs * 1000))
+    self.server = common.TestServerDrain(idle_timeout=idle_timeout_secs)
     self.server.start()
     self.driver = Driver()
     self.cxtr = self.driver.connector(self.server.host, self.server.port)
@@ -1699,7 +1701,7 @@ class ServerTest(Test):
     while self.conn.state != (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE) \
           and time() <= deadline:
       self.cxtr.process()
-      self.driver.wait(int(self.timeout * 1000))
+      self.driver.wait(self.timeout)
       self.cxtr.process()
 
     assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection failed"
@@ -1711,7 +1713,7 @@ class ServerTest(Test):
     deadline = time() + duration
     while time() <= deadline:
       self.cxtr.process()
-      self.driver.wait(int(10 * duration * 1000))
+      self.driver.wait(10 * duration)
       self.cxtr.process()
 
     assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection terminated"
@@ -1726,7 +1728,7 @@ class ServerTest(Test):
     deadline = time() + self.timeout
     while (self.conn.state & Endpoint.REMOTE_ACTIVE) and time() <= deadline:
       self.cxtr.process()
-      self.driver.wait(int(self.timeout*1000))
+      self.driver.wait(self.timeout)
       self.cxtr.process()
 
     assert self.conn.state & Endpoint.REMOTE_CLOSED, "Connection failed to close"

Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py Sat Dec 28 16:58:25 2013
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -314,13 +314,13 @@ class MessengerTest(Test):
       self.client.recv(2*size - len(trackers))
       while self.client.incoming:
         t = self.client.get(msg)
-        assert self.client.status(t) is PENDING, (t, self.client.status(t))
+        assert self.client.status(t) is SETTLED, (t, self.client.status(t))
         trackers.append(t)
 
     for t in trackers[:size]:
       assert self.client.status(t) is None, (t, self.client.status(t))
     for t in trackers[size:]:
-      assert self.client.status(t) is PENDING, (t, self.client.status(t))
+      assert self.client.status(t) is SETTLED, (t, self.client.status(t))
 
     self.client.accept()
 
@@ -332,6 +332,33 @@ class MessengerTest(Test):
   def testIncomingQueueBiggerThanSessionWindow(self):
     self.testIncomingQueueBiggerThanWindow(2048)
 
+  def testBuffered(self):
+    self.client.outgoing_window = 1000
+    self.client.incoming_window = 1000
+    self.start();
+    assert self.server_received == 0
+    buffering = 0
+    count = 100
+    for i in range(count):
+      msg = Message()
+      msg.address="amqp://0.0.0.0:12345"
+      msg.subject="Hello World!"
+      msg.body = "First the world, then the galaxy!"
+      t = self.client.put(msg)
+      buffered = self.client.buffered(t)
+      # allow transition from False to True, but not back
+      if buffered:
+          buffering += 1
+      else:
+        assert not buffering, ("saw %s buffered deliveries before?" % buffering)
+
+    while self.client.outgoing:
+        last = self.client.outgoing
+        self.client.send()
+        print "sent ", last - self.client.outgoing
+
+    assert self.server_received == count
+
   def test_proton222(self):
     self.start()
     msg = Message()
@@ -613,9 +640,6 @@ class MessengerTest(Test):
     """ The server is given a fixed amount of credit, and runs until that
     credit is exhausted.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditBlockingRebalance - credit scheduler TBD for Java Messenger")
-
     self.server_finite_credit = True
     self.server_credit = 11
     self.start()
@@ -767,9 +791,6 @@ class NBMessengerTest(common.Test):
     """ Verify that a fixed amount of credit will redistribute to new
     links.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditRedistribution - credit scheduler TBD for Java Messenger")
-
     self.server.recv( 5 )
 
     # first link will get all credit
@@ -802,9 +823,6 @@ class NBMessengerTest(common.Test):
     """ Verify that credit is reclaimed when a link with outstanding credit is
     torn down.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditReclaim - credit scheduler TBD for Java Messenger")
-
     self.server.recv( 9 )
 
     # first link will get all credit
@@ -866,15 +884,10 @@ class NBMessengerTest(common.Test):
     assert self.server.incoming == 9, self.server.incoming
     assert self.server.receiving == 0, self.server.receiving
 
-
-
   def testCreditReplenish(self):
     """ When extra credit is available it should be granted to the first
     link that can use it.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditReplenish - credit scheduler TBD for Java Messenger")
-
     # create three links
     msg = Message()
     for i in range(3):

Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/sasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/sasl.py?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/sasl.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/sasl.py Sat Dec 28 16:58:25 2013
@@ -168,3 +168,16 @@ class SaslTest(Test):
       out = self.t1.output(1024)
 
     assert self.s1.outcome == SASL.OK, self.s1.outcome
+
+  def test_singleton(self):
+      """Verify that only a single instance of SASL can exist per Transport"""
+      transport = Transport()
+      sasl1 = SASL(transport)
+      sasl2 = transport.sasl()
+      sasl3 = SASL(transport)
+      assert sasl1 is sasl2
+      assert sasl1 is sasl3
+      transport = Transport()
+      sasl1 = transport.sasl()
+      sasl2 = SASL(transport)
+      assert sasl1 is sasl2

Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/ssl.py?rev=1553874&r1=1553873&r2=1553874&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/ssl.py Sat Dec 28 16:58:25 2013
@@ -770,6 +770,24 @@ class SslTest(common.Test):
         # self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
         # self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER )
 
+    def test_singleton(self):
+        """Verify that only a single instance of SSL can exist per Transport"""
+        transport = Transport()
+        ssl1 = SSL(transport, self.client_domain)
+        ssl2 = transport.ssl(self.client_domain)
+        ssl3 = transport.ssl(self.client_domain)
+        assert ssl1 is ssl2
+        assert ssl1 is ssl3
+        transport = Transport()
+        ssl1 = transport.ssl(self.client_domain)
+        ssl2 = SSL(transport, self.client_domain)
+        assert ssl1 is ssl2
+        # catch attempt to re-configure existing SSL
+        try:
+            ssl3 = SSL(transport, self.server_domain)
+            assert False, "Expected error did not occur!"
+        except SSLException, e:
+            pass
 
 class MessengerSSLTests(common.Test):
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message