qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raj...@apache.org
Subject [04/50] [abbrv] qpid-proton git commit: PROTON-827: go binding: Added endpoint Disconnected events.
Date Fri, 15 May 2015 13:23:40 GMT
PROTON-827: go binding: Added endpoint Disconnected events.

On connection disconnect, the MessagingHandler receives LinkDisconnected,
SessionDisconnected and ConnectionDisconnected events for all disconnected
objects associated with the connection. Simplifies cleanup on disconnect.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1e4118f6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1e4118f6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1e4118f6

Branch: refs/heads/rajith-codec
Commit: 1e4118f63b9e64c3390f47d5ee91b17dfc24ab5a
Parents: 49335b7
Author: Alan Conway <aconway@redhat.com>
Authored: Wed May 6 16:03:05 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed May 6 17:04:27 2015 -0400

----------------------------------------------------------------------
 examples/go/event/broker.go                     | 10 +-
 proton-c/bindings/go/src/genwrap.go             |  6 +-
 .../go/src/qpid.apache.org/proton/event/doc.go  |  2 +-
 .../qpid.apache.org/proton/event/handlers.go    | 98 ++++++++++++--------
 .../go/src/qpid.apache.org/proton/event/pump.go | 14 +--
 .../qpid.apache.org/proton/event/wrappers.go    | 29 ++++++
 .../proton/event/wrappers_gen.go                | 23 +----
 proton-c/bindings/python/proton/handlers.py     |  2 +-
 8 files changed, 103 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/examples/go/event/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go
index 1c963e3..9720843 100644
--- a/examples/go/event/broker.go
+++ b/examples/go/event/broker.go
@@ -172,17 +172,9 @@ func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event)
 			q.subscribe(e.Link())
 		}
 
-	case event.MLinkClosing:
+	case event.MLinkDisconnected, event.MLinkClosing:
 		b.unsubscribe(e.Link())
 
-	case event.MDisconnected:
-		fallthrough
-	case event.MConnectionClosing:
-		c := e.Connection()
-		for l := c.LinkHead(event.SRemoteActive); !l.IsNil(); l = l.Next(event.SRemoteActive) {
-			b.unsubscribe(l)
-		}
-
 	case event.MSendable:
 		q := b.getQueue(e.Link().RemoteSource().Address())
 		q.popTo(e.Connection().Pump(), e.Link())

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/genwrap.go b/proton-c/bindings/go/src/genwrap.go
index e269367..e34c045 100644
--- a/proton-c/bindings/go/src/genwrap.go
+++ b/proton-c/bindings/go/src/genwrap.go
@@ -152,9 +152,7 @@ var (
 func event(out io.Writer) {
 	event_h := readHeader("event")
 
-	// event.h API functions
-	apiWrapFns("event", event_h, out)
-	fmt.Fprintln(out, `func (e Event) String() string { return e.Type().String() }`)
+	// Event is implented by hand in wrappers.go
 
 	// Get all the pn_event_type_t enum values
 	var etypes []eventType
@@ -206,6 +204,8 @@ func (g genType) goConvert(value string) string {
 	switch g.Gotype {
 	case "string":
 		return fmt.Sprintf("C.GoString(%s)", value)
+	case "Event":
+		return fmt.Sprintf("makeEvent(%s)", value)
 	default:
 		return fmt.Sprintf("%s(%s)", g.Gotype, value)
 	}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
index 6a1c8ac..a0d45d7 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
@@ -23,7 +23,7 @@ Package event provides a low-level API to the proton AMQP engine.
 For most tasks, consider instead package qpid.apache.org/proton/messaging.
 It provides a higher-level, concurrent API that is easier to use.
 
-The API is event based. There are two alternative styles of handler. CoreHandler
+The API is event based. There are two alternative styles of handler. EventHandler
 provides the core proton events. MessagingHandler provides a slighly simplified
 view of the event stream and automates some common tasks.
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
index 450a114..5fc679a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
@@ -27,8 +27,8 @@ import (
 	"qpid.apache.org/proton/internal"
 )
 
-// CoreHandler handles core proton events.
-type CoreHandler interface {
+// EventHandler handles core proton events.
+type EventHandler interface {
 	// HandleEvent is called with an event.
 	// Typically HandleEvent() is implemented as a switch on e.Type()
 	HandleEvent(e Event) error
@@ -44,11 +44,11 @@ func (h cHandler) HandleEvent(e Event) error {
 	return nil // FIXME aconway 2015-03-31: error handling
 }
 
-// MessagingHandler provides an alternative interface to CoreHandler.
+// MessagingHandler provides an alternative interface to EventHandler.
 // it is easier to use for most applications that send and receive messages.
 //
 // Implement this interface and then wrap your value with a MessagingHandlerDelegator.
-// MessagingHandlerDelegator implements CoreHandler and can be registered with a Pump.
+// MessagingHandlerDelegator implements EventHandler and can be registered with a Pump.
 //
 type MessagingHandler interface {
 	HandleMessagingEvent(MessagingEventType, Event) error
@@ -57,82 +57,80 @@ type MessagingHandler interface {
 // MessagingEventType provides a set of events that are easier to work with than the
 // core events defined by EventType
 //
+// There are 3 types of "endpoint": Connection, Session and Link.
+// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error.
+// The meaning of these events is as follows:
+//
+// Opening: The remote end opened, the local end will open automatically.
+//
+// Opened: Both ends are open, regardless of which end opened first.
+//
+// Closing: The remote end closed without error, the local end will close automatically.
+//
+// Error: The remote end closed with an error, the local end will close automatically.
+//
+// Closed: Both ends are closed, regardless of which end closed first or if there was an
error.
+//
 type MessagingEventType int
 
 const (
 	// The event loop starts.
 	MStart MessagingEventType = iota
-
 	// The peer closes the connection with an error condition.
 	MConnectionError
-
 	// The peer closes the session with an error condition.
 	MSessionError
-
 	// The peer closes the link with an error condition.
 	MLinkError
-
 	// The peer Initiates the opening of the connection.
 	MConnectionOpening
-
 	// The peer initiates the opening of the session.
 	MSessionOpening
-
 	// The peer initiates the opening of the link.
 	MLinkOpening
-
 	// The connection is opened.
 	MConnectionOpened
-
 	// The session is opened.
 	MSessionOpened
-
 	// The link is opened.
 	MLinkOpened
-
 	// The peer initiates the closing of the connection.
 	MConnectionClosing
-
 	// The peer initiates the closing of the session.
 	MSessionClosing
-
 	// The peer initiates the closing of the link.
 	MLinkClosing
-
 	// Both ends of the connection are closed.
 	MConnectionClosed
-
 	// Both ends of the session are closed.
 	MSessionClosed
-
 	// Both ends of the link are closed.
 	MLinkClosed
-
-	// The socket is disconnected.
-	MDisconnected
-
+	// The connection is disconnected.
+	MConnectionDisconnected
+	// The session's connection was disconnected
+	MSessionDisconnected
+	// The session's connection was disconnected
+	MLinkDisconnected
 	// The sender link has credit and messages can
 	// therefore be transferred.
 	MSendable
-
 	// The remote peer accepts an outgoing message.
 	MAccepted
-
 	// The remote peer rejects an outgoing message.
 	MRejected
-
 	// The peer releases an outgoing message. Note that this may be in response to
 	// either the RELEASE or MODIFIED state as defined by the AMQP specification.
 	MReleased
-
 	// The peer has settled the outgoing message. This is the point at which it
 	// shouod never be retransmitted.
 	MSettled
-
 	// A message is received. Call proton.EventMessage(Event) to get the message.
 	// To manage the outcome of this messages (e.g. to accept or reject the message)
 	// use Event.Delivery().
 	MMessage
+	// The event loop terminates, there are no more events to process.
+	MFinal
 )
 
 func (t MessagingEventType) String() string {
@@ -169,8 +167,12 @@ func (t MessagingEventType) String() string {
 		return "SessionClosed"
 	case MLinkClosed:
 		return "LinkClosed"
-	case MDisconnected:
-		return "Disconnected"
+	case MConnectionDisconnected:
+		return "ConnectionDisconnected"
+	case MSessionDisconnected:
+		return "MSessionDisconnected"
+	case MLinkDisconnected:
+		return "MLinkDisconnected"
 	case MSendable:
 		return "Sendable"
 	case MAccepted:
@@ -223,19 +225,20 @@ func (d endpointDelegator) HandleEvent(e Event) (err error) {
 
 	case d.remoteClose:
 		var err1 error
-		if endpoint.RemoteCondition().IsSet() {
+		if endpoint.RemoteCondition().IsSet() { // Closed with error
 			err1 = d.delegate.HandleMessagingEvent(d.error, e)
-			if err1 == nil {
+			if err1 == nil { // Don't overwrite an application error.
 				err1 = endpoint.RemoteCondition().Error()
 			}
+		} else {
+			err1 = d.delegate.HandleMessagingEvent(d.closing, e)
 		}
 		if state.Is(SLocalClosed) {
 			err = d.delegate.HandleMessagingEvent(d.closed, e)
-		} else {
-			err = d.delegate.HandleMessagingEvent(d.closing, e)
+		} else if state.Is(SLocalActive) {
 			endpoint.Close()
 		}
-		if err1 != nil {
+		if err1 != nil { // Keep the first error.
 			err = err1
 		}
 
@@ -252,13 +255,13 @@ func (d endpointDelegator) HandleEvent(e Event) (err error) {
 	return err
 }
 
-// MessagingDelegator implments a CoreHandler and delegates to a MessagingHandler.
+// MessagingDelegator implments a EventHandler and delegates to a MessagingHandler.
 // You can modify the exported fields before you pass the MessagingDelegator to
 // a Pump.
 type MessagingDelegator struct {
 	delegate                   MessagingHandler
 	connection, session, link  endpointDelegator
-	handshaker, flowcontroller CoreHandler
+	handshaker, flowcontroller EventHandler
 
 	// AutoSettle (default true) automatically pre-settle outgoing messages.
 	AutoSettle bool
@@ -271,7 +274,7 @@ type MessagingDelegator struct {
 	PeerCloseError bool
 }
 
-func NewMessagingDelegator(h MessagingHandler) CoreHandler {
+func NewMessagingDelegator(h MessagingHandler) EventHandler {
 	return &MessagingDelegator{
 		delegate: h,
 		connection: endpointDelegator{
@@ -303,13 +306,15 @@ func NewMessagingDelegator(h MessagingHandler) CoreHandler {
 	}
 }
 
-func handleIf(h CoreHandler, e Event) error {
+func handleIf(h EventHandler, e Event) error {
 	if h != nil {
 		return h.HandleEvent(e)
 	}
 	return nil
 }
 
+// Handle a proton event by passing the corresponding MessagingEvent(s) to
+// the MessagingHandler.
 func (d *MessagingDelegator) HandleEvent(e Event) error {
 	handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling.
 
@@ -341,7 +346,20 @@ func (d *MessagingDelegator) HandleEvent(e Event) error {
 		}
 
 	case ETransportTailClosed:
-		d.delegate.HandleMessagingEvent(MDisconnected, e)
+		c := e.Connection()
+		for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) {
+			e2 := e
+			e2.link = l
+			e2.session = l.Session()
+			d.delegate.HandleMessagingEvent(MLinkDisconnected, e2)
+		}
+		for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) {
+			e2 := e
+			e2.session = s
+			d.delegate.HandleMessagingEvent(MSessionDisconnected, e2)
+		}
+		d.delegate.HandleMessagingEvent(MConnectionDisconnected, e)
+		d.delegate.HandleMessagingEvent(MFinal, e)
 	}
 	return nil
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
index 564ca6c..c9c5ca3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
@@ -72,7 +72,7 @@ data to/from a net.Conn. You can create multiple Pumps to handle multiple
 connections concurrently.
 
 Methods in this package can only be called in the goroutine that executes the
-corresponding Pump.Run(). You implement the CoreHandler or MessagingHandler
+corresponding Pump.Run(). You implement the EventHandler or MessagingHandler
 interfaces and provide those values to NewPump(). Their HandleEvent method will be
 called in the Pump goroutine, in typical event-driven style.
 
@@ -109,10 +109,10 @@ type Pump struct {
 	transport  *C.pn_transport_t
 	connection *C.pn_connection_t
 	collector  *C.pn_collector_t
-	read       *bufferChan   // Read buffers channel.
-	write      *bufferChan   // Write buffers channel.
-	handlers   []CoreHandler // Handlers for proton events.
-	running    chan struct{} // This channel will be closed when the goroutines are done.
+	read       *bufferChan    // Read buffers channel.
+	write      *bufferChan    // Write buffers channel.
+	handlers   []EventHandler // Handlers for proton events.
+	running    chan struct{}  // This channel will be closed when the goroutines are done.
 }
 
 const bufferSize = 4096
@@ -129,7 +129,7 @@ func init() {
 // The goroutine will exit when the pump is closed or disconnected.
 // You can check for errors on Pump.Error.
 //
-func NewPump(conn net.Conn, handlers ...CoreHandler) (*Pump, error) {
+func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) {
 	// Save the connection ID for Connection.String()
 	p := &Pump{
 		Inject:     make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack
@@ -344,7 +344,7 @@ func (p *Pump) handle(e Event) error {
 func (p *Pump) process() error {
 	// FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump
 	for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector)
{
-		e := Event{ce}
+		e := makeEvent(ce)
 		if err := p.handle(e); err != nil {
 			return err
 		}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
index 3584311..d2c4e43 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
@@ -24,6 +24,7 @@ package event
 //#include <proton/session.h>
 //#include <proton/session.h>
 //#include <proton/delivery.h>
+//#include <proton/link.h>
 //#include <proton/event.h>
 //#include <proton/transport.h>
 //#include <proton/link.h>
@@ -38,6 +39,34 @@ import (
 
 // FIXME aconway 2015-05-05: Documentation for generated types.
 
+// Event is an AMQP protocol event.
+type Event struct {
+	pn         *C.pn_event_t
+	eventType  EventType
+	connection Connection
+	session    Session
+	link       Link
+	delivery   Delivery
+}
+
+func makeEvent(pn *C.pn_event_t) Event {
+	return Event{
+		pn:         pn,
+		eventType:  EventType(C.pn_event_type(pn)),
+		connection: Connection{C.pn_event_connection(pn)},
+		session:    Session{C.pn_event_session(pn)},
+		link:       Link{C.pn_event_link(pn)},
+		delivery:   Delivery{C.pn_event_delivery(pn)},
+	}
+}
+func (e Event) IsNil() bool            { return e.eventType == EventType(0) }
+func (e Event) Type() EventType        { return e.eventType }
+func (e Event) Connection() Connection { return e.connection }
+func (e Event) Session() Session       { return e.session }
+func (e Event) Link() Link             { return e.link }
+func (e Event) Delivery() Delivery     { return e.delivery }
+func (e Event) String() string         { return e.Type().String() }
+
 // Data holds a pointer to decoded AMQP data.
 // Use proton.marshal/unmarshal to access it as Go data types.
 //

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
index 8f678ca..f53e8bb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
@@ -41,26 +41,6 @@ import (
 // #include <proton/connection.h>
 import "C"
 
-type Event struct{ pn *C.pn_event_t }
-
-func (e Event) IsNil() bool { return e.pn == nil }
-func (e Event) Type() EventType {
-	return EventType(C.pn_event_type(e.pn))
-}
-func (e Event) Connection() Connection {
-	return Connection{C.pn_event_connection(e.pn)}
-}
-func (e Event) Session() Session {
-	return Session{C.pn_event_session(e.pn)}
-}
-func (e Event) Link() Link {
-	return Link{C.pn_event_link(e.pn)}
-}
-func (e Event) Delivery() Delivery {
-	return Delivery{C.pn_event_delivery(e.pn)}
-}
-func (e Event) String() string { return e.Type().String() }
-
 type EventType int
 
 const (
@@ -411,6 +391,9 @@ func (d Delivery) Update(state uint64) {
 func (d Delivery) Clear() {
 	C.pn_delivery_clear(d.pn)
 }
+func (d Delivery) Current() bool {
+	return bool(C.pn_delivery_current(d.pn))
+}
 func (d Delivery) Settle() {
 	C.pn_delivery_settle(d.pn)
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 6836788..ac19e6f 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -254,7 +254,7 @@ class EndpointStateHandler(Handler):
         if event.connection.remote_condition:
             self.on_connection_error(event)
         elif self.is_local_closed(event.connection):
-            self.on_connection_closed(event)
+           self.on_connection_closed(event)
         else:
             self.on_connection_closing(event)
         event.connection.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message