qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [39/50] [abbrv] qpid-proton git commit: PROTON-740: fixed shutdown and event related issues with idle timeout during sasl
Date Fri, 14 Nov 2014 11:03:48 GMT
PROTON-740: fixed shutdown and event related issues with idle timeout during sasl


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a5d65452
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a5d65452
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a5d65452

Branch: refs/heads/examples
Commit: a5d654521d47498355089cf93281028876244b3e
Parents: 8a042a2
Author: Rafael Schloming <rhs@alum.mit.edu>
Authored: Sat Nov 8 08:22:39 2014 -0500
Committer: Rafael Schloming <rhs@alum.mit.edu>
Committed: Sat Nov 8 08:23:14 2014 -0500

----------------------------------------------------------------------
 proton-c/include/proton/transport.h   |  13 +++
 proton-c/src/engine/engine-internal.h |   6 +-
 proton-c/src/sasl/sasl.c              |   9 +-
 proton-c/src/ssl/openssl.c            | 133 ++++++++++++++---------------
 proton-c/src/transport/transport.c    |  94 +++++++++++---------
 proton-c/src/windows/schannel.c       |   1 -
 tests/python/proton_tests/engine.py   |  27 ++++++
 7 files changed, 170 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index 33bb3a5..2262e7c 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -209,6 +209,19 @@ PN_EXTERN void pn_transport_log(pn_transport_t *transport, const char *message);
  *
  * @param[in] transport a transport object
  * @param[in] fmt the printf formatted message to be logged
+ * @param[in] ap a vector containing the format arguments
+ */
+PN_EXTERN void pn_transport_vlogf(pn_transport_t *transport, const char *fmt, va_list ap);
+
+/**
+ * Log a printf formatted message using a transport's logging
+ * mechanism.
+ *
+ * This can be useful in a debugging context as the log message will
+ * be prefixed with the transport's identifier.
+ *
+ * @param[in] transport a transport object
+ * @param[in] fmt the printf formatted message to be logged
  */
 PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 97f7ead..dd4c44e 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -176,8 +176,7 @@ struct pn_transport_t {
   bool tail_closed;      // input stream closed by driver
   bool head_closed;
   bool done_processing; // if true, don't call pn_process again
-  bool posted_head_closed;
-  bool posted_tail_closed;
+  bool posted_idle_timeout;
 };
 
 struct pn_connection_t {
@@ -319,7 +318,4 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm
 void pn_session_unbound(pn_session_t* ssn);
 void pn_link_unbound(pn_link_t* link);
 
-void pni_close_tail(pn_transport_t *transport);
-
-
 #endif /* engine-internal.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index f926b1b..2cc77c2 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -467,7 +467,14 @@ static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes,
 static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
 {
   pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
-  ssize_t n = pn_sasl_output(sasl, bytes, size);
+  // this accounts for pn_do_error is invoked, e.g. by idle timeout
+  ssize_t n;
+  if (sasl->transport->close_sent) {
+    n = PN_EOS;
+  } else {
+    n = pn_sasl_output(sasl, bytes, size);
+  }
+
   if (n == PN_EOS) {
     sasl->io_layer->process_output = pn_io_layer_output_passthru;
     pn_io_layer_t *io_next = sasl->io_layer->next;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/ssl/openssl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c
index c9536e2..ea2bd5b 100644
--- a/proton-c/src/ssl/openssl.c
+++ b/proton-c/src/ssl/openssl.c
@@ -149,11 +149,11 @@ static size_t buffered_output( pn_io_layer_t *io_layer );
 static size_t buffered_input( pn_io_layer_t *io_layer );
 
 // @todo: used to avoid littering the code with calls to printf...
-static void _log_error(const char *fmt, ...)
+static void _log_error(pn_ssl_t *ssl, const char *fmt, ...)
 {
   va_list ap;
   va_start(ap, fmt);
-  vfprintf(stderr, fmt, ap);
+  pn_transport_vlogf(ssl ? ssl->transport : NULL, fmt, ap);
   va_end(ap);
 }
 
@@ -163,27 +163,27 @@ static void _log(pn_ssl_t *ssl, const char *fmt, ...)
   if (PN_TRACE_DRV & ssl->trace) {
     va_list ap;
     va_start(ap, fmt);
-    vfprintf(stderr, fmt, ap);
+    pn_transport_vlogf(ssl->transport, fmt, ap);
     va_end(ap);
   }
 }
 
 // log an error and dump the SSL error stack
-static void _log_ssl_error( const char *fmt, ...)
+static void _log_ssl_error(pn_ssl_t *ssl, const char *fmt, ...)
 {
   char buf[128];        // see "man ERR_error_string_n()"
   va_list ap;
 
   if (fmt) {
     va_start(ap, fmt);
-    vfprintf(stderr, fmt, ap);
+    pn_transport_vlogf(ssl ? ssl->transport : NULL, fmt, ap);
     va_end(ap);
   }
 
   unsigned long err = ERR_get_error();
   while (err) {
     ERR_error_string_n(err, buf, sizeof(buf));
-    _log_error("%s\n", buf);
+    _log_error(ssl, "%s", buf);
     err = ERR_get_error();
   }
 }
@@ -211,8 +211,7 @@ static int ssl_failed(pn_ssl_t *ssl)
   if (ssl_err) {
     ERR_error_string_n( ssl_err, buf, sizeof(buf) );
   }
-  _log_ssl_error(NULL);    // spit out any remaining errors to the log file
-  pni_close_tail(ssl->transport);
+  _log_ssl_error(ssl, NULL);    // spit out any remaining errors to the log file
   pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
   return PN_EOS;
 }
@@ -284,23 +283,23 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
   X509 *cert = X509_STORE_CTX_get_current_cert(ctx);
   SSL *ssn = (SSL *) X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
   if (!ssn) {
-    _log_error("Error: unexpected error - SSL session info not available for peer verify!\n");
+    _log_error(NULL, "Error: unexpected error - SSL session info not available for peer verify!");
     return 0;  // fail connection
   }
 
   pn_ssl_t *ssl = (pn_ssl_t *)SSL_get_ex_data(ssn, ssl_ex_data_index);
   if (!ssl) {
-    _log_error("Error: unexpected error - SSL context info not available for peer verify!\n");
+    _log_error(NULL, "Error: unexpected error - SSL context info not available for peer verify!");
     return 0;  // fail connection
   }
 
   if (ssl->domain->verify_mode != PN_SSL_VERIFY_PEER_NAME) return preverify_ok;
   if (!ssl->peer_hostname) {
-    _log_error("Error: configuration error: PN_SSL_VERIFY_PEER_NAME configured, but no peer hostname set!\n");
+    _log_error(ssl, "Error: configuration error: PN_SSL_VERIFY_PEER_NAME configured, but no peer hostname set!");
     return 0;  // fail connection
   }
 
-  _log( ssl, "Checking identifying name in peer cert against '%s'\n", ssl->peer_hostname);
+  _log( ssl, "Checking identifying name in peer cert against '%s'", ssl->peer_hostname);
 
   bool matched = false;
 
@@ -317,7 +316,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
           unsigned char *str;
           int len = ASN1_STRING_to_UTF8( &str, asn1 );
           if (len >= 0) {
-            _log( ssl, "SubjectAltName (dns) from peer cert = '%.*s'\n", len, str );
+            _log( ssl, "SubjectAltName (dns) from peer cert = '%.*s'", len, str );
             matched = match_dns_pattern( ssl->peer_hostname, (const char *)str, len );
             OPENSSL_free( str );
           }
@@ -337,7 +336,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
       unsigned char *str;
       int len = ASN1_STRING_to_UTF8( &str, name_asn1);
       if (len >= 0) {
-        _log( ssl, "commonName from peer cert = '%.*s'\n", len, str );
+        _log( ssl, "commonName from peer cert = '%.*s'", len, str );
         matched = match_dns_pattern( ssl->peer_hostname, (const char *)str, len );
         OPENSSL_free(str);
       }
@@ -345,14 +344,14 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
   }
 
   if (!matched) {
-    _log( ssl, "Error: no name matching %s found in peer cert - rejecting handshake.\n",
+    _log( ssl, "Error: no name matching %s found in peer cert - rejecting handshake.",
           ssl->peer_hostname);
     preverify_ok = 0;
 #ifdef X509_V_ERR_APPLICATION_VERIFICATION
     X509_STORE_CTX_set_error( ctx, X509_V_ERR_APPLICATION_VERIFICATION );
 #endif
   } else {
-    _log( ssl, "Name from peer cert matched - peer is valid.\n" );
+    _log( ssl, "Name from peer cert matched - peer is valid." );
   }
   return preverify_ok;
 }
@@ -459,7 +458,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
   case PN_SSL_MODE_CLIENT:
     domain->ctx = SSL_CTX_new(SSLv23_client_method()); // and TLSv1+
     if (!domain->ctx) {
-      _log_ssl_error( "Unable to initialize OpenSSL context.\n");
+      _log_ssl_error(NULL, "Unable to initialize OpenSSL context.");
       free(domain);
       return NULL;
     }
@@ -468,14 +467,14 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
   case PN_SSL_MODE_SERVER:
     domain->ctx = SSL_CTX_new(SSLv23_server_method()); // and TLSv1+
     if (!domain->ctx) {
-      _log_ssl_error("Unable to initialize OpenSSL context.\n");
+      _log_ssl_error(NULL, "Unable to initialize OpenSSL context.");
       free(domain);
       return NULL;
     }
     break;
 
   default:
-    _log_error("Invalid value for pn_ssl_mode_t: %d\n", mode);
+    _log_error(NULL, "Invalid value for pn_ssl_mode_t: %d", mode);
     free(domain);
     return NULL;
   }
@@ -488,7 +487,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode )
 
   // by default, allow anonymous ciphers so certificates are not required 'out of the box'
   if (!SSL_CTX_set_cipher_list( domain->ctx, CIPHERS_ANONYMOUS )) {
-    _log_ssl_error("Failed to set cipher list to %s\n", CIPHERS_ANONYMOUS);
+    _log_ssl_error(NULL, "Failed to set cipher list to %s", CIPHERS_ANONYMOUS);
     pn_ssl_domain_free(domain);
     return NULL;
   }
@@ -537,7 +536,7 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain,
   if (!domain || !domain->ctx) return -1;
 
   if (SSL_CTX_use_certificate_chain_file(domain->ctx, certificate_file) != 1) {
-    _log_ssl_error( "SSL_CTX_use_certificate_chain_file( %s ) failed\n", certificate_file);
+    _log_ssl_error(NULL, "SSL_CTX_use_certificate_chain_file( %s ) failed", certificate_file);
     return -3;
   }
 
@@ -548,12 +547,12 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain,
   }
 
   if (SSL_CTX_use_PrivateKey_file(domain->ctx, private_key_file, SSL_FILETYPE_PEM) != 1) {
-    _log_ssl_error( "SSL_CTX_use_PrivateKey_file( %s ) failed\n", private_key_file);
+    _log_ssl_error(NULL, "SSL_CTX_use_PrivateKey_file( %s ) failed", private_key_file);
     return -4;
   }
 
   if (SSL_CTX_check_private_key(domain->ctx) != 1) {
-    _log_ssl_error( "The key file %s is not consistent with the certificate %s\n",
+    _log_ssl_error(NULL, "The key file %s is not consistent with the certificate %s",
                    private_key_file, certificate_file);
     return -5;
   }
@@ -564,7 +563,7 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain,
   // cipher was negotiated.  TLSv1 will reject such a request.  Hack: once a cert is
   // configured, allow only authenticated ciphers.
   if (!SSL_CTX_set_cipher_list( domain->ctx, CIPHERS_AUTHENTICATE )) {
-      _log_ssl_error( "Failed to set cipher list to %s\n", CIPHERS_AUTHENTICATE);
+      _log_ssl_error(NULL, "Failed to set cipher list to %s", CIPHERS_AUTHENTICATE);
       return -6;
   }
 
@@ -581,7 +580,7 @@ int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain,
   // to SSL_CTX_load_verify_locations()
   struct stat sbuf;
   if (stat( certificate_db, &sbuf ) != 0) {
-    _log_error("stat(%s) failed: %s\n", certificate_db, strerror(errno));
+    _log_error(NULL, "stat(%s) failed: %s", certificate_db, strerror(errno));
     return -1;
   }
 
@@ -596,7 +595,7 @@ int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain,
   }
 
   if (SSL_CTX_load_verify_locations( domain->ctx, file, dir ) != 1) {
-    _log_ssl_error( "SSL_CTX_load_verify_locations( %s ) failed\n", certificate_db);
+    _log_ssl_error(NULL, "SSL_CTX_load_verify_locations( %s ) failed", certificate_db);
     return -1;
   }
 
@@ -617,8 +616,8 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
   case PN_SSL_VERIFY_PEER_NAME:
 
     if (!domain->has_ca_db) {
-      _log_error("Error: cannot verify peer without a trusted CA configured.\n"
-                 "       Use pn_ssl_domain_set_trusted_ca_db()\n");
+      _log_error(NULL, "Error: cannot verify peer without a trusted CA configured.\n"
+                 "       Use pn_ssl_domain_set_trusted_ca_db()");
       return -1;
     }
 
@@ -626,12 +625,12 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
       // openssl requires that server connections supply a list of trusted CAs which is
       // sent to the client
       if (!trusted_CAs) {
-        _log_error("Error: a list of trusted CAs must be provided.\n");
+        _log_error(NULL, "Error: a list of trusted CAs must be provided.");
         return -1;
       }
       if (!domain->has_certificate) {
-      _log_error("Error: Server cannot verify peer without configuring a certificate.\n"
-                 "       Use pn_ssl_domain_set_credentials()\n");
+        _log_error(NULL, "Error: Server cannot verify peer without configuring a certificate.\n"
+                   "       Use pn_ssl_domain_set_credentials()");
       }
 
       if (domain->trusted_CAs) free(domain->trusted_CAs);
@@ -641,7 +640,7 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
       if (cert_names != NULL)
         SSL_CTX_set_client_CA_list(domain->ctx, cert_names);
       else {
-        _log_error("Error: Unable to process file of trusted CAs: %s\n", trusted_CAs);
+        _log_error(NULL, "Error: Unable to process file of trusted CAs: %s", trusted_CAs);
         return -1;
       }
     }
@@ -658,7 +657,7 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain,
     break;
 
   default:
-    _log_error( "Invalid peer authentication mode given.\n" );
+    _log_error(NULL, "Invalid peer authentication mode given." );
     return -1;
   }
 
@@ -692,7 +691,7 @@ int pn_ssl_domain_allow_unsecured_client(pn_ssl_domain_t *domain)
 {
   if (!domain) return -1;
   if (domain->mode != PN_SSL_MODE_SERVER) {
-    _log_error("Cannot permit unsecured clients - not a server.\n");
+    _log_error(NULL, "Cannot permit unsecured clients - not a server.");
     return -1;
   }
   domain->allow_unsecured = true;
@@ -734,7 +733,7 @@ bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *buffer, size_t size )
 void pn_ssl_free( pn_ssl_t *ssl)
 {
   if (!ssl) return;
-  _log( ssl, "SSL socket freed.\n" );
+  _log( ssl, "SSL socket freed." );
   release_ssl_socket( ssl );
   if (ssl->domain) pn_ssl_domain_free(ssl->domain);
   if (ssl->session_id) free((void *)ssl->session_id);
@@ -796,7 +795,7 @@ static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata)
 static int start_ssl_shutdown( pn_ssl_t *ssl )
 {
   if (!ssl->ssl_shutdown) {
-    _log(ssl, "Shutting down SSL connection...\n");
+    _log(ssl, "Shutting down SSL connection...");
     if (ssl->session_id) {
       // save the negotiated credentials before we close the connection
       pn_ssl_session_t *ssn = (pn_ssl_session_t *)calloc( 1, sizeof(pn_ssl_session_t));
@@ -804,7 +803,7 @@ static int start_ssl_shutdown( pn_ssl_t *ssl )
         ssn->id = pn_strdup( ssl->session_id );
         ssn->session = SSL_get1_session( ssl->ssl );
         if (ssn->session) {
-          _log( ssl, "Saving SSL session as %s\n", ssl->session_id );
+          _log( ssl, "Saving SSL session as %s", ssl->session_id );
           LL_ADD( ssl->domain, ssn_cache, ssn );
         } else {
           ssl_session_free( ssn );
@@ -821,7 +820,7 @@ static int start_ssl_shutdown( pn_ssl_t *ssl )
 
 static int setup_ssl_connection( pn_ssl_t *ssl )
 {
-  _log( ssl, "SSL connection detected.\n");
+  _log( ssl, "SSL connection detected.");
   ssl->io_layer->process_input = process_input_ssl;
   ssl->io_layer->process_output = process_output_ssl;
   return 0;
@@ -837,7 +836,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
   pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
   if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
-  _log( ssl, "process_input_ssl( data size=%d )\n",available );
+  _log( ssl, "process_input_ssl( data size=%d )",available );
 
   ssize_t consumed = 0;
   bool work_pending;
@@ -856,12 +855,12 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
         consumed += written;
         ssl->read_blocked = false;
         work_pending = (available > 0);
-        _log( ssl, "Wrote %d bytes to BIO Layer, %d left over\n", written, available );
+        _log( ssl, "Wrote %d bytes to BIO Layer, %d left over", written, available );
       }
     } else if (shutdown_input) {
       // lower layer (caller) has closed.  Close the WRITE side of the BIO.  This will cause
       // an EOF to be passed to SSL once all pending inbound data has been consumed.
-      _log( ssl, "Lower layer closed - shutting down BIO write side\n");
+      _log( ssl, "Lower layer closed - shutting down BIO write side");
       (void)BIO_shutdown_wr( ssl->bio_net_io );
       shutdown_input = false;
     }
@@ -871,7 +870,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
     if (!ssl->ssl_closed && ssl->in_count < ssl->in_size) {
       int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], ssl->in_size - ssl->in_count );
       if (read > 0) {
-        _log( ssl, "Read %d bytes from SSL socket for app\n", read );
+        _log( ssl, "Read %d bytes from SSL socket for app", read );
         _log_clear_data( ssl, &ssl->inbuf[ssl->in_count], read );
         ssl->in_count += read;
         work_pending = true;
@@ -881,7 +880,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
           switch (reason) {
           case SSL_ERROR_ZERO_RETURN:
             // SSL closed cleanly
-            _log(ssl, "SSL connection has closed\n");
+            _log(ssl, "SSL connection has closed");
             start_ssl_shutdown(ssl);  // KAG: not sure - this may not be necessary
             ssl->ssl_closed = true;
             break;
@@ -892,11 +891,11 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
         } else {
           if (BIO_should_write( ssl->bio_ssl )) {
             ssl->write_blocked = true;
-            _log(ssl, "Detected write-blocked\n");
+            _log(ssl, "Detected write-blocked");
           }
           if (BIO_should_read( ssl->bio_ssl )) {
             ssl->read_blocked = true;
-            _log(ssl, "Detected read-blocked\n");
+            _log(ssl, "Detected read-blocked");
           }
         }
       }
@@ -913,9 +912,9 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
           if (ssl->in_count)
             memmove( ssl->inbuf, ssl->inbuf + consumed, ssl->in_count );
           work_pending = true;
-          _log( ssl, "Application consumed %d bytes from peer\n", (int) consumed );
+          _log( ssl, "Application consumed %d bytes from peer", (int) consumed );
         } else if (consumed < 0) {
-          _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n",
+          _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)",
                (int) consumed, (int)ssl->in_count);
           ssl->in_count = 0;    // discard any pending input
           ssl->app_input_closed = consumed;
@@ -945,7 +944,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
               // the application _must_ have enough data to process.  If
               // this is an oversized frame, the app _must_ handle it
               // by returning an error code to SSL.
-              _log_error("Error: application unable to consume input.\n");
+              _log_error(ssl, "Error: application unable to consume input.");
             }
           }
         }
@@ -954,7 +953,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
 
   } while (work_pending);
 
-  //_log(ssl, "ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d\n",
+  //_log(ssl, "ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d",
   //     ssl->ssl_closed, ssl->in_count, ssl->app_input_closed, ssl->app_output_closed );
 
   // PROTON-82: Instead, close the input side as soon as we've completed enough of the SSL
@@ -971,7 +970,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat
     consumed = ssl->app_input_closed;
     ssl->io_layer->process_input = process_input_done;
   }
-  _log(ssl, "process_input_ssl() returning %d\n", (int) consumed);
+  _log(ssl, "process_input_ssl() returning %d", (int) consumed);
   return consumed;
 }
 
@@ -994,10 +993,10 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
       if (app_bytes > 0) {
         ssl->out_count += app_bytes;
         work_pending = true;
-        _log( ssl, "Gathered %d bytes from app to send to peer\n", app_bytes );
+        _log( ssl, "Gathered %d bytes from app to send to peer", app_bytes );
       } else {
         if (app_bytes < 0) {
-          _log(ssl, "Application layer closed its output, error=%d (%d bytes pending send)\n",
+          _log(ssl, "Application layer closed its output, error=%d (%d bytes pending send)",
                (int) app_bytes, (int) ssl->out_count);
           ssl->app_output_closed = app_bytes;
         }
@@ -1014,14 +1013,14 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
           data += wrote;
           ssl->out_count -= wrote;
           work_pending = true;
-          _log( ssl, "Wrote %d bytes from app to socket\n", wrote );
+          _log( ssl, "Wrote %d bytes from app to socket", wrote );
         } else {
           if (!BIO_should_retry(ssl->bio_ssl)) {
             int reason = SSL_get_error( ssl->ssl, wrote );
             switch (reason) {
             case SSL_ERROR_ZERO_RETURN:
               // SSL closed cleanly
-              _log(ssl, "SSL connection has closed\n");
+              _log(ssl, "SSL connection has closed");
               start_ssl_shutdown(ssl); // KAG: not sure - this may not be necessary
               ssl->out_count = 0;      // can no longer write to socket, so erase app output data
               ssl->ssl_closed = true;
@@ -1033,11 +1032,11 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
           } else {
             if (BIO_should_read( ssl->bio_ssl )) {
               ssl->read_blocked = true;
-              _log(ssl, "Detected read-blocked\n");
+              _log(ssl, "Detected read-blocked");
             }
             if (BIO_should_write( ssl->bio_ssl )) {
               ssl->write_blocked = true;
-              _log(ssl, "Detected write-blocked\n");
+              _log(ssl, "Detected write-blocked");
             }
           }
         }
@@ -1063,13 +1062,13 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
         written += available;
         ssl->write_blocked = false;
         work_pending = work_pending || max_len > 0;
-        _log( ssl, "Read %d bytes from BIO Layer\n", available );
+        _log( ssl, "Read %d bytes from BIO Layer", available );
       }
     }
 
   } while (work_pending);
 
-  //_log(ssl, "written=%d ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d bio_pend=%d\n",
+  //_log(ssl, "written=%d ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d bio_pend=%d",
   //     written, ssl->ssl_closed, ssl->in_count, ssl->app_input_closed, ssl->app_output_closed, BIO_pending(ssl->bio_net_io) );
 
   // PROTON-82: close the output side as soon as we've sent the SSL close_notify.
@@ -1084,7 +1083,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t
     written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
     ssl->io_layer->process_output = process_output_done;
   }
-  _log(ssl, "process_output_ssl() returning %d\n", (int) written);
+  _log(ssl, "process_output_ssl() returning %d", (int) written);
   return written;
 }
 
@@ -1095,7 +1094,7 @@ static int init_ssl_socket( pn_ssl_t *ssl )
 
   ssl->ssl = SSL_new(ssl->domain->ctx);
   if (!ssl->ssl) {
-    _log_error( "SSL socket setup failure.\n" );
+    _log_error(ssl, "SSL socket setup failure." );
     return -1;
   }
 
@@ -1112,10 +1111,10 @@ static int init_ssl_socket( pn_ssl_t *ssl )
   if (ssl->session_id) {
     pn_ssl_session_t *ssn = ssn_cache_find( ssl->domain, ssl->session_id );
     if (ssn) {
-      _log( ssl, "Restoring previous session id=%s\n", ssn->id );
+      _log( ssl, "Restoring previous session id=%s", ssn->id );
       int rc = SSL_set_session( ssl->ssl, ssn->session );
       if (rc != 1) {
-        _log( ssl, "Session restore failed, id=%s\n", ssn->id );
+        _log( ssl, "Session restore failed, id=%s", ssn->id );
       }
       LL_REMOVE( ssl->domain, ssn_cache, ssn );
       ssl_session_free( ssn );
@@ -1125,14 +1124,14 @@ static int init_ssl_socket( pn_ssl_t *ssl )
   // now layer a BIO over the SSL socket
   ssl->bio_ssl = BIO_new(BIO_f_ssl());
   if (!ssl->bio_ssl) {
-    _log_error( "BIO setup failure.\n" );
+    _log_error(ssl, "BIO setup failure." );
     return -1;
   }
   (void)BIO_set_ssl(ssl->bio_ssl, ssl->ssl, BIO_NOCLOSE);
 
   // create the "lower" BIO "pipe", and attach it below the SSL layer
   if (!BIO_new_bio_pair(&ssl->bio_ssl_io, 0, &ssl->bio_net_io, 0)) {
-    _log_error( "BIO setup failure.\n" );
+    _log_error(ssl, "BIO setup failure." );
     return -1;
   }
   SSL_set_bio(ssl->ssl, ssl->bio_ssl_io, ssl->bio_ssl_io);
@@ -1140,11 +1139,11 @@ static int init_ssl_socket( pn_ssl_t *ssl )
   if (ssl->domain->mode == PN_SSL_MODE_SERVER) {
     SSL_set_accept_state(ssl->ssl);
     BIO_set_ssl_mode(ssl->bio_ssl, 0);  // server mode
-    _log( ssl, "Server SSL socket created.\n" );
+    _log( ssl, "Server SSL socket created." );
   } else {      // client mode
     SSL_set_connect_state(ssl->ssl);
     BIO_set_ssl_mode(ssl->bio_ssl, 1);  // client mode
-    _log( ssl, "Client SSL socket created.\n" );
+    _log( ssl, "Client SSL socket created." );
   }
   return 0;
 }
@@ -1167,7 +1166,7 @@ static void release_ssl_socket( pn_ssl_t *ssl )
 
 static int setup_cleartext_connection( pn_ssl_t *ssl )
 {
-  _log( ssl, "Cleartext connection detected.\n");
+  _log( ssl, "Cleartext connection detected.");
   ssl->io_layer->process_input = pn_io_layer_input_passthru;
   ssl->io_layer->process_output = pn_io_layer_output_passthru;
   return 0;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index d91b55a..601d6a2 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -177,8 +177,7 @@ static void pn_transport_initialize(void *object)
 
   transport->done_processing = false;
 
-  transport->posted_head_closed = false;
-  transport->posted_tail_closed = false;
+  transport->posted_idle_timeout = false;
 }
 
 pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -457,6 +456,24 @@ static pn_collector_t *pni_transport_collector(pn_transport_t *transport)
   }
 }
 
+static void pni_maybe_post_closed(pn_transport_t *transport)
+{
+  pn_collector_t *collector = pni_transport_collector(transport);
+  if (transport->head_closed && transport->tail_closed) {
+    pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED);
+  }
+}
+
+static void pni_close_tail(pn_transport_t *transport)
+{
+  if (!transport->tail_closed) {
+    transport->tail_closed = true;
+    pn_collector_t *collector = pni_transport_collector(transport);
+    pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED);
+    pni_maybe_post_closed(transport);
+  }
+}
+
 int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
 {
   va_list ap;
@@ -479,6 +496,8 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm
   pn_collector_t *collector = pni_transport_collector(transport);
   pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR);
   pn_transport_logf(transport, "ERROR %s %s", condition, buf);
+  transport->done_processing = true;
+  pni_close_tail(transport);
   return PN_ERR;
 }
 
@@ -1050,14 +1069,6 @@ ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t
   return original - available;
 }
 
-static void pni_maybe_post_closed(pn_transport_t *transport)
-{
-  pn_collector_t *collector = pni_transport_collector(transport);
-  if (transport->posted_head_closed && transport->posted_tail_closed) {
-    pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED);
-  }
-}
-
 // process pending input until none remaining or EOS
 static ssize_t transport_consume(pn_transport_t *transport)
 {
@@ -1079,12 +1090,6 @@ static ssize_t transport_consume(pn_transport_t *transport)
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
         pn_transport_log(transport, "  <- EOS");
       transport->input_pending = 0;  // XXX ???
-      if (!transport->posted_tail_closed) {
-        pn_collector_t *collector = pni_transport_collector(transport);
-        pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED);
-        transport->posted_tail_closed = true;
-        pni_maybe_post_closed(transport);
-      }
       return n;
     }
   }
@@ -1171,8 +1176,11 @@ static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now)
       transport->last_bytes_input = transport->bytes_input;
     } else if (transport->dead_remote_deadline <= now) {
       transport->dead_remote_deadline = now + transport->local_idle_timeout;
-      // Note: AMQP-1.0 really should define a generic "timeout" error, but does not.
-      pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired");
+      if (!transport->posted_idle_timeout) {
+        transport->posted_idle_timeout = true;
+        // Note: AMQP-1.0 really should define a generic "timeout" error, but does not.
+        pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired");
+      }
     }
     timeout = transport->dead_remote_deadline;
   }
@@ -1861,9 +1869,21 @@ static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t
   return pn_dispatcher_output(transport->disp, bytes, size);
 }
 
+static void pni_close_head(pn_transport_t *transport)
+{
+  if (!transport->head_closed) {
+    transport->head_closed = true;
+    pn_collector_t *collector = pni_transport_collector(transport);
+    pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED);
+    pni_maybe_post_closed(transport);
+  }
+}
+
 // generate outbound data, return amount of pending output else error
 static ssize_t transport_produce(pn_transport_t *transport)
 {
+  if (transport->head_closed) return PN_EOS;
+
   pn_io_layer_t *io_layer = transport->io_layers;
   ssize_t space = transport->output_size - transport->output_pending;
 
@@ -1900,13 +1920,12 @@ static ssize_t transport_produce(pn_transport_t *transport)
         if (n < 0) {
           pn_transport_log(transport, "  -> EOS");
         }
-        /*else
-          pn_transport_logf(transport, "  -> EOS (%" PN_ZI ") %s", n,
-          pn_error_text(transport->error));*/
       }
+      pni_close_head(transport);
       return n;
     }
   }
+
   return transport->output_pending;
 }
 
@@ -1963,15 +1982,24 @@ void pn_transport_log(pn_transport_t *transport, const char *message)
   transport->tracer(transport, message);
 }
 
+void pn_transport_vlogf(pn_transport_t *transport, const char *fmt, va_list ap)
+{
+  if (transport) {
+    pn_string_vformat(transport->scratch, fmt, ap);
+    pn_transport_log(transport, pn_string_get(transport->scratch));
+  } else {
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+  }
+}
+
 void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...)
 {
   va_list ap;
 
   va_start(ap, fmt);
-  pn_string_vformat(transport->scratch, fmt, ap);
+  pn_transport_vlogf(transport, fmt, ap);
   va_end(ap);
-
-  pn_transport_log(transport, pn_string_get(transport->scratch));
 }
 
 uint16_t pn_transport_get_channel_max(pn_transport_t *transport)
@@ -2132,13 +2160,6 @@ ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t siz
   }
 }
 
-void pni_close_tail(pn_transport_t *transport)
-{
-  if (!transport->tail_closed) {
-    transport->tail_closed = true;
-  }
-}
-
 int pn_transport_process(pn_transport_t *transport, size_t size)
 {
   assert(transport);
@@ -2168,7 +2189,6 @@ int pn_transport_close_tail(pn_transport_t *transport)
 ssize_t pn_transport_pending(pn_transport_t *transport)      /* <0 == done */
 {
   assert(transport);
-  if (transport->head_closed) return PN_EOS;
   return transport_produce( transport );
 }
 
@@ -2211,12 +2231,8 @@ void pn_transport_pop(pn_transport_t *transport, size_t size)
                transport->output_pending );
     }
 
-    if (!transport->output_pending && pn_transport_pending(transport) < 0 &&
-        !transport->posted_head_closed) {
-      pn_collector_t *collector = pni_transport_collector(transport);
-      pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED);
-      transport->posted_head_closed = true;
-      pni_maybe_post_closed(transport);
+    if (!transport->output_pending && pn_transport_pending(transport) < 0) {
+      pni_close_head(transport);
     }
   }
 }
@@ -2224,7 +2240,7 @@ void pn_transport_pop(pn_transport_t *transport, size_t size)
 int pn_transport_close_head(pn_transport_t *transport)
 {
   size_t pending = pn_transport_pending(transport);
-  transport->head_closed = true;
+  pni_close_head(transport);
   pn_transport_pop(transport, pending);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/windows/schannel.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/schannel.c b/proton-c/src/windows/schannel.c
index 373dc51..abf4b85 100644
--- a/proton-c/src/windows/schannel.c
+++ b/proton-c/src/windows/schannel.c
@@ -222,7 +222,6 @@ static int ssl_failed(pn_ssl_t *ssl, const char *reason)
   ssl->ssl_closed = true;
   ssl->app_input_closed = ssl->app_output_closed = PN_EOS;
   ssl->state = SSL_CLOSED;
-  pni_close_tail(ssl->transport);
   pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", reason);
   return PN_EOS;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index eec73d0..d17a57c 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -2413,3 +2413,30 @@ class TeardownLeakTest(PeerTest):
 
   def testLeak(self):
     self.doLeak(False, False)
+
+class IdleTimeoutEventTest(PeerTest):
+
+  def half_pump(self):
+    p = self.transport.pending()
+    self.transport.pop(p)
+
+  def testTimeoutWithZombieServer(self):
+    self.transport.idle_timeout = self.delay
+    self.connection.open()
+    self.half_pump()
+    self.transport.tick(time())
+    sleep(self.delay*2)
+    self.transport.tick(time())
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
+                Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
+    assert self.transport.capacity() < 0
+    assert self.transport.pending() > 0
+    self.half_pump()
+    self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+    assert self.transport.pending() < 0
+
+  def testTimeoutWithZombieServerAndSASL(self):
+    sasl = self.transport.sasl()
+    sasl.client()
+    self.testTimeoutWithZombieServer()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message