Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 949DD17472 for ; Fri, 15 May 2015 13:23:38 +0000 (UTC) Received: (qmail 94288 invoked by uid 500); 15 May 2015 13:23:38 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 94132 invoked by uid 500); 15 May 2015 13:23:38 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 94014 invoked by uid 99); 15 May 2015 13:23:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 May 2015 13:23:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E58BAE35A7; Fri, 15 May 2015 13:23:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rajith@apache.org To: commits@qpid.apache.org Date: Fri, 15 May 2015 13:23:40 -0000 Message-Id: In-Reply-To: <96c2fc2e5bc24abf9e314b9af137a923@git.apache.org> References: <96c2fc2e5bc24abf9e314b9af137a923@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/50] [abbrv] qpid-proton git commit: PROTON-827: go binding: Added endpoint Disconnected events. PROTON-827: go binding: Added endpoint Disconnected events. On connection disconnect, the MessagingHandler receives LinkDisconnected, SessionDisconnected and ConnectionDisconnected events for all disconnected objects associated with the connection. Simplifies cleanup on disconnect. 
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1e4118f6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1e4118f6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1e4118f6 Branch: refs/heads/rajith-codec Commit: 1e4118f63b9e64c3390f47d5ee91b17dfc24ab5a Parents: 49335b7 Author: Alan Conway Authored: Wed May 6 16:03:05 2015 -0400 Committer: Alan Conway Committed: Wed May 6 17:04:27 2015 -0400 ---------------------------------------------------------------------- examples/go/event/broker.go | 10 +- proton-c/bindings/go/src/genwrap.go | 6 +- .../go/src/qpid.apache.org/proton/event/doc.go | 2 +- .../qpid.apache.org/proton/event/handlers.go | 98 ++++++++++++-------- .../go/src/qpid.apache.org/proton/event/pump.go | 14 +-- .../qpid.apache.org/proton/event/wrappers.go | 29 ++++++ .../proton/event/wrappers_gen.go | 23 +---- proton-c/bindings/python/proton/handlers.py | 2 +- 8 files changed, 103 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/examples/go/event/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go index 1c963e3..9720843 100644 --- a/examples/go/event/broker.go +++ b/examples/go/event/broker.go @@ -172,17 +172,9 @@ func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) q.subscribe(e.Link()) } - case event.MLinkClosing: + case event.MLinkDisconnected, event.MLinkClosing: b.unsubscribe(e.Link()) - case event.MDisconnected: - fallthrough - case event.MConnectionClosing: - c := e.Connection() - for l := c.LinkHead(event.SRemoteActive); !l.IsNil(); l = l.Next(event.SRemoteActive) { - b.unsubscribe(l) - } - case event.MSendable: q := b.getQueue(e.Link().RemoteSource().Address()) q.popTo(e.Connection().Pump(), e.Link()) 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/genwrap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/genwrap.go b/proton-c/bindings/go/src/genwrap.go index e269367..e34c045 100644 --- a/proton-c/bindings/go/src/genwrap.go +++ b/proton-c/bindings/go/src/genwrap.go @@ -152,9 +152,7 @@ var ( func event(out io.Writer) { event_h := readHeader("event") - // event.h API functions - apiWrapFns("event", event_h, out) - fmt.Fprintln(out, `func (e Event) String() string { return e.Type().String() }`) + // Event is implemented by hand in wrappers.go // Get all the pn_event_type_t enum values var etypes []eventType @@ -206,6 +204,8 @@ func (g genType) goConvert(value string) string { switch g.Gotype { case "string": return fmt.Sprintf("C.GoString(%s)", value) + case "Event": + return fmt.Sprintf("makeEvent(%s)", value) default: return fmt.Sprintf("%s(%s)", g.Gotype, value) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go index 6a1c8ac..a0d45d7 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go @@ -23,7 +23,7 @@ Package event provides a low-level API to the proton AMQP engine. For most tasks, consider instead package qpid.apache.org/proton/messaging. It provides a higher-level, concurrent API that is easier to use. -The API is event based. There are two alternative styles of handler. CoreHandler +The API is event based. There are two alternative styles of handler. EventHandler provides the core proton events. 
MessagingHandler provides a slighly simplified view of the event stream and automates some common tasks. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go index 450a114..5fc679a 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go @@ -27,8 +27,8 @@ import ( "qpid.apache.org/proton/internal" ) -// CoreHandler handles core proton events. -type CoreHandler interface { +// EventHandler handles core proton events. +type EventHandler interface { // HandleEvent is called with an event. // Typically HandleEvent() is implemented as a switch on e.Type() HandleEvent(e Event) error @@ -44,11 +44,11 @@ func (h cHandler) HandleEvent(e Event) error { return nil // FIXME aconway 2015-03-31: error handling } -// MessagingHandler provides an alternative interface to CoreHandler. +// MessagingHandler provides an alternative interface to EventHandler. // it is easier to use for most applications that send and receive messages. // // Implement this interface and then wrap your value with a MessagingHandlerDelegator. -// MessagingHandlerDelegator implements CoreHandler and can be registered with a Pump. +// MessagingHandlerDelegator implements EventHandler and can be registered with a Pump. // type MessagingHandler interface { HandleMessagingEvent(MessagingEventType, Event) error @@ -57,82 +57,80 @@ type MessagingHandler interface { // MessagingEventType provides a set of events that are easier to work with than the // core events defined by EventType // +// There are 3 types of "endpoint": Connection, Session and Link. 
+// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error. +// The meaning of these events is as follows: +// +// Opening: The remote end opened, the local end will open automatically. +// +// Opened: Both ends are open, regardless of which end opened first. +// +// Closing: The remote end closed without error, the local end will close automatically. +// +// Error: The remote end closed with an error, the local end will close automatically. +// +// Closed: Both ends are closed, regardless of which end closed first or if there was an error. +// type MessagingEventType int const ( // The event loop starts. MStart MessagingEventType = iota - // The peer closes the connection with an error condition. MConnectionError - // The peer closes the session with an error condition. MSessionError - // The peer closes the link with an error condition. MLinkError - // The peer Initiates the opening of the connection. MConnectionOpening - // The peer initiates the opening of the session. MSessionOpening - // The peer initiates the opening of the link. MLinkOpening - // The connection is opened. MConnectionOpened - // The session is opened. MSessionOpened - // The link is opened. MLinkOpened - // The peer initiates the closing of the connection. MConnectionClosing - // The peer initiates the closing of the session. MSessionClosing - // The peer initiates the closing of the link. MLinkClosing - // Both ends of the connection are closed. MConnectionClosed - // Both ends of the session are closed. MSessionClosed - // Both ends of the link are closed. MLinkClosed - - // The socket is disconnected. - MDisconnected - + // The connection is disconnected. + MConnectionDisconnected + // The session's connection was disconnected + MSessionDisconnected + // The link's connection was disconnected + MLinkDisconnected // The sender link has credit and messages can // therefore be transferred. MSendable - // The remote peer accepts an outgoing message. 
MAccepted - // The remote peer rejects an outgoing message. MRejected - // The peer releases an outgoing message. Note that this may be in response to // either the RELEASE or MODIFIED state as defined by the AMQP specification. MReleased - // The peer has settled the outgoing message. This is the point at which it // shouod never be retransmitted. MSettled - // A message is received. Call proton.EventMessage(Event) to get the message. // To manage the outcome of this messages (e.g. to accept or reject the message) // use Event.Delivery(). MMessage + // The event loop terminates, there are no more events to process. + MFinal ) func (t MessagingEventType) String() string { @@ -169,8 +167,12 @@ func (t MessagingEventType) String() string { return "SessionClosed" case MLinkClosed: return "LinkClosed" - case MDisconnected: - return "Disconnected" + case MConnectionDisconnected: + return "ConnectionDisconnected" + case MSessionDisconnected: + return "MSessionDisconnected" + case MLinkDisconnected: + return "MLinkDisconnected" case MSendable: return "Sendable" case MAccepted: @@ -223,19 +225,20 @@ func (d endpointDelegator) HandleEvent(e Event) (err error) { case d.remoteClose: var err1 error - if endpoint.RemoteCondition().IsSet() { + if endpoint.RemoteCondition().IsSet() { // Closed with error err1 = d.delegate.HandleMessagingEvent(d.error, e) - if err1 == nil { + if err1 == nil { // Don't overwrite an application error. err1 = endpoint.RemoteCondition().Error() } + } else { + err1 = d.delegate.HandleMessagingEvent(d.closing, e) } if state.Is(SLocalClosed) { err = d.delegate.HandleMessagingEvent(d.closed, e) - } else { - err = d.delegate.HandleMessagingEvent(d.closing, e) + } else if state.Is(SLocalActive) { endpoint.Close() } - if err1 != nil { + if err1 != nil { // Keep the first error. 
err = err1 } @@ -252,13 +255,13 @@ func (d endpointDelegator) HandleEvent(e Event) (err error) { return err } -// MessagingDelegator implments a CoreHandler and delegates to a MessagingHandler. +// MessagingDelegator implements an EventHandler and delegates to a MessagingHandler. // You can modify the exported fields before you pass the MessagingDelegator to // a Pump. type MessagingDelegator struct { delegate MessagingHandler connection, session, link endpointDelegator - handshaker, flowcontroller CoreHandler + handshaker, flowcontroller EventHandler // AutoSettle (default true) automatically pre-settle outgoing messages. AutoSettle bool @@ -271,7 +274,7 @@ type MessagingDelegator struct { PeerCloseError bool } -func NewMessagingDelegator(h MessagingHandler) CoreHandler { +func NewMessagingDelegator(h MessagingHandler) EventHandler { return &MessagingDelegator{ delegate: h, connection: endpointDelegator{ @@ -303,13 +306,15 @@ func NewMessagingDelegator(h MessagingHandler) CoreHandler { } } -func handleIf(h CoreHandler, e Event) error { +func handleIf(h EventHandler, e Event) error { if h != nil { return h.HandleEvent(e) } return nil } +// Handle a proton event by passing the corresponding MessagingEvent(s) to +// the MessagingHandler. func (d *MessagingDelegator) HandleEvent(e Event) error { handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling. 
@@ -341,7 +346,20 @@ func (d *MessagingDelegator) HandleEvent(e Event) error { } case ETransportTailClosed: - d.delegate.HandleMessagingEvent(MDisconnected, e) + c := e.Connection() + for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) { + e2 := e + e2.link = l + e2.session = l.Session() + d.delegate.HandleMessagingEvent(MLinkDisconnected, e2) + } + for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) { + e2 := e + e2.session = s + d.delegate.HandleMessagingEvent(MSessionDisconnected, e2) + } + d.delegate.HandleMessagingEvent(MConnectionDisconnected, e) + d.delegate.HandleMessagingEvent(MFinal, e) } return nil } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go index 564ca6c..c9c5ca3 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go @@ -72,7 +72,7 @@ data to/from a net.Conn. You can create multiple Pumps to handle multiple connections concurrently. Methods in this package can only be called in the goroutine that executes the -corresponding Pump.Run(). You implement the CoreHandler or MessagingHandler +corresponding Pump.Run(). You implement the EventHandler or MessagingHandler interfaces and provide those values to NewPump(). Their HandleEvent method will be called in the Pump goroutine, in typical event-driven style. @@ -109,10 +109,10 @@ type Pump struct { transport *C.pn_transport_t connection *C.pn_connection_t collector *C.pn_collector_t - read *bufferChan // Read buffers channel. - write *bufferChan // Write buffers channel. - handlers []CoreHandler // Handlers for proton events. 
- running chan struct{} // This channel will be closed when the goroutines are done. + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. } const bufferSize = 4096 @@ -129,7 +129,7 @@ func init() { // The goroutine will exit when the pump is closed or disconnected. // You can check for errors on Pump.Error. // -func NewPump(conn net.Conn, handlers ...CoreHandler) (*Pump, error) { +func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) { // Save the connection ID for Connection.String() p := &Pump{ Inject: make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack @@ -344,7 +344,7 @@ func (p *Pump) handle(e Event) error { func (p *Pump) process() error { // FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) { - e := Event{ce} + e := makeEvent(ce) if err := p.handle(e); err != nil { return err } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go index 3584311..d2c4e43 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go @@ -24,6 +24,7 @@ package event //#include //#include //#include +//#include //#include //#include //#include @@ -38,6 +39,34 @@ import ( // FIXME aconway 2015-05-05: Documentation for generated types. +// Event is an AMQP protocol event. 
+type Event struct { + pn *C.pn_event_t + eventType EventType + connection Connection + session Session + link Link + delivery Delivery +} + +func makeEvent(pn *C.pn_event_t) Event { + return Event{ + pn: pn, + eventType: EventType(C.pn_event_type(pn)), + connection: Connection{C.pn_event_connection(pn)}, + session: Session{C.pn_event_session(pn)}, + link: Link{C.pn_event_link(pn)}, + delivery: Delivery{C.pn_event_delivery(pn)}, + } +} +func (e Event) IsNil() bool { return e.eventType == EventType(0) } +func (e Event) Type() EventType { return e.eventType } +func (e Event) Connection() Connection { return e.connection } +func (e Event) Session() Session { return e.session } +func (e Event) Link() Link { return e.link } +func (e Event) Delivery() Delivery { return e.delivery } +func (e Event) String() string { return e.Type().String() } + // Data holds a pointer to decoded AMQP data. // Use proton.marshal/unmarshal to access it as Go data types. // http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go index 8f678ca..f53e8bb 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go @@ -41,26 +41,6 @@ import ( // #include import "C" -type Event struct{ pn *C.pn_event_t } - -func (e Event) IsNil() bool { return e.pn == nil } -func (e Event) Type() EventType { - return EventType(C.pn_event_type(e.pn)) -} -func (e Event) Connection() Connection { - return Connection{C.pn_event_connection(e.pn)} -} -func (e Event) Session() Session { - return Session{C.pn_event_session(e.pn)} -} -func (e Event) Link() Link { - return Link{C.pn_event_link(e.pn)} -} -func (e Event) 
Delivery() Delivery { - return Delivery{C.pn_event_delivery(e.pn)} -} -func (e Event) String() string { return e.Type().String() } - type EventType int const ( @@ -411,6 +391,9 @@ func (d Delivery) Update(state uint64) { func (d Delivery) Clear() { C.pn_delivery_clear(d.pn) } +func (d Delivery) Current() bool { + return bool(C.pn_delivery_current(d.pn)) +} func (d Delivery) Settle() { C.pn_delivery_settle(d.pn) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py index 6836788..ac19e6f 100644 --- a/proton-c/bindings/python/proton/handlers.py +++ b/proton-c/bindings/python/proton/handlers.py @@ -254,7 +254,7 @@ class EndpointStateHandler(Handler): if event.connection.remote_condition: self.on_connection_error(event) elif self.is_local_closed(event.connection): - self.on_connection_closed(event) + self.on_connection_closed(event) else: self.on_connection_closing(event) event.connection.close() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org