qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [04/35] qpid-proton git commit: PROTON-749: Refactoring of io layers: - Eliminate some unnecessary stuff. - Make pn_io_layer_t a pure interface. - Simplify amqp header code; remove header_count member from pn_transport_t
Date Wed, 26 Nov 2014 20:05:51 GMT
PROTON-749: Refactoring of io layers:
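- Eliminate some unnecessary stuff (the per-layer context and next pointers, the buffered_input callback, and the io_layer back-pointers in pn_sasl_t and pn_ssl_t).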
- Make pn_io_layer_t a pure interface.
- Simplify amqp header code; remove header_count member from pn_transport_t


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c814d5c3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c814d5c3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c814d5c3

Branch: refs/heads/examples
Commit: c814d5c39147afa642e95e1b1ad51650b04d9739
Parents: 2794da5
Author: Andrew Stitcher <astitcher@apache.org>
Authored: Wed Aug 6 17:57:56 2014 -0400
Committer: Andrew Stitcher <astitcher@apache.org>
Committed: Mon Nov 17 14:55:03 2014 -0500

----------------------------------------------------------------------
 proton-c/src/engine/engine-internal.h |  20 ++-
 proton-c/src/sasl/sasl.c              | 128 +++++++++++-------
 proton-c/src/ssl/openssl.c            | 143 +++++++++++---------
 proton-c/src/transport/transport.c    | 204 ++++++++++++++---------------
 proton-c/src/windows/schannel.c       | 155 ++++++++++++----------
 5 files changed, 354 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index dd4c44e..86f5161 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -100,18 +100,16 @@ typedef struct {
 #include <proton/ssl.h>
 
 typedef struct pn_io_layer_t {
-  void *context;
-  struct pn_io_layer_t *next;
-  ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
-  ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
-  pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
-  size_t (*buffered_output)(struct pn_io_layer_t *);  // how much output is held
-  size_t (*buffered_input)(struct pn_io_layer_t *);   // how much input is held
+  ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t);
+  ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t);
+  pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t);
+  size_t (*buffered_output)(struct pn_transport_t *);  // how much output is held
 } pn_io_layer_t;
 
+extern const pn_io_layer_t pni_passthru_layer;
+
 struct pn_transport_t {
   pn_tracer_t tracer;
-  size_t header_count;
   pn_sasl_t *sasl;
   pn_ssl_t *ssl;
   pn_connection_t *connection;  // reference counted
@@ -134,7 +132,7 @@ struct pn_transport_t {
 #define PN_IO_SASL 1
 #define PN_IO_AMQP 2
 #define PN_IO_LAYER_CT (PN_IO_AMQP+1)
-  pn_io_layer_t io_layers[PN_IO_LAYER_CT];
+  const pn_io_layer_t *io_layers[PN_IO_LAYER_CT];
 
   /* dead remote detection */
   pn_millis_t local_idle_timeout;
@@ -302,9 +300,7 @@ void pn_link_dump(pn_link_t *link);
 void pn_dump(pn_connection_t *conn);
 void pn_transport_sasl_init(pn_transport_t *transport);
 
-ssize_t pn_io_layer_input_passthru(pn_io_layer_t *, const char *, size_t );
-ssize_t pn_io_layer_output_passthru(pn_io_layer_t *, char *, size_t );
-pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *, pn_timestamp_t);
+pn_timestamp_t pn_io_layer_tick_passthru(pn_transport_t *, unsigned int, pn_timestamp_t);
 
 void pn_condition_init(pn_condition_t *condition);
 void pn_condition_tini(pn_condition_t *condition);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index a82ec02..97bead4 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -36,7 +36,6 @@
 
 struct pn_sasl_t {
   pn_transport_t *transport;
-  pn_io_layer_t *io_layer;
   pn_dispatcher_t *disp;
   char *mechanisms;
   char *remote_mechanisms;
@@ -50,12 +49,42 @@ struct pn_sasl_t {
   bool rcvd_init;
   bool sent_done;
   bool rcvd_done;
+  bool input_bypass;
+  bool output_bypass;
 };
 
-static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
-static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static ssize_t pn_input_read_sasl_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_transport_t* transport, unsigned int layer, char* bytes, size_t size);
+static ssize_t pn_output_write_sasl(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
+
+const pn_io_layer_t sasl_headers_layer = {
+    pn_input_read_sasl_header,
+    pn_output_write_sasl_header,
+    pn_io_layer_tick_passthru,
+    NULL
+};
+
+const pn_io_layer_t sasl_write_header_layer = {
+    pn_input_read_sasl,
+    pn_output_write_sasl_header,
+    pn_io_layer_tick_passthru,
+    NULL
+};
+
+const pn_io_layer_t sasl_read_header_layer = {
+    pn_input_read_sasl_header,
+    pn_output_write_sasl,
+    pn_io_layer_tick_passthru,
+    NULL
+};
+
+const pn_io_layer_t sasl_layer = {
+    pn_input_read_sasl,
+    pn_output_write_sasl,
+    pn_io_layer_tick_passthru,
+    NULL
+};
 
 pn_sasl_t *pn_sasl(pn_transport_t *transport)
 {
@@ -76,14 +105,12 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
     sasl->rcvd_init = false;
     sasl->sent_done = false;
     sasl->rcvd_done = false;
+    sasl->input_bypass = false;
+    sasl->output_bypass = false;
 
     transport->sasl = sasl;
     sasl->transport = transport;
-    sasl->io_layer = &transport->io_layers[PN_IO_SASL];
-    sasl->io_layer->context = sasl;
-    sasl->io_layer->process_input = pn_input_read_sasl_header;
-    sasl->io_layer->process_output = pn_output_write_sasl_header;
-    sasl->io_layer->process_tick = pn_io_layer_tick_passthru;
+    transport->io_layers[PN_IO_SASL] = &sasl_headers_layer;
   }
 
   return transport->sasl;
@@ -404,9 +431,9 @@ int pn_do_outcome(pn_dispatcher_t *disp)
 #define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
 #define SASL_HEADER_LEN 8
 
-static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+static ssize_t pn_input_read_sasl_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+  pn_sasl_t *sasl = transport->sasl;
   if (available > 0) {
     if (available < SASL_HEADER_LEN) {
       if (memcmp(bytes, SASL_HEADER, available) == 0 ||
@@ -414,20 +441,22 @@ static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *by
         return 0;
     } else {
       if (memcmp(bytes, SASL_HEADER, SASL_HEADER_LEN) == 0) {
-        sasl->io_layer->process_input = pn_input_read_sasl;
+        if (transport->io_layers[layer] == &sasl_read_header_layer) {
+          transport->io_layers[layer] = &sasl_layer;
+        } else {
+          transport->io_layers[layer] = &sasl_write_header_layer;
+        }
         if (sasl->disp->trace & PN_TRACE_FRM)
-          pn_transport_logf(sasl->transport, "  <- %s", "SASL");
+          pn_transport_logf(transport, "  <- %s", "SASL");
         return SASL_HEADER_LEN;
       }
       if (memcmp(bytes, AMQP_HEADER, SASL_HEADER_LEN) == 0) {
         if (sasl->allow_skip) {
           sasl->outcome = PN_SASL_SKIPPED;
-          sasl->io_layer->process_input = pn_io_layer_input_passthru;
-          sasl->io_layer->process_output = pn_io_layer_output_passthru;
-          pn_io_layer_t *io_next = sasl->io_layer->next;
-          return io_next->process_input( io_next, bytes, available );
+          transport->io_layers[layer] = &pni_passthru_layer;
+          return pni_passthru_layer.process_input(transport, layer, bytes, available);
         } else {
-            pn_do_error(sasl->transport, "amqp:connection:policy-error",
+            pn_do_error(transport, "amqp:connection:policy-error",
                         "Client skipped SASL exchange - forbidden");
             return PN_EOS;
         }
@@ -436,50 +465,57 @@ static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *by
   }
   char quoted[1024];
   pn_quote_data(quoted, 1024, bytes, available);
-  pn_do_error(sasl->transport, "amqp:connection:framing-error",
+  pn_do_error(transport, "amqp:connection:framing-error",
               "%s header mismatch: '%s'", "SASL", quoted);
   return PN_EOS;
 }
 
-static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+static ssize_t pn_input_read_sasl(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
-  ssize_t n = pn_sasl_input(sasl, bytes, available);
-  if (n == PN_EOS) {
-    sasl->io_layer->process_input = pn_io_layer_input_passthru;
-    pn_io_layer_t *io_next = sasl->io_layer->next;
-    return io_next->process_input( io_next, bytes, available );
+  pn_sasl_t *sasl = transport->sasl;
+  if (!sasl->input_bypass) {
+    ssize_t n = pn_sasl_input(sasl, bytes, available);
+    if (n != PN_EOS) return n;
+
+    sasl->input_bypass = true;
+    if (sasl->output_bypass)
+        transport->io_layers[layer] = &pni_passthru_layer;
   }
-  return n;
+  return pni_passthru_layer.process_input(transport, layer, bytes, available );
 }
 
-static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
+static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t size)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+  pn_sasl_t *sasl = transport->sasl;
   if (sasl->disp->trace & PN_TRACE_FRM)
-    pn_transport_logf(sasl->transport, "  -> %s", "SASL");
+    pn_transport_logf(transport, "  -> %s", "SASL");
   assert(size >= SASL_HEADER_LEN);
   memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
-  sasl->io_layer->process_output = pn_output_write_sasl;
+  if (transport->io_layers[layer]==&sasl_write_header_layer) {
+      transport->io_layers[layer] = &sasl_layer;
+  } else {
+      transport->io_layers[layer] = &sasl_read_header_layer;
+  }
   return SASL_HEADER_LEN;
 }
 
-static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
+static ssize_t pn_output_write_sasl(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available)
 {
-  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
-  // this accounts for when pn_do_error is invoked, e.g. by idle timeout
-  ssize_t n;
-  if (sasl->transport->close_sent) {
-    n = PN_EOS;
-  } else {
-    n = pn_sasl_output(sasl, bytes, size);
-  }
+  pn_sasl_t *sasl = transport->sasl;
+  if (!sasl->output_bypass) {
+    // this accounts for when pn_do_error is invoked, e.g. by idle timeout
+    ssize_t n;
+    if (transport->close_sent) {
+        n = PN_EOS;
+    } else {
+        n = pn_sasl_output(sasl, bytes, available);
+    }
+    if (n != PN_EOS) return n;
 
-  if (n == PN_EOS) {
-    sasl->io_layer->process_output = pn_io_layer_output_passthru;
-    pn_io_layer_t *io_next = sasl->io_layer->next;
-    return io_next->process_output( io_next, bytes, size );
+    sasl->output_bypass = true;
+    if (sasl->input_bypass)
+        transport->io_layers[layer] = &pni_passthru_layer;
   }
-  return n;
+  return pni_passthru_layer.process_output(transport, layer, bytes, available );
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/ssl/openssl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c
index dd1b88b..a763cfb 100644
--- a/proton-c/src/ssl/openssl.c
+++ b/proton-c/src/ssl/openssl.c
@@ -87,9 +87,7 @@ struct pn_ssl_domain_t {
 
 
 struct pn_ssl_t {
-
   pn_transport_t   *transport;
-  pn_io_layer_t    *io_layer;
   pn_ssl_domain_t  *domain;
   const char    *session_id;
   const char *peer_hostname;
@@ -134,19 +132,18 @@ struct pn_ssl_session_t {
 
 /* */
 static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata);
-static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_unknown( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_unknown( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
 static connection_mode_t check_for_ssl_connection( const char *data, size_t len );
 static int init_ssl_socket( pn_ssl_t * );
 static void release_ssl_socket( pn_ssl_t * );
 static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
 static void ssl_session_free( pn_ssl_session_t *);
-static size_t buffered_output( pn_io_layer_t *io_layer );
-static size_t buffered_input( pn_io_layer_t *io_layer );
+static size_t buffered_output( pn_transport_t *transport );
 
 // @todo: used to avoid littering the code with calls to printf...
 static void _log_error(pn_ssl_t *ssl, const char *fmt, ...)
@@ -670,6 +667,40 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
   return 0;
 }
 
+const pn_io_layer_t unknown_layer = {
+    process_input_unknown,
+    process_output_unknown,
+    pn_io_layer_tick_passthru,
+    NULL
+};
+
+const pn_io_layer_t ssl_layer = {
+    process_input_ssl,
+    process_output_ssl,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
+const pn_io_layer_t ssl_input_closed_layer = {
+    process_input_done,
+    process_output_ssl,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
+const pn_io_layer_t ssl_output_closed_layer = {
+    process_input_ssl,
+    process_output_done,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
+const pn_io_layer_t ssl_closed_layer = {
+    process_input_done,
+    process_output_done,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
 
 int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
 {
@@ -678,13 +709,10 @@ int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
   ssl->domain = domain;
   domain->ref_count++;
   if (domain->allow_unsecured) {
-    ssl->io_layer->process_input = process_input_unknown;
-    ssl->io_layer->process_output = process_output_unknown;
+    ssl->transport->io_layers[PN_IO_SSL] = &unknown_layer;
   } else {
-    ssl->io_layer->process_input = process_input_ssl;
-    ssl->io_layer->process_output = process_output_ssl;
+    ssl->transport->io_layers[PN_IO_SSL] = &ssl_layer;
   }
-
   if (session_id && domain->mode == PN_SSL_MODE_CLIENT)
     ssl->session_id = pn_strdup(session_id);
 
@@ -773,13 +801,7 @@ pn_ssl_t *pn_ssl(pn_transport_t *transport)
   ssl->transport = transport;
   transport->ssl = ssl;
 
-  ssl->io_layer = &transport->io_layers[PN_IO_SSL];
-  ssl->io_layer->context = ssl;
-  ssl->io_layer->process_input = pn_io_layer_input_passthru;
-  ssl->io_layer->process_output = pn_io_layer_output_passthru;
-  ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
-  ssl->io_layer->buffered_output = buffered_output;
-  ssl->io_layer->buffered_input = buffered_input;
+  transport->io_layers[PN_IO_SSL] = &pni_passthru_layer;
 
   ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
 
@@ -823,11 +845,9 @@ static int start_ssl_shutdown( pn_ssl_t *ssl )
 
 
 
-static int setup_ssl_connection( pn_ssl_t *ssl )
+static int setup_ssl_connection(pn_transport_t *transport, unsigned int layer)
 {
-  _log( ssl, "SSL connection detected.");
-  ssl->io_layer->process_input = process_input_ssl;
-  ssl->io_layer->process_output = process_output_ssl;
+  transport->io_layers[layer] = &ssl_layer;
   return 0;
 }
 
@@ -836,9 +856,9 @@ static int setup_ssl_connection( pn_ssl_t *ssl )
 
 // take data from the network, and pass it into SSL.  Attempt to read decrypted data from
 // SSL socket and pass it to the application.
-static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
+static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t available)
 {
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
   _log( ssl, "process_input_ssl( data size=%d )",available );
@@ -910,8 +930,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
 
     if (!ssl->app_input_closed) {
       if (ssl->in_count > 0 || ssl->ssl_closed) {  /* if ssl_closed, send 0 count */
-        pn_io_layer_t *io_next = ssl->io_layer->next;
-        ssize_t consumed = io_next->process_input( io_next, ssl->inbuf, ssl->in_count);
+        ssize_t consumed = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->inbuf, ssl->in_count);
         if (consumed > 0) {
           ssl->in_count -= consumed;
           if (ssl->in_count)
@@ -973,15 +992,19 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
   //}
   if (ssl->app_input_closed && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) ) {
     consumed = ssl->app_input_closed;
-    ssl->io_layer->process_input = process_input_done;
+    if (transport->io_layers[layer]==&ssl_output_closed_layer) {
+      transport->io_layers[layer] = &ssl_closed_layer;
+    } else {
+      transport->io_layers[layer] = &ssl_input_closed_layer;
+    }
   }
   _log(ssl, "process_input_ssl() returning %d", (int) consumed);
   return consumed;
 }
 
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len)
 {
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   if (!ssl) return PN_EOS;
   if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
@@ -993,8 +1016,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
     // first, get any pending application output, if possible
 
     if (!ssl->app_output_closed && ssl->out_count < ssl->out_size) {
-      pn_io_layer_t *io_next = ssl->io_layer->next;
-      ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
+      ssize_t app_bytes = transport->io_layers[layer+1]->process_output(transport, layer+1, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count);
       if (app_bytes > 0) {
         ssl->out_count += app_bytes;
         work_pending = true;
@@ -1086,7 +1108,11 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
   //}
   if (written == 0 && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) && BIO_pending(ssl->bio_net_io) == 0) {
     written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
-    ssl->io_layer->process_output = process_output_done;
+    if (transport->io_layers[layer]==&ssl_input_closed_layer) {
+      transport->io_layers[layer] = &ssl_closed_layer;
+    } else {
+      transport->io_layers[layer] = &ssl_output_closed_layer;
+    }
   }
   _log(ssl, "process_output_ssl() returning %d", (int) written);
   return written;
@@ -1169,33 +1195,34 @@ static void release_ssl_socket( pn_ssl_t *ssl )
 }
 
 
-static int setup_cleartext_connection( pn_ssl_t *ssl )
+static int setup_cleartext_connection(pn_transport_t *transport, unsigned int layer)
 {
-  _log( ssl, "Cleartext connection detected.");
-  ssl->io_layer->process_input = pn_io_layer_input_passthru;
-  ssl->io_layer->process_output = pn_io_layer_output_passthru;
+  transport->io_layers[layer] = &pni_passthru_layer;
   return 0;
 }
 
 
 // until we determine if the client is using SSL or not:
 
-static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_unknown(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
 {
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   switch (check_for_ssl_connection( input_data, len )) {
   case SSL_CONNECTION:
-    setup_ssl_connection( ssl );
-    return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+    _log( ssl, "SSL connection detected.\n");
+    setup_ssl_connection(transport, layer);
+    break;
   case CLEAR_CONNECTION:
-    setup_cleartext_connection( ssl );
-    return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+    _log( ssl, "Cleartext connection detected.\n");
+    setup_cleartext_connection(transport, layer);
+    break;
   default:
     return 0;
   }
+  return transport->io_layers[layer]->process_input(transport, layer, input_data, len );
 }
 
-static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_unknown(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
 {
   // do not do output until we know if SSL is used or not
   return 0;
@@ -1307,20 +1334,20 @@ int pn_ssl_get_peer_hostname( pn_ssl_t *ssl, char *hostname, size_t *bufsize )
   return 0;
 }
 
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
 {
   return PN_EOS;
 }
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
 {
   return PN_EOS;
 }
 
 // return # output bytes sitting in this layer
-static size_t buffered_output(pn_io_layer_t *io_layer)
+static size_t buffered_output(pn_transport_t *transport)
 {
   size_t count = 0;
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   if (ssl) {
     count += ssl->out_count;
     if (ssl->bio_net_io) { // pick up any bytes waiting for network io
@@ -1329,17 +1356,3 @@ static size_t buffered_output(pn_io_layer_t *io_layer)
   }
   return count;
 }
-
-// return # input bytes sitting in this layer
-static size_t buffered_input( pn_io_layer_t *io_layer )
-{
-  size_t count = 0;
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
-  if (ssl) {
-    count += ssl->in_count;
-    if (ssl->bio_ssl) { // pick up any bytes waiting to be read
-      count += BIO_ctrl_pending(ssl->bio_ssl);
-    }
-  }
-  return count;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 601d6a2..d93e16f 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -92,17 +92,55 @@ void pn_delivery_map_clear(pn_delivery_map_t *dm)
   dm->next = 0;
 }
 
-static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available);
-static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
-static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available);
-static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now);
+static ssize_t pn_io_layer_input_passthru(pn_transport_t *, unsigned int, const char *, size_t );
+static ssize_t pn_io_layer_output_passthru(pn_transport_t *, unsigned int, char *, size_t );
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available);
+static pn_timestamp_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now);
 
 static void pni_default_tracer(pn_transport_t *transport, const char *message)
 {
   fprintf(stderr, "[%p]:%s\n", (void *) transport, message);
 }
 
+const pn_io_layer_t pni_passthru_layer = {
+    pn_io_layer_input_passthru,
+    pn_io_layer_output_passthru,
+    pn_io_layer_tick_passthru,
+    NULL
+};
+
+const pn_io_layer_t amqp_header_layer = {
+    pn_input_read_amqp_header,
+    pn_output_write_amqp_header,
+    pn_tick_amqp,
+    NULL
+};
+
+const pn_io_layer_t amqp_write_header_layer = {
+    pn_input_read_amqp,
+    pn_output_write_amqp_header,
+    pn_tick_amqp,
+    NULL
+};
+
+const pn_io_layer_t amqp_read_header_layer = {
+    pn_input_read_amqp_header,
+    pn_output_write_amqp,
+    pn_tick_amqp,
+    NULL
+};
+
+const pn_io_layer_t amqp_layer = {
+    pn_input_read_amqp,
+    pn_output_write_amqp,
+    pn_tick_amqp,
+    NULL
+};
+
 static void pn_transport_initialize(void *object)
 {
   pn_transport_t *transport = (pn_transport_t *)object;
@@ -112,33 +150,16 @@ static void pn_transport_initialize(void *object)
   transport->input_buf = NULL;
   transport->input_size =  PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024;
   transport->tracer = pni_default_tracer;
-  transport->header_count = 0;
   transport->sasl = NULL;
   transport->ssl = NULL;
   transport->scratch = pn_string(NULL);
   transport->disp = pn_dispatcher(0, transport);
   transport->connection = NULL;
 
-  pn_io_layer_t *io_layer = transport->io_layers;
-  while (io_layer != &transport->io_layers[PN_IO_AMQP]) {
-    io_layer->context = NULL;
-    io_layer->next = io_layer + 1;
-    io_layer->process_input = pn_io_layer_input_passthru;
-    io_layer->process_output = pn_io_layer_output_passthru;
-    io_layer->process_tick = pn_io_layer_tick_passthru;
-    io_layer->buffered_output = NULL;
-    io_layer->buffered_input = NULL;
-    ++io_layer;
-  }
-
-  pn_io_layer_t *amqp = &transport->io_layers[PN_IO_AMQP];
-  amqp->context = transport;
-  amqp->process_input = pn_input_read_amqp_header;
-  amqp->process_output = pn_output_write_amqp_header;
-  amqp->process_tick = pn_io_layer_tick_passthru;
-  amqp->buffered_output = NULL;
-  amqp->buffered_input = NULL;
-  amqp->next = NULL;
+  for (int layer=0; layer<PN_IO_LAYER_CT; ++layer) {
+    transport->io_layers[layer] = &pni_passthru_layer;
+  }
+  transport->io_layers[PN_IO_AMQP] = &amqp_header_layer;
 
   transport->open_sent = false;
   transport->open_rcvd = false;
@@ -550,8 +571,6 @@ int pn_do_open(pn_dispatcher_t *disp)
   } else {
     transport->disp->halt = true;
   }
-  if (transport->remote_idle_timeout)
-    transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp;  // enable timeouts
   transport->open_rcvd = true;
   return 0;
 }
@@ -1072,14 +1091,14 @@ ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t
 // process pending input until none remaining or EOS
 static ssize_t transport_consume(pn_transport_t *transport)
 {
-  pn_io_layer_t *io_layer = transport->io_layers;
   size_t consumed = 0;
 
   while (transport->input_pending || transport->tail_closed) {
     ssize_t n;
-    n = io_layer->process_input( io_layer,
-                                 transport->input_buf + consumed,
-                                 transport->input_pending );
+    n = transport->io_layers[PN_IO_SSL]->
+      process_input( transport, PN_IO_SSL,
+                     transport->input_buf + consumed,
+                     transport->input_pending );
     if (n > 0) {
       consumed += n;
       transport->input_pending -= n;
@@ -1101,44 +1120,34 @@ static ssize_t transport_consume(pn_transport_t *transport)
   return consumed;
 }
 
-static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available,
-                                    const char *header, size_t size, const char *protocol,
-                                    ssize_t (*next)(pn_io_layer_t *, const char *, size_t))
+#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
+
+static ssize_t pn_input_read_amqp_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
 {
-  const char *point = header + transport->header_count;
-  int delta = pn_min(available, size - transport->header_count);
-  if (!available || memcmp(bytes, point, delta)) {
+  unsigned readable = pn_min(8, available);
+  bool eos = pn_transport_capacity(transport)==PN_EOS;
+  if (memcmp(bytes, AMQP_HEADER, readable) || (readable<8 && eos) ) {
     char quoted[1024];
     pn_quote_data(quoted, 1024, bytes, available);
     pn_do_error(transport, "amqp:connection:framing-error",
-                "%s header mismatch: '%s'%s", protocol, quoted,
-                available ? "" : " (connection aborted)");
+                "%s header mismatch: '%s'%s", "AMQP", quoted,
+                !eos ? "" : " (connection aborted)");
     return PN_EOS;
-  } else {
-    transport->header_count += delta;
-    if (transport->header_count == size) {
-      transport->header_count = 0;
-      transport->io_layers[PN_IO_AMQP].process_input = next;
-
-      if (transport->disp->trace & PN_TRACE_FRM)
-        pn_transport_logf(transport, "  <- %s", protocol);
+  } else if (readable==8) {
+    if (transport->io_layers[layer] == &amqp_read_header_layer) {
+      transport->io_layers[layer] = &amqp_layer;
+    } else {
+      transport->io_layers[layer] = &amqp_write_header_layer;
     }
-    return delta;
+    if (transport->disp->trace & PN_TRACE_FRM)
+      pn_transport_logf(transport, "  <- %s", "AMQP");
+    return 8;
   }
+  return 0;
 }
 
-#define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
-
-static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
-{
-  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
-  return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
-                              "AMQP", pn_input_read_amqp);
-}
-
-static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available)
 {
-  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
   if (transport->close_rcvd) {
     if (available > 0) {
       pn_do_error(transport, "amqp:connection:framing-error", "data after close");
@@ -1164,10 +1173,9 @@ static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, si
 }
 
 /* process AMQP related timer events */
-static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now)
+static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, pn_timestamp_t now)
 {
   pn_timestamp_t timeout = 0;
-  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
 
   if (transport->local_idle_timeout) {
     if (transport->dead_remote_deadline == 0 ||
@@ -1827,30 +1835,22 @@ int pn_process(pn_transport_t *transport)
   return 0;
 }
 
-static ssize_t pn_output_write_header(pn_transport_t *transport,
-                                      char *bytes, size_t size,
-                                      const char *header, size_t hdrsize,
-                                      const char *protocol,
-                                      ssize_t (*next)(pn_io_layer_t *, char *, size_t))
+static ssize_t pn_output_write_amqp_header(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available)
 {
   if (transport->disp->trace & PN_TRACE_FRM)
-    pn_transport_logf(transport, "  -> %s", protocol);
-  assert(size >= hdrsize);
-  memmove(bytes, header, hdrsize);
-  transport->io_layers[PN_IO_AMQP].process_output = next;
-  return hdrsize;
-}
-
-static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
-{
-  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
-  return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
-                                pn_output_write_amqp);
+    pn_transport_logf(transport, "  -> %s", "AMQP");
+  assert(available >= 8);
+  memmove(bytes, AMQP_HEADER, 8);
+  if (transport->io_layers[layer] == &amqp_write_header_layer) {
+    transport->io_layers[layer] = &amqp_layer;
+  } else {
+    transport->io_layers[layer] = &amqp_read_header_layer;
+  }
+  return 8;
 }
 
-static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
+static ssize_t pn_output_write_amqp(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available)
 {
-  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
   if (transport->connection && !transport->done_processing) {
     int err = pn_process(transport);
     if (err) {
@@ -1866,7 +1866,7 @@ static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t
     return PN_EOS;
   }
 
-  return pn_dispatcher_output(transport->disp, bytes, size);
+  return pn_dispatcher_output(transport->disp, bytes, available);
 }
 
 static void pni_close_head(pn_transport_t *transport)
@@ -1884,7 +1884,6 @@ static ssize_t transport_produce(pn_transport_t *transport)
 {
   if (transport->head_closed) return PN_EOS;
 
-  pn_io_layer_t *io_layer = transport->io_layers;
   ssize_t space = transport->output_size - transport->output_pending;
 
   if (space <= 0) {     // can we expand the buffer?
@@ -1905,9 +1904,10 @@ static ssize_t transport_produce(pn_transport_t *transport)
 
   while (space > 0) {
     ssize_t n;
-    n = io_layer->process_output( io_layer,
-                                  &transport->output_buf[transport->output_pending],
-                                  space );
+    n = transport->io_layers[PN_IO_SSL]->
+      process_output( transport, PN_IO_SSL,
+                      &transport->output_buf[transport->output_pending],
+                      space );
     if (n > 0) {
       space -= n;
       transport->output_pending += n;
@@ -2043,7 +2043,6 @@ pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport)
 void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout)
 {
   transport->local_idle_timeout = timeout;
-  transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp;
 }
 
 pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
@@ -2053,8 +2052,7 @@ pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
 
 pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now)
 {
-  pn_io_layer_t *io_layer = transport->io_layers;
-  return io_layer->process_tick( io_layer, now );
+  return transport->io_layers[PN_IO_SSL]->process_tick(transport, PN_IO_SSL, now);
 }
 
 uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
@@ -2072,29 +2070,26 @@ uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
 }
 
 /** Pass through input handler */
-ssize_t pn_io_layer_input_passthru(pn_io_layer_t *io_layer, const char *data, size_t available)
+ssize_t pn_io_layer_input_passthru(pn_transport_t *transport, unsigned int layer, const char *data, size_t available)
 {
-  pn_io_layer_t *next = io_layer->next;
-  if (next)
-    return next->process_input( next, data, available );
+  if (layer+1<PN_IO_LAYER_CT)
+    return transport->io_layers[layer+1]->process_input(transport, layer+1, data, available);
   return PN_EOS;
 }
 
 /** Pass through output handler */
-ssize_t pn_io_layer_output_passthru(pn_io_layer_t *io_layer, char *bytes, size_t size)
+ssize_t pn_io_layer_output_passthru(pn_transport_t *transport, unsigned int layer, char *data, size_t available)
 {
-  pn_io_layer_t *next = io_layer->next;
-  if (next)
-    return next->process_output( next, bytes, size );
+  if (layer+1<PN_IO_LAYER_CT)
+      return transport->io_layers[layer+1]->process_output(transport, layer+1, data, available);
   return PN_EOS;
 }
 
 /** Pass through tick handler */
-pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *io_layer, pn_timestamp_t now)
+pn_timestamp_t pn_io_layer_tick_passthru(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now)
 {
-  pn_io_layer_t *next = io_layer->next;
-  if (next)
-    return next->process_tick( next, now );
+  if (layer+1<PN_IO_LAYER_CT)
+      return transport->io_layers[layer+1]->process_tick(transport, layer+1, now);
   return 0;
 }
 
@@ -2253,11 +2248,10 @@ bool pn_transport_quiesced(pn_transport_t *transport)
   if (pending < 0) return true; // output done
   else if (pending > 0) return false;
   // no pending at transport, but check if data is buffered in I/O layers
-  pn_io_layer_t *io_layer = transport->io_layers;
-  while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) {
-    if (io_layer->buffered_output && io_layer->buffered_output( io_layer ))
+  for (int layer = 0; layer<PN_IO_LAYER_CT; ++layer) {
+    if (transport->io_layers[layer]->buffered_output &&
+        transport->io_layers[layer]->buffered_output( transport ))
       return false;
-    ++io_layer;
   }
   return true;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/windows/schannel.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/schannel.c b/proton-c/src/windows/schannel.c
index de6e117..397fa21 100644
--- a/proton-c/src/windows/schannel.c
+++ b/proton-c/src/windows/schannel.c
@@ -82,7 +82,6 @@ typedef enum { CREATED, CLIENT_HELLO, NEGOTIATING,
 
 struct pn_ssl_t {
   pn_transport_t   *transport;
-  pn_io_layer_t    *io_layer;
   pn_ssl_domain_t  *domain;
   const char    *session_id;
   const char *peer_hostname;
@@ -136,17 +135,16 @@ struct pn_ssl_session_t {
 };
 
 
-static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len);
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len);
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_unknown( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_unknown( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len);
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len);
 static connection_mode_t check_for_ssl_connection( const char *data, size_t len );
 static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * );
 static void ssl_session_free( pn_ssl_session_t *);
-static size_t buffered_output( pn_io_layer_t *io_layer );
-static size_t buffered_input( pn_io_layer_t *io_layer );
+static size_t buffered_output( pn_transport_t *transport );
 static void start_ssl_shutdown(pn_ssl_t *ssl);
 static void rewind_sc_inbuf(pn_ssl_t *ssl);
 static bool grow_inbuf2(pn_ssl_t *ssl, size_t minimum_size);
@@ -350,6 +348,41 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
   return 0;
 }
 
+const pn_io_layer_t unknown_layer = {
+    process_input_unknown,
+    process_output_unknown,
+    pn_io_layer_tick_passthru,
+    NULL
+};
+
+const pn_io_layer_t ssl_layer = {
+    process_input_ssl,
+    process_output_ssl,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
+const pn_io_layer_t ssl_input_closed_layer = {
+    process_input_done,
+    process_output_ssl,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
+const pn_io_layer_t ssl_output_closed_layer = {
+    process_input_ssl,
+    process_output_done,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
+const pn_io_layer_t ssl_closed_layer = {
+    process_input_done,
+    process_output_done,
+    pn_io_layer_tick_passthru,
+    buffered_output
+};
+
 int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
 {
   if (!ssl || !domain || ssl->domain) return -1;
@@ -358,13 +391,11 @@ int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id)
   ssl->domain = domain;
   domain->ref_count++;
   if (domain->allow_unsecured) {
-    ssl->io_layer->process_input = process_input_unknown;
-    ssl->io_layer->process_output = process_output_unknown;
-  } else {
-    ssl->io_layer->process_input = process_input_ssl;
-    ssl->io_layer->process_output = process_output_ssl;
+    ssl->transport->io_layers[PN_IO_SSL] = &unknown_layer;
+  }
+  else {
+    ssl->transport->io_layers[PN_IO_SSL] = &ssl_layer;
   }
-
   if (session_id && domain->mode == PN_SSL_MODE_CLIENT)
     ssl->session_id = pn_strdup(session_id);
 
@@ -460,13 +491,7 @@ pn_ssl_t *pn_ssl(pn_transport_t *transport)
   ssl->transport = transport;
   transport->ssl = ssl;
 
-  ssl->io_layer = &transport->io_layers[PN_IO_SSL];
-  ssl->io_layer->context = ssl;
-  ssl->io_layer->process_input = pn_io_layer_input_passthru;
-  ssl->io_layer->process_output = pn_io_layer_output_passthru;
-  ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
-  ssl->io_layer->buffered_output = buffered_output;
-  ssl->io_layer->buffered_input = buffered_input;
+  transport->io_layers[PN_IO_SSL] = &pni_passthru_layer;
 
   ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
   SecInvalidateHandle(&ssl->cred_handle);
@@ -849,11 +874,9 @@ static void start_ssl_shutdown(pn_ssl_t *ssl)
   ssl_handshake(ssl);
 }
 
-static int setup_ssl_connection(pn_ssl_t *ssl)
+static int setup_ssl_connection(pn_transport_t *transport, unsigned int layer)
 {
-  ssl_log( ssl, "SSL connection detected.\n");
-  ssl->io_layer->process_input = process_input_ssl;
-  ssl->io_layer->process_output = process_output_ssl;
+  transport->io_layers[layer] = &ssl_layer;
   return 0;
 }
 
@@ -976,14 +999,14 @@ static void app_inbytes_advance(pn_ssl_t *ssl, size_t consumed)
     app_inbytes_progress(ssl, 0);
 }
 
-static void read_closed(pn_ssl_t *ssl, ssize_t error)
+static void read_closed(pn_transport_t *transport, unsigned int layer, ssize_t error)
 {
+  pn_ssl_t *ssl = transport->ssl;
   if (ssl->app_input_closed)
     return;
   if (ssl->state == RUNNING && !error) {
-    pn_io_layer_t *io_next = ssl->io_layer->next;
     // Signal end of stream
-    ssl->app_input_closed = io_next->process_input(io_next, ssl->app_inbytes.start, 0);
+    ssl->app_input_closed = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->app_inbytes.start, 0);
   }
   if (!ssl->app_input_closed)
     ssl->app_input_closed = error ? error : PN_ERR;
@@ -1000,9 +1023,9 @@ static void read_closed(pn_ssl_t *ssl, ssize_t error)
 
 // Read up to "available" bytes from the network, decrypt it and pass plaintext to application.
 
-static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data, size_t available)
+static ssize_t process_input_ssl(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t available)
 {
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   ssl_log( ssl, "process_input_ssl( data size=%d )\n",available );
   ssize_t consumed = 0;
   ssize_t forwarded = 0;
@@ -1010,14 +1033,14 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
 
   if (available == 0) {
     // No more inbound network data
-    read_closed(ssl,0);
+    read_closed(transport, layer, 0);
     return 0;
   }
 
   do {
     if (ssl->sc_input_shutdown) {
       // TLS protocol shutdown detected on input
-      read_closed(ssl,0);
+      read_closed(transport, layer, 0);
       return consumed;
     }
 
@@ -1097,8 +1120,7 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
       // present app_inbytes to io_next only if it has new content
       while (ssl->app_inbytes.size > 0) {
         if (!ssl->app_input_closed) {
-          pn_io_layer_t *io_next = ssl->io_layer->next;
-          ssize_t count = io_next->process_input(io_next, ssl->app_inbytes.start, ssl->app_inbytes.size);
+          ssize_t count = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->app_inbytes.start, ssl->app_inbytes.size);
           if (count > 0) {
             forwarded += count;
             // advance() can increase app_inbytes.size if double buffered
@@ -1115,7 +1137,7 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
             ssl_log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
                  (int) count, (int)ssl->app_inbytes.size);
             app_inbytes_advance(ssl, ssl->app_inbytes.size);    // discard
-            read_closed(ssl, count);
+            read_closed(transport, layer, count);
           }
         } else {
           ssl_log(ssl, "Input closed discard %d bytes\n",
@@ -1128,15 +1150,19 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data
 
   if (ssl->app_input_closed && ssl->state >= SHUTTING_DOWN) {
     consumed = ssl->app_input_closed;
-    ssl->io_layer->process_input = process_input_done;
+    if (transport->io_layers[layer]==&ssl_output_closed_layer) {
+      transport->io_layers[layer] = &ssl_closed_layer;
+    } else {
+      transport->io_layers[layer] = &ssl_input_closed_layer;
+    }
   }
   ssl_log(ssl, "process_input_ssl() returning %d, forwarded %d\n", (int) consumed, (int) forwarded);
   return consumed;
 }
 
-static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
+static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len)
 {
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   if (!ssl) return PN_EOS;
   ssl_log( ssl, "process_output_ssl( max_len=%d )\n",max_len );
 
@@ -1173,8 +1199,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
       size_t remaining = ssl->max_data_size;
       ssize_t app_bytes;
       do {
-        pn_io_layer_t *io_next = ssl->io_layer->next;
-        app_bytes = io_next->process_output(io_next, app_outp, remaining);
+        app_bytes = transport->io_layers[layer+1]->process_output(transport, layer+1, app_outp, remaining);
         if (app_bytes > 0) {
           app_outp += app_bytes;
           remaining -= app_bytes;
@@ -1212,40 +1237,45 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
 
   if (written == 0 && ssl->state == SSL_CLOSED) {
     written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
-    ssl->io_layer->process_output = process_output_done;
+    if (transport->io_layers[layer]==&ssl_input_closed_layer) {
+      transport->io_layers[layer] = &ssl_closed_layer;
+    } else {
+      transport->io_layers[layer] = &ssl_output_closed_layer;
+    }
   }
   ssl_log(ssl, "process_output_ssl() returning %d\n", (int) written);
   return written;
 }
 
 
-static int setup_cleartext_connection( pn_ssl_t *ssl )
+static int setup_cleartext_connection(pn_transport_t *transport, unsigned int layer)
 {
-  ssl_log( ssl, "Cleartext connection detected.\n");
-  ssl->io_layer->process_input = pn_io_layer_input_passthru;
-  ssl->io_layer->process_output = pn_io_layer_output_passthru;
+  transport->io_layers[layer] = &pni_passthru_layer;
   return 0;
 }
 
 
 // until we determine if the client is using SSL or not:
 
-static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_unknown(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
 {
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   switch (check_for_ssl_connection( input_data, len )) {
   case SSL_CONNECTION:
-    setup_ssl_connection( ssl );
-    return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+	ssl_log(ssl, "SSL connection detected.\n");
+    setup_ssl_connection(transport, layer);
+	break;
   case CLEAR_CONNECTION:
-    setup_cleartext_connection( ssl );
-    return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
+	ssl_log(ssl, "Cleartext connection detected.\n");
+    setup_cleartext_connection(transport, layer);
+	break;
   default:
     return 0;
   }
+  return transport->io_layers[layer]->process_input(transport, layer, input_data, len);
 }
 
-static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_unknown(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
 {
   // do not do output until we know if SSL is used or not
   return 0;
@@ -1304,21 +1334,21 @@ static connection_mode_t check_for_ssl_connection( const char *data, size_t len
   return UNKNOWN_CONNECTION;
 }
 
-static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len)
+static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len)
 {
   return PN_EOS;
 }
 
-static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len)
+static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len)
 {
   return PN_EOS;
 }
 
 // return # output bytes sitting in this layer
-static size_t buffered_output(pn_io_layer_t *io_layer)
+static size_t buffered_output(pn_transport_t *transport)
 {
   size_t count = 0;
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
+  pn_ssl_t *ssl = transport->ssl;
   if (ssl) {
     count += ssl->network_out_pending;
     if (count == 0 && ssl->state == SHUTTING_DOWN && ssl->queued_shutdown)
@@ -1326,14 +1356,3 @@ static size_t buffered_output(pn_io_layer_t *io_layer)
   }
   return count;
 }
-
-// return # input bytes sitting in this layer
-static size_t buffered_input( pn_io_layer_t *io_layer )
-{
-  size_t count = 0;
-  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
-  if (ssl) {
-    count += ssl->in_data_count;
-  }
-  return count;
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message