qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1625784 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/engine/ proton-c/src/events/ proton-c/src/messenger/ proton-c/src/ssl/ proton-c/src/transport/ proton-j/src/main/java/org/apache/qpid/proton/e...
Date Wed, 17 Sep 2014 19:48:18 GMT
Author: rhs
Date: Wed Sep 17 19:48:17 2014
New Revision: 1625784

URL: http://svn.apache.org/r1625784
Log:
Added events: PN_CONNECTION_BOUND/UNBOUND, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_TAIL_CLOSED,
PN_TRANSPORT_CLOSED, and PN_TRANSPORT_ERROR. This should address PROTON-656

Added:
    qpid/proton/trunk/tests/python/proton_tests/scratch.py
Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/event.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/events/event.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Sep 17 19:48:17 2014
@@ -3360,6 +3360,8 @@ class Collector:
 class Event:
 
   CONNECTION_INIT = PN_CONNECTION_INIT
+  CONNECTION_BOUND = PN_CONNECTION_BOUND
+  CONNECTION_UNBOUND = PN_CONNECTION_UNBOUND
   CONNECTION_OPEN = PN_CONNECTION_OPEN
   CONNECTION_CLOSE = PN_CONNECTION_CLOSE
   CONNECTION_REMOTE_OPEN = PN_CONNECTION_REMOTE_OPEN
@@ -3382,7 +3384,12 @@ class Event:
   LINK_FINAL = PN_LINK_FINAL
 
   DELIVERY = PN_DELIVERY
+
   TRANSPORT = PN_TRANSPORT
+  TRANSPORT_ERROR = PN_TRANSPORT_ERROR
+  TRANSPORT_HEAD_CLOSED = PN_TRANSPORT_HEAD_CLOSED
+  TRANSPORT_TAIL_CLOSED = PN_TRANSPORT_TAIL_CLOSED
+  TRANSPORT_CLOSED = PN_TRANSPORT_CLOSED
 
   def __init__(self, clazz, context, type):
     self.clazz = clazz

Modified: qpid/proton/trunk/proton-c/include/proton/event.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/event.h?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/event.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/event.h Wed Sep 17 19:48:17 2014
@@ -91,130 +91,148 @@ typedef enum {
    * will ever be issued for a connection. Events of this type point
    * to the relevant connection.
    */
-  PN_CONNECTION_INIT = 1,
+  PN_CONNECTION_INIT,
+
+  /**
+   * The connection has been bound to a transport.
+   */
+  PN_CONNECTION_BOUND,
+
+  /**
+   * The connection has been unbound from its transport.
+   */
+  PN_CONNECTION_UNBOUND,
 
   /**
    * The local connection endpoint has been closed. Events of this
    * type point to the relevant connection.
    */
-  PN_CONNECTION_OPEN = 2,
+  PN_CONNECTION_OPEN,
 
   /**
    * The remote endpoint has opened the connection. Events of this
    * type point to the relevant connection.
    */
-  PN_CONNECTION_REMOTE_OPEN = 3,
+  PN_CONNECTION_REMOTE_OPEN,
 
   /**
    * The local connection endpoint has been closed. Events of this
    * type point to the relevant connection.
    */
-  PN_CONNECTION_CLOSE = 4,
+  PN_CONNECTION_CLOSE,
 
   /**
    *  The remote endpoint has closed the connection. Events of this
    *  type point to the relevant connection.
    */
-  PN_CONNECTION_REMOTE_CLOSE = 5,
+  PN_CONNECTION_REMOTE_CLOSE,
 
   /**
    * The connection has been freed and any outstanding processing has
    * been completed. This is the final event that will ever be issued
    * for a connection.
    */
-  PN_CONNECTION_FINAL = 6,
+  PN_CONNECTION_FINAL,
 
   /**
    * The session has been created. This is the first event that will
    * ever be issued for a session.
    */
-  PN_SESSION_INIT = 11,
+  PN_SESSION_INIT,
 
   /**
    * The local session endpoint has been opened. Events of this type
    * point ot the relevant session.
    */
-  PN_SESSION_OPEN = 12,
+  PN_SESSION_OPEN,
 
   /**
    * The remote endpoint has opened the session. Events of this type
    * point to the relevant session.
    */
-  PN_SESSION_REMOTE_OPEN = 13,
+  PN_SESSION_REMOTE_OPEN,
 
   /**
    * The local session endpoint has been closed. Events of this type
    * point ot the relevant session.
    */
-  PN_SESSION_CLOSE = 14,
+  PN_SESSION_CLOSE,
 
   /**
    * The remote endpoint has closed the session. Events of this type
    * point to the relevant session.
    */
-  PN_SESSION_REMOTE_CLOSE = 15,
+  PN_SESSION_REMOTE_CLOSE,
 
   /**
    * The session has been freed and any outstanding processing has
    * been completed. This is the final event that will ever be issued
    * for a session.
    */
-  PN_SESSION_FINAL = 16,
+  PN_SESSION_FINAL,
 
   /**
    * The link has been created. This is the first event that will ever
    * be issued for a link.
    */
-  PN_LINK_INIT = 21,
+  PN_LINK_INIT,
 
   /**
    * The local link endpoint has been opened. Events of this type
    * point ot the relevant link.
    */
-  PN_LINK_OPEN = 22,
+  PN_LINK_OPEN,
 
   /**
    * The remote endpoint has opened the link. Events of this type
    * point to the relevant link.
    */
-  PN_LINK_REMOTE_OPEN = 23,
+  PN_LINK_REMOTE_OPEN,
 
   /**
    * The local link endpoint has been closed. Events of this type
    * point ot the relevant link.
    */
-  PN_LINK_CLOSE = 24,
+  PN_LINK_CLOSE,
 
   /**
    * The remote endpoint has closed the link. Events of this type
    * point to the relevant link.
    */
-  PN_LINK_REMOTE_CLOSE = 25,
+  PN_LINK_REMOTE_CLOSE,
 
   /**
    * The flow control state for a link has changed. Events of this
    * type point to the relevant link.
    */
-  PN_LINK_FLOW = 26,
+  PN_LINK_FLOW,
 
   /**
    * The link has been freed and any outstanding processing has been
    * completed. This is the final event that will ever be issued for a
    * link. Events of this type point to the relevant link.
    */
-  PN_LINK_FINAL = 27,
+  PN_LINK_FINAL,
 
   /**
    * A delivery has been created or updated. Events of this type point
    * to the relevant delivery.
    */
-  PN_DELIVERY = 31,
+  PN_DELIVERY,
 
   /**
    * The transport has new data to read and/or write. Events of this
    * type point to the relevant transport.
    */
-  PN_TRANSPORT = 41
+  PN_TRANSPORT,
+
+  PN_TRANSPORT_ERROR,
+
+  PN_TRANSPORT_HEAD_CLOSED,
+
+  PN_TRANSPORT_TAIL_CLOSED,
+
+  PN_TRANSPORT_CLOSED
 
 } pn_event_type_t;
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed Sep 17 19:48:17 2014
@@ -174,6 +174,8 @@ struct pn_transport_t {
   bool tail_closed;      // input stream closed by driver
   bool head_closed;
   bool done_processing; // if true, don't call pn_process again
+  bool posted_head_closed;
+  bool posted_tail_closed;
 };
 
 struct pn_connection_t {
@@ -312,4 +314,7 @@ void pn_clear_modified(pn_connection_t *
 void pn_connection_unbound(pn_connection_t *conn);
 int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
 
+void pni_close_tail(pn_transport_t *transport);
+
+
 #endif /* engine-internal.h */

Modified: qpid/proton/trunk/proton-c/src/events/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/events/event.c?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/events/event.c (original)
+++ qpid/proton/trunk/proton-c/src/events/event.c Wed Sep 17 19:48:17 2014
@@ -236,6 +236,10 @@ const char *pn_event_type_name(pn_event_
     return "PN_EVENT_NONE";
   case PN_CONNECTION_INIT:
     return "PN_CONNECTION_INIT";
+  case PN_CONNECTION_BOUND:
+    return "PN_CONNECTION_BOUND";
+  case PN_CONNECTION_UNBOUND:
+    return "PN_CONNECTION_UNBOUND";
   case PN_CONNECTION_REMOTE_OPEN:
     return "PN_CONNECTION_REMOTE_OPEN";
   case PN_CONNECTION_OPEN:
@@ -276,6 +280,14 @@ const char *pn_event_type_name(pn_event_
     return "PN_DELIVERY";
   case PN_TRANSPORT:
     return "PN_TRANSPORT";
+  case PN_TRANSPORT_ERROR:
+    return "PN_TRANSPORT_ERROR";
+  case PN_TRANSPORT_HEAD_CLOSED:
+    return "PN_TRANSPORT_HEAD_CLOSED";
+  case PN_TRANSPORT_TAIL_CLOSED:
+    return "PN_TRANSPORT_TAIL_CLOSED";
+  case PN_TRANSPORT_CLOSED:
+    return "PN_TRANSPORT_CLOSED";
   }
 
   return "<unrecognized>";

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Wed Sep 17 19:48:17 2014
@@ -1259,10 +1259,18 @@ int pn_messenger_process_events(pn_messe
       pn_messenger_process_delivery(messenger, event);
       break;
     case PN_TRANSPORT:
+    case PN_TRANSPORT_ERROR:
+    case PN_TRANSPORT_HEAD_CLOSED:
+    case PN_TRANSPORT_TAIL_CLOSED:
+    case PN_TRANSPORT_CLOSED:
       pn_messenger_process_transport(messenger, event);
       break;
     case PN_EVENT_NONE:
       break;
+    case PN_CONNECTION_BOUND:
+      break;
+    case PN_CONNECTION_UNBOUND:
+      break;
     case PN_CONNECTION_FINAL:
       break;
     case PN_SESSION_FINAL:

Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Wed Sep 17 19:48:17 2014
@@ -197,7 +197,7 @@ static int ssl_failed(pn_ssl_t *ssl)
     ERR_error_string_n( ssl_err, buf, sizeof(buf) );
   }
   _log_ssl_error(NULL);    // spit out any remaining errors to the log file
-  ssl->transport->tail_closed = true;
+  pni_close_tail(ssl->transport);
   pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
   return PN_EOS;
 }

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Wed Sep 17 19:48:17 2014
@@ -173,6 +173,9 @@ static void pn_transport_initialize(void
   transport->output_pending = 0;
 
   transport->done_processing = false;
+
+  transport->posted_head_closed = false;
+  transport->posted_tail_closed = false;
 }
 
 pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -261,11 +264,17 @@ static void pn_transport_finalize(void *
 
 int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
 {
-  if (!transport) return PN_ARG_ERR;
+  assert(transport);
+  assert(connection);
+
   if (transport->connection) return PN_STATE_ERR;
   if (connection->transport) return PN_STATE_ERR;
+
   transport->connection = connection;
   connection->transport = transport;
+
+  pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
+
   pn_incref(connection);
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
@@ -273,6 +282,7 @@ int pn_transport_bind(pn_transport_t *tr
     transport->disp->halt = false;
     transport_consume(transport);        // blech - testBindAfterOpen
   }
+
   return 0;
 }
 
@@ -300,9 +310,12 @@ int pn_transport_unbind(pn_transport_t *
   assert(transport);
   if (!transport->connection) return 0;
 
+
   pn_connection_t *conn = transport->connection;
   transport->connection = NULL;
 
+  pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND);
+
   // XXX: what happens if the endpoints are freed before we get here?
   pn_session_t *ssn = pn_session_head(conn, 0);
   while (ssn) {
@@ -411,6 +424,15 @@ int pn_post_close(pn_transport_t *transp
                        (bool) condition, ERROR, condition, description, info);
 }
 
+static pn_collector_t *pni_transport_collector(pn_transport_t *transport)
+{
+  if (transport->connection && transport->connection->collector) {
+    return transport->connection->collector;
+  } else {
+    return NULL;
+  }
+}
+
 int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
 {
   va_list ap;
@@ -429,6 +451,8 @@ int pn_do_error(pn_transport_t *transpor
   }
   transport->disp->halt = true;
   pn_transport_logf(transport, "ERROR %s %s", condition, buf);
+  pn_collector_t *collector = pni_transport_collector(transport);
+  pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR);
   return PN_ERR;
 }
 
@@ -995,6 +1019,14 @@ ssize_t pn_transport_input(pn_transport_
   return original - available;
 }
 
+static void pni_maybe_post_closed(pn_transport_t *transport)
+{
+  pn_collector_t *collector = pni_transport_collector(transport);
+  if (transport->posted_head_closed && transport->posted_tail_closed) {
+    pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED);
+  }
+}
+
 // process pending input until none remaining or EOS
 static ssize_t transport_consume(pn_transport_t *transport)
 {
@@ -1016,6 +1048,12 @@ static ssize_t transport_consume(pn_tran
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
         pn_transport_log(transport, "  <- EOS");
       transport->input_pending = 0;  // XXX ???
+      if (!transport->posted_tail_closed) {
+        pn_collector_t *collector = pni_transport_collector(transport);
+        pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED);
+        transport->posted_tail_closed = true;
+        pni_maybe_post_closed(transport);
+      }
       return n;
     }
   }
@@ -2037,6 +2075,13 @@ ssize_t pn_transport_push(pn_transport_t
   }
 }
 
+void pni_close_tail(pn_transport_t *transport)
+{
+  if (!transport->tail_closed) {
+    transport->tail_closed = true;
+  }
+}
+
 int pn_transport_process(pn_transport_t *transport, size_t size)
 {
   assert(transport);
@@ -2046,7 +2091,7 @@ int pn_transport_process(pn_transport_t 
 
   ssize_t n = transport_consume( transport );
   if (n == PN_EOS) {
-    transport->tail_closed = true;
+    pni_close_tail(transport);
   }
 
   if (n < 0 && n != PN_EOS) return n;
@@ -2056,7 +2101,7 @@ int pn_transport_process(pn_transport_t 
 // input stream has closed
 int pn_transport_close_tail(pn_transport_t *transport)
 {
-  transport->tail_closed = true;
+  pni_close_tail(transport);
   transport_consume( transport );
   return 0;
   // XXX: what if not all input processed at this point?  do we care???
@@ -2108,6 +2153,14 @@ void pn_transport_pop(pn_transport_t *tr
       memmove( transport->output_buf,  &transport->output_buf[size],
                transport->output_pending );
     }
+
+    if (!transport->output_pending && pn_transport_pending(transport) < 0 &&
+        !transport->posted_head_closed) {
+      pn_collector_t *collector = pni_transport_collector(transport);
+      pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED);
+      transport->posted_head_closed = true;
+      pni_maybe_post_closed(transport);
+    }
   }
 }
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Wed Sep 17 19:48:17 2014
@@ -28,56 +28,40 @@ package org.apache.qpid.proton.engine;
 
 public interface Event
 {
-    public enum Category {
-        CONNECTION,
-        SESSION,
-        LINK,
-        DELIVERY,
-        TRANSPORT;
-    }
 
     public enum Type {
-        CONNECTION_INIT(Category.CONNECTION, 1),
-        CONNECTION_OPEN(Category.CONNECTION, 2),
-        CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3),
-        CONNECTION_CLOSE(Category.CONNECTION, 4),
-        CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5),
-        CONNECTION_FINAL(Category.CONNECTION, 6),
-
-        SESSION_INIT(Category.SESSION, 1),
-        SESSION_OPEN(Category.SESSION, 2),
-        SESSION_REMOTE_OPEN(Category.SESSION, 3),
-        SESSION_CLOSE(Category.SESSION, 4),
-        SESSION_REMOTE_CLOSE(Category.SESSION, 5),
-        SESSION_FINAL(Category.SESSION, 6),
-
-        LINK_INIT(Category.LINK, 1),
-        LINK_OPEN(Category.LINK, 2),
-        LINK_REMOTE_OPEN(Category.LINK, 3),
-        LINK_CLOSE(Category.LINK, 4),
-        LINK_REMOTE_CLOSE(Category.LINK, 5),
-        LINK_FLOW(Category.LINK, 6),
-        LINK_FINAL(Category.LINK, 7),
-
-        DELIVERY(Category.DELIVERY, 1),
-        TRANSPORT(Category.TRANSPORT, 1);
-
-        private int _opcode;
-        private Category _category;
-
-        private Type(Category c, int o)
-        {
-            this._category = c;
-            this._opcode = o;
-        }
-
-        public Category getCategory()
-        {
-            return this._category;
-        }
-    }
+        CONNECTION_INIT,
+        CONNECTION_BOUND,
+        CONNECTION_UNBOUND,
+        CONNECTION_OPEN,
+        CONNECTION_REMOTE_OPEN,
+        CONNECTION_CLOSE,
+        CONNECTION_REMOTE_CLOSE,
+        CONNECTION_FINAL,
+
+        SESSION_INIT,
+        SESSION_OPEN,
+        SESSION_REMOTE_OPEN,
+        SESSION_CLOSE,
+        SESSION_REMOTE_CLOSE,
+        SESSION_FINAL,
+
+        LINK_INIT,
+        LINK_OPEN,
+        LINK_REMOTE_OPEN,
+        LINK_CLOSE,
+        LINK_REMOTE_CLOSE,
+        LINK_FLOW,
+        LINK_FINAL,
 
-    Category getCategory();
+        DELIVERY,
+
+        TRANSPORT,
+        TRANSPORT_ERROR,
+        TRANSPORT_HEAD_CLOSED,
+        TRANSPORT_TAIL_CLOSED,
+        TRANSPORT_CLOSED
+    }
 
     Type getType();
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java Wed Sep 17 19:48:17 2014
@@ -56,11 +56,6 @@ class EventImpl implements Event
         context = null;
     }
 
-    public Category getCategory()
-    {
-        return type.getCategory();
-    }
-
     public Type getType()
     {
         return type;
@@ -73,16 +68,15 @@ class EventImpl implements Event
 
     public Connection getConnection()
     {
-        switch (type.getCategory()) {
-        case CONNECTION:
+        if (context instanceof Connection) {
             return (Connection) context;
-        case TRANSPORT:
+        } else if (context instanceof Transport) {
             Transport transport = getTransport();
             if (transport == null) {
                 return null;
             }
             return ((TransportImpl) transport).getConnectionImpl();
-        default:
+        } else {
             Session ssn = getSession();
             if (ssn == null) {
                 return null;
@@ -93,10 +87,9 @@ class EventImpl implements Event
 
     public Session getSession()
     {
-        switch (type.getCategory()) {
-        case SESSION:
+        if (context instanceof Session) {
             return (Session) context;
-        default:
+        } else {
             Link link = getLink();
             if (link == null) {
                 return null;
@@ -107,10 +100,9 @@ class EventImpl implements Event
 
     public Link getLink()
     {
-        switch (type.getCategory()) {
-        case LINK:
+        if (context instanceof Link) {
             return (Link) context;
-        default:
+        } else {
             Delivery dlv = getDelivery();
             if (dlv == null) {
                 return null;
@@ -121,20 +113,18 @@ class EventImpl implements Event
 
     public Delivery getDelivery()
     {
-        switch (type.getCategory()) {
-        case DELIVERY:
+        if (context instanceof Delivery) {
             return (Delivery) context;
-        default:
+        } else {
             return null;
         }
     }
 
     public Transport getTransport()
     {
-        switch (type.getCategory()) {
-        case TRANSPORT:
+        if (context instanceof Transport) {
             return (Transport) context;
-        default:
+        } else {
             return null;
         }
     }
@@ -150,4 +140,5 @@ class EventImpl implements Event
     {
         return "EventImpl{" + "type=" + type + ", context=" + context + '}';
     }
+
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed Sep 17 19:48:17 2014
@@ -123,6 +123,9 @@ public class TransportImpl extends Endpo
     private boolean _head_closed = false;
     private TransportException _tail_error = null;
 
+    private boolean postedHeadClosed = false;
+    private boolean postedTailClosed = false;
+
     /**
      * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
      * Client code outside this module should use a {@link EngineFactory} instead
@@ -210,8 +213,10 @@ public class TransportImpl extends Endpo
     @Override
     public void bind(Connection conn)
     {
-        _connectionEndpoint = (ConnectionImpl) conn;
         // TODO - check if already bound
+
+        _connectionEndpoint = (ConnectionImpl) conn;
+        put(Event.Type.CONNECTION_BOUND, conn);
         _connectionEndpoint.setTransport(this);
         _connectionEndpoint.incref();
 
@@ -230,6 +235,7 @@ public class TransportImpl extends Endpo
     @Override
     public void unbind()
     {
+        put(Event.Type.CONNECTION_UNBOUND, _connectionEndpoint);
         _connectionEndpoint.modifyEndpoints();
 
         _connectionEndpoint.setTransport(null);
@@ -1236,6 +1242,19 @@ public class TransportImpl extends Endpo
         return _closeReceived;
     }
 
+    void put(Event.Type type, Object context) {
+        if (_connectionEndpoint != null) {
+            _connectionEndpoint.put(type, context);
+        }
+    }
+
+    private void maybePostClosed()
+    {
+        if (postedHeadClosed && postedTailClosed) {
+            put(Event.Type.TRANSPORT_CLOSED, this);
+        }
+    }
+
     @Override
     public void closed(TransportException error)
     {
@@ -1247,6 +1266,14 @@ public class TransportImpl extends Endpo
             }
             _head_closed = true;
         }
+        if (_tail_error != null) {
+            put(Event.Type.TRANSPORT_ERROR, this);
+        }
+        if (!postedTailClosed) {
+            put(Event.Type.TRANSPORT_TAIL_CLOSED, this);
+            postedTailClosed = true;
+            maybePostClosed();
+        }
     }
 
     @Override
@@ -1351,6 +1378,13 @@ public class TransportImpl extends Endpo
     {
         init();
         _outputProcessor.pop(bytes);
+
+        int p = pending();
+        if (p < 0 && !postedHeadClosed) {
+            put(Event.Type.TRANSPORT_HEAD_CLOSED, this);
+            postedHeadClosed = true;
+            maybePostClosed();
+        }
     }
 
     @Override

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Wed Sep 17 19:48:17 2014
@@ -942,13 +942,9 @@ def pn_transport_closed(trans):
 
 from org.apache.qpid.proton.engine import Event
 
-PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION
-PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION
-PN_EVENT_CATEGORY_LINK = Event.Category.LINK
-PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY
-PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT
-
 PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT
+PN_CONNECTION_BOUND = Event.Type.CONNECTION_BOUND
+PN_CONNECTION_UNBOUND = Event.Type.CONNECTION_UNBOUND
 PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN
 PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN
 PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE
@@ -969,6 +965,10 @@ PN_LINK_FLOW = Event.Type.LINK_FLOW
 PN_LINK_FINAL = Event.Type.LINK_FINAL
 PN_DELIVERY = Event.Type.DELIVERY
 PN_TRANSPORT = Event.Type.TRANSPORT
+PN_TRANSPORT_ERROR = Event.Type.TRANSPORT_ERROR
+PN_TRANSPORT_HEAD_CLOSED = Event.Type.TRANSPORT_HEAD_CLOSED
+PN_TRANSPORT_TAIL_CLOSED = Event.Type.TRANSPORT_TAIL_CLOSED
+PN_TRANSPORT_CLOSED = Event.Type.TRANSPORT_CLOSED
 
 def pn_collector():
   return Proton.collector()

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1625784&r1=1625783&r2=1625784&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Wed Sep 17 19:48:17 2014
@@ -2150,8 +2150,8 @@ class EventTest(CollectorTest):
     self.pump()
     c1.free()
     c1._transport.unbind()
-    self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL,
-                Event.CONNECTION_FINAL)
+    self.expect(Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.LINK_FINAL,
+                Event.SESSION_FINAL, Event.CONNECTION_FINAL)
 
   def testConnectionINIT_FINAL(self):
     c = Connection()
@@ -2215,8 +2215,8 @@ class EventTest(CollectorTest):
     self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
     rcv.session.connection._transport.unbind()
     rcv.session.connection.free()
-    self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL,
-                Event.CONNECTION_FINAL)
+    self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT, Event.LINK_FINAL,
+                Event.SESSION_FINAL, Event.CONNECTION_FINAL)
 
   def testDeliveryEventsDisp(self):
     snd, rcv = self.testFlowEvents()
@@ -2235,6 +2235,60 @@ class EventTest(CollectorTest):
     event = self.expect(Event.DELIVERY)
     assert event.context == dlv
 
+  def testConnectionBOUND_UNBOUND(self):
+    c = Connection()
+    c.collect(self.collector)
+    self.expect(Event.CONNECTION_INIT)
+    t = Transport()
+    t.bind(c)
+    self.expect(Event.CONNECTION_BOUND)
+    t.unbind()
+    self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT)
+
+  def testTransportERROR_CLOSE(self):
+    c = Connection()
+    c.collect(self.collector)
+    self.expect(Event.CONNECTION_INIT)
+    t = Transport()
+    t.bind(c)
+    self.expect(Event.CONNECTION_BOUND)
+    t.push("asdf")
+    self.expect(Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
+    p = t.pending()
+    assert p > 0
+    # XXX: can't include this because java behaviour is different
+    #assert "AMQP header mismatch" in t.peek(p), repr(t.peek(p))
+    t.pop(p)
+    self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
+  def testTransportCLOSED(self):
+    c = Connection()
+    c.collect(self.collector)
+    self.expect(Event.CONNECTION_INIT)
+    t = Transport()
+    t.bind(c)
+    c.open()
+
+    self.expect(Event.CONNECTION_BOUND, Event.CONNECTION_OPEN, Event.TRANSPORT)
+
+    c2 = Connection()
+    t2 = Transport()
+    t2.bind(c2)
+    c2.open()
+    c2.close()
+
+    pump(t, t2)
+
+    self.expect(Event.CONNECTION_REMOTE_OPEN, Event.CONNECTION_REMOTE_CLOSE,
+                Event.TRANSPORT_TAIL_CLOSED)
+
+    c.close()
+
+    pump(t, t2)
+
+    self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT,
+                Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
+
 class PeerTest(CollectorTest):
 
   def setup(self):
@@ -2255,7 +2309,8 @@ class TeardownLeakTest(PeerTest):
 
   def doLeak(self, local, remote):
     self.connection.open()
-    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT)
+    self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
+                Event.CONNECTION_OPEN, Event.TRANSPORT)
 
     ssn = self.connection.session()
     ssn.open()
@@ -2294,19 +2349,23 @@ class TeardownLeakTest(PeerTest):
     self.pump()
 
     if remote:
-      self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE,
-                         Event.CONNECTION_REMOTE_CLOSE),
-                        (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL,
-                         Event.SESSION_REMOTE_CLOSE,
-                         Event.CONNECTION_REMOTE_CLOSE))
+      self.expect_oneof((Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE,
+                         Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE,
+                         Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_CLOSED),
+                        (Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE,
+                         Event.LINK_FINAL, Event.SESSION_REMOTE_CLOSE,
+                         Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED,
+                         Event.TRANSPORT_CLOSED))
     else:
-      self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE)
+      self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.SESSION_REMOTE_CLOSE,
+                  Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED,
+                  Event.TRANSPORT_CLOSED)
 
     self.connection.free()
     self.transport.unbind()
 
-    self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL),
-                      (Event.SESSION_FINAL, Event.CONNECTION_FINAL))
+    self.expect_oneof((Event.LINK_FINAL, Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.CONNECTION_FINAL),
+                      (Event.CONNECTION_UNBOUND, Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL))
 
   def testLocalRemoteLeak(self):
     self.doLeak(True, True)

Added: qpid/proton/trunk/tests/python/proton_tests/scratch.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/scratch.py?rev=1625784&view=auto
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/scratch.py (added)
+++ qpid/proton/trunk/tests/python/proton_tests/scratch.py Wed Sep 17 19:48:17 2014
@@ -0,0 +1,44 @@
+  def xxx_test_reopen_on_same_session(self):
+    ssn1 = self.snd.session
+    ssn2 = self.rcv.session
+
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+
+    assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+    assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+    self.snd.close()
+    self.rcv.close()
+    self.pump()
+
+    assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+    assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
+
+    print self.snd._link
+    self.snd = ssn1.sender("test-link")
+    print self.snd._link
+    self.rcv = ssn2.receiver("test-link")
+    self.snd.open()
+    self.rcv.open()
+    self.pump()
+
+    assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+    assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+
+class SessionPipelineTest(PeerTest):
+
+  def xxx_test(self):
+    self.connection.open()
+    self.peer.open()
+    self.pump()
+    ssn = self.connection.session()
+    ssn.open()
+    self.pump()
+    peer_ssn = self.peer.session_head(0)
+    ssn.close()
+    self.pump()
+    peer_ssn.close()
+    self.peer.close()
+    self.pump()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message