qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-proton git commit: NO-JIRA: go: Fix issues found by "go vet", minor simplification and bugfixing.
Date Thu, 07 Jan 2016 22:27:38 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 9f219cef0 -> 7486d5b85


NO-JIRA: go: Fix issues found by "go vet", minor simplification and bugfixing.

- refactor to fix go vet errors about copying a Mutex.
- hide Link endpoint interface, provide simpler LinkSettings common interface for Sender,
Receiver.
- simplified connection close logic - process all final events.
- fix async sender blocking when no outcome requested.
- simplify receiver flow control logic
- simplify Send() logic


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7486d5b8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7486d5b8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7486d5b8

Branch: refs/heads/master
Commit: 7486d5b853ff26f707efe0ded73fba0d231d3733
Parents: 9f219ce
Author: Alan Conway <aconway@redhat.com>
Authored: Fri Dec 11 13:08:48 2015 -0500
Committer: Alan Conway <aconway@redhat.com>
Committed: Thu Jan 7 17:26:45 2016 -0500

----------------------------------------------------------------------
 .../src/qpid.apache.org/electron/connection.go  |  12 +-
 .../go/src/qpid.apache.org/electron/endpoint.go |   5 +-
 .../go/src/qpid.apache.org/electron/handler.go  |   6 +-
 .../go/src/qpid.apache.org/electron/link.go     |  90 ++++++------
 .../qpid.apache.org/electron/messaging_test.go  | 114 +++++++++------
 .../go/src/qpid.apache.org/electron/receiver.go | 139 ++++++++-----------
 .../go/src/qpid.apache.org/electron/sender.go   |  61 ++++----
 .../go/src/qpid.apache.org/electron/session.go  |   6 +-
 .../go/src/qpid.apache.org/proton/engine.go     |  74 +++++-----
 .../go/src/qpid.apache.org/proton/wrappers.go   |   2 +-
 10 files changed, 264 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 386875d..1f8bd40 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -120,7 +120,7 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption)
 	for _, set := range setting {
 		set(c)
 	}
-	c.endpoint = makeEndpoint(c.engine.String())
+	c.endpoint.init(c.engine.String())
 	c.eConnection = c.engine.Connection()
 	go c.run()
 	return c, nil
@@ -134,9 +134,15 @@ func (c *connection) run() {
 	c.closed(Closed)
 }
 
-func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
+func (c *connection) Close(err error) {
+	c.err.Set(err)
+	c.engine.Close(err)
+}
 
-func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
+func (c *connection) Disconnect(err error) {
+	c.err.Set(err)
+	c.engine.Disconnect(err)
+}
 
 func (c *connection) Session(setting ...SessionOption) (Session, error) {
 	var s Session

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index 2b1f62d..fc701c6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -52,6 +52,9 @@ type Endpoint interface {
 	// Done returns a channel that will close when the endpoint closes.
 	// Error() will contain the reason.
 	Done() <-chan struct{}
+
+	// Called in handler goroutine when endpoint is remotely closed.
+	closed(err error) error
 }
 
 // DEVELOPER NOTES
@@ -64,7 +67,7 @@ type endpoint struct {
 	done chan struct{}
 }
 
-func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})}
}
+func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) }
 
 // Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
 // proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 0237156..eb53df3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -29,7 +29,7 @@ import (
 type handler struct {
 	delegator    *proton.MessagingAdapter
 	connection   *connection
-	links        map[proton.Link]Link
+	links        map[proton.Link]Endpoint
 	sentMessages map[proton.Delivery]sentMessage
 	sessions     map[proton.Session]*session
 }
@@ -37,7 +37,7 @@ type handler struct {
 func newHandler(c *connection) *handler {
 	h := &handler{
 		connection:   c,
-		links:        make(map[proton.Link]Link),
+		links:        make(map[proton.Link]Endpoint),
 		sentMessages: make(map[proton.Delivery]sentMessage),
 		sessions:     make(map[proton.Session]*session),
 	}
@@ -152,7 +152,7 @@ func (h *handler) incoming(in Incoming) {
 	}
 }
 
-func (h *handler) addLink(pl proton.Link, el Link) {
+func (h *handler) addLink(pl proton.Link, el Endpoint) {
 	h.links[pl] = el
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 91efa8e..80b4d5c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -24,11 +24,8 @@ import (
 	"qpid.apache.org/proton"
 )
 
-// Link is the common interface for AMQP links. Sender and Receiver provide
-// more methods for the sending or receiving end of a link respectively.
-type Link interface {
-	Endpoint
-
+// Settings associated with a link
+type LinkSettings interface {
 	// Source address that messages are coming from.
 	Source() string
 
@@ -53,46 +50,41 @@ type Link interface {
 
 	// Session containing the Link
 	Session() Session
-
-	// Called in event loop on closed event.
-	closed(err error)
-	// Called to open a link (local or accepted incoming link)
-	open()
 }
 
 // LinkOption can be passed when creating a sender or receiver link to set optional configuration.
-type LinkOption func(*link)
+type LinkOption func(*linkSettings)
 
 // Source returns a LinkOption that sets address that messages are coming from.
-func Source(s string) LinkOption { return func(l *link) { l.source = s } }
+func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } }
 
 // Target returns a LinkOption that sets address that messages are going to.
-func Target(s string) LinkOption { return func(l *link) { l.target = s } }
+func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
 
 // LinkName returns a LinkOption that sets the link name.
-func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
+func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
 
 // SndSettle returns a LinkOption that sets the send settle mode
-func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } }
+func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle =
m } }
 
 // RcvSettle returns a LinkOption that sets the send settle mode
-func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } }
+func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle =
m } }
 
 // SndSettleMode returns a LinkOption that defines when the sending end of the
 // link settles message delivery.
 type SndSettleMode proton.SndSettleMode
 
 // Capacity returns a LinkOption that sets the link capacity
-func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
+func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } }
 
 // Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a
sender.
-func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
+func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } }
 
 // AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
 // are sent but no acknowledgment is received, messages can be lost if there is
 // a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
 func AtMostOnce() LinkOption {
-	return func(l *link) {
+	return func(l *linkSettings) {
 		SndSettle(SndSettled)(l)
 		RcvSettle(RcvFirst)(l)
 	}
@@ -104,7 +96,7 @@ func AtMostOnce() LinkOption {
 // chance that the message will be received twice in this case.  Sets
 // SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 func AtLeastOnce() LinkOption {
-	return func(l *link) {
+	return func(l *linkSettings) {
 		SndSettle(SndUnsettled)(l)
 		RcvSettle(RcvFirst)(l)
 	}
@@ -129,10 +121,7 @@ const (
 	RcvSecond = RcvSettleMode(proton.RcvSecond)
 )
 
-type link struct {
-	endpoint
-
-	// Link settings.
+type linkSettings struct {
 	source    string
 	target    string
 	linkName  string
@@ -141,31 +130,35 @@ type link struct {
 	rcvSettle RcvSettleMode
 	capacity  int
 	prefetch  bool
+	session   *session
+	eLink     proton.Link
+}
 
-	session *session
-	eLink   proton.Link
+type link struct {
+	endpoint
+	linkSettings
 }
 
-func (l *link) Source() string           { return l.source }
-func (l *link) Target() string           { return l.target }
-func (l *link) LinkName() string         { return l.linkName }
-func (l *link) IsSender() bool           { return l.isSender }
-func (l *link) IsReceiver() bool         { return !l.isSender }
-func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
-func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
-func (l *link) Session() Session         { return l.session }
-func (l *link) Connection() Connection   { return l.session.Connection() }
+func (l *linkSettings) Source() string           { return l.source }
+func (l *linkSettings) Target() string           { return l.target }
+func (l *linkSettings) LinkName() string         { return l.linkName }
+func (l *linkSettings) IsSender() bool           { return l.isSender }
+func (l *linkSettings) IsReceiver() bool         { return !l.isSender }
+func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
+func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
 
+func (l *link) Session() Session       { return l.session }
+func (l *link) Connection() Connection { return l.session.Connection() }
 func (l *link) engine() *proton.Engine { return l.session.connection.engine }
 func (l *link) handler() *handler      { return l.session.connection.handler }
 
-// Set up link fields and open the proton.Link
-func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) {
-	l := link{
-		session:  sn,
+// Open a link and return the linkSettings.
+func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error)
{
+	l := linkSettings{
 		isSender: isSender,
 		capacity: 1,
 		prefetch: false,
+		session:  sn,
 	}
 	for _, set := range setting {
 		set(&l)
@@ -179,31 +172,29 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link,
error)
 		l.eLink = l.session.eSession.Receiver(l.linkName)
 	}
 	if l.eLink.IsNil() {
-		l.err.Set(fmt.Errorf("cannot create link %s", l))
-		return l, l.err.Get()
+		return l, fmt.Errorf("cannot create link %s", l.eLink)
 	}
 	l.eLink.Source().SetAddress(l.source)
 	l.eLink.Target().SetAddress(l.target)
 	l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
 	l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
 	l.eLink.Open()
-	l.endpoint = makeEndpoint(l.eLink.String())
 	return l, nil
 }
 
 type incomingLink struct {
 	incoming
-	link
+	linkSettings
+	eLink proton.Link
+	sn    *session
 }
 
 // Set up a link from an incoming proton.Link.
 func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
 	l := incomingLink{
 		incoming: makeIncoming(eLink),
-		link: link{
-			session:   sn,
+		linkSettings: linkSettings{
 			isSender:  eLink.IsSender(),
-			eLink:     eLink,
 			source:    eLink.RemoteSource().Address(),
 			target:    eLink.RemoteTarget().Address(),
 			linkName:  eLink.Name(),
@@ -211,7 +202,8 @@ func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
 			rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
 			capacity:  1,
 			prefetch:  false,
-			endpoint:  makeEndpoint(eLink.String()),
+			eLink:     eLink,
+			session:   sn,
 		},
 	}
 	return l
@@ -239,7 +231,3 @@ func (l *link) Close(err error) {
 		}
 	})
 }
-
-func (l *link) open() {
-	l.eLink.Open()
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index 5af57e8..0de7d16 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -270,24 +270,26 @@ func TestTimeouts(t *testing.T) {
 	}
 }
 
-// clientServer that returns sender/receiver pairs at opposite ends of link.
+// A server that returns the opposite end of each client link via channels.
 type pairs struct {
-	t      *testing.T
-	client Session
-	server Connection
-	rchan  chan Receiver
-	schan  chan Sender
+	t        *testing.T
+	client   Session
+	server   Connection
+	rchan    chan Receiver
+	schan    chan Sender
+	capacity int
+	prefetch bool
 }
 
-func newPairs(t *testing.T) *pairs {
+func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
 	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
 	p.client, p.server = newClientServer(t)
 	go func() {
 		for i := range p.server.Incoming() {
 			switch i := i.(type) {
 			case *IncomingReceiver:
-				i.SetCapacity(1)
-				i.SetPrefetch(false)
+				i.SetCapacity(capacity)
+				i.SetPrefetch(prefetch)
 				p.rchan <- i.Accept().(Receiver)
 			case *IncomingSender:
 				p.schan <- i.Accept().(Sender)
@@ -303,6 +305,7 @@ func (p *pairs) close() {
 	closeClientServer(p.client, p.server)
 }
 
+// Return a client sender and server receiver
 func (p *pairs) senderReceiver() (Sender, Receiver) {
 	snd, err := p.client.Sender()
 	fatalIf(p.t, err)
@@ -310,6 +313,7 @@ func (p *pairs) senderReceiver() (Sender, Receiver) {
 	return snd, rcv
 }
 
+// Return a client receiver and server sender
 func (p *pairs) receiverSender() (Receiver, Sender) {
 	rcv, err := p.client.Receiver()
 	fatalIf(p.t, err)
@@ -322,7 +326,7 @@ type result struct {
 	err   error
 }
 
-func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) }
+func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
 
 func doSend(snd Sender, results chan result) {
 	err := snd.SendSync(amqp.NewMessage()).Error
@@ -338,30 +342,48 @@ func doDisposition(ack <-chan Outcome, results chan result) {
 	results <- result{"disposition", (<-ack).Error}
 }
 
+// Senders get credit immediately if receivers have prefetch set
+func TestSendReceivePrefetch(t *testing.T) {
+	pairs := newPairs(t, 1, true)
+	s, r := pairs.senderReceiver()
+	s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
+	if _, err := r.Receive(); err != nil {
+		t.Error(err)
+	}
+}
+
+// Senders do not get credit till Receive() if receivers don't have prefetch
+func TestSendReceiveNoPrefetch(t *testing.T) {
+	pairs := newPairs(t, 1, false)
+	s, r := pairs.senderReceiver()
+	done := make(chan struct{}, 1)
+	go func() {
+		s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
+		close(done)
+	}()
+	select {
+	case <-done:
+		t.Errorf("send should be blocked on credit")
+	default:
+		if _, err := r.Receive(); err != nil {
+			t.Error(err)
+		} else {
+			<-done
+		} // Should be unblocked now
+	}
+}
+
 // Test that closing Links interrupts blocked link functions.
 func TestLinkCloseInterrupt(t *testing.T) {
-	want := amqp.Errorf("x", "all bad")
-	pairs := newPairs(t)
+	want := amqp.Error{Name: "x", Description: "all bad"}
+	pairs := newPairs(t, 1, false)
 	results := make(chan result) // Collect expected errors
 
-	// Sender.Close() interrupts Send()
-	snd, rcv := pairs.senderReceiver()
-	go doSend(snd, results)
-	snd.Close(want)
-	if r := <-results; want != r.err {
-		t.Errorf("want %#v got %#v", want, r)
-	}
-
-	// Remote Receiver.Close() interrupts Send()
-	snd, rcv = pairs.senderReceiver()
-	go doSend(snd, results)
-	rcv.Close(want)
-	if r := <-results; want != r.err {
-		t.Errorf("want %#v got %#v", want, r)
-	}
+	// Note closing the link does not interrupt Send() calls, the AMQP spec says
+	// that deliveries can be settled after the link is closed.
 
 	// Receiver.Close() interrupts Receive()
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv := pairs.senderReceiver()
 	go doReceive(rcv, results)
 	rcv.Close(want)
 	if r := <-results; want != r.err {
@@ -379,44 +401,50 @@ func TestLinkCloseInterrupt(t *testing.T) {
 
 // Test closing the server end of a connection.
 func TestConnectionCloseInterrupt1(t *testing.T) {
-	want := amqp.Errorf("x", "bad")
-	pairs := newPairs(t)
+	want := amqp.Error{Name: "x", Description: "bad"}
+	pairs := newPairs(t, 1, true)
 	results := make(chan result) // Collect expected errors
 
 	// Connection.Close() interrupts Send, Receive, Disposition.
 	snd, rcv := pairs.senderReceiver()
-	go doReceive(rcv, results)
-	ack := snd.SendWaitable(amqp.NewMessage())
-	go doDisposition(ack, results)
-	snd, rcv = pairs.senderReceiver()
 	go doSend(snd, results)
+
+	rcv.Receive()
 	rcv, snd = pairs.receiverSender()
 	go doReceive(rcv, results)
+
+	snd, rcv = pairs.senderReceiver()
+	ack := snd.SendWaitable(amqp.NewMessage())
+	rcv.Receive()
+	go doDisposition(ack, results)
+
 	pairs.server.Close(want)
 	for i := 0; i < 3; i++ {
 		if r := <-results; want != r.err {
-			// TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF.
-			t.Logf("want %v got %v", want, r.err)
+			t.Logf("want %v got %v", want, r)
 		}
 	}
 }
 
 // Test closing the client end of the connection.
 func TestConnectionCloseInterrupt2(t *testing.T) {
-	want := amqp.Errorf("x", "bad")
-	pairs := newPairs(t)
+	want := amqp.Error{Name: "x", Description: "bad"}
+	pairs := newPairs(t, 1, true)
 	results := make(chan result) // Collect expected errors
 
 	// Connection.Close() interrupts Send, Receive, Disposition.
 	snd, rcv := pairs.senderReceiver()
-	go doReceive(rcv, results)
-	ack := snd.SendWaitable(amqp.NewMessage())
-	go doDisposition(ack, results)
-	snd, rcv = pairs.senderReceiver()
 	go doSend(snd, results)
+	rcv.Receive()
+
 	rcv, snd = pairs.receiverSender()
 	go doReceive(rcv, results)
-	pairs.client.Close(want)
+
+	snd, rcv = pairs.senderReceiver()
+	ack := snd.SendWaitable(amqp.NewMessage())
+	go doDisposition(ack, results)
+
+	pairs.client.Connection().Close(want)
 	for i := 0; i < 3; i++ {
 		if r := <-results; want != r.err {
 			// TODO aconway 2015-10-06: Not propagating the correct error, seeing nil.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 22bdc7e..f2b7a52 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -29,7 +29,8 @@ import (
 // Receiver is a Link that receives messages.
 //
 type Receiver interface {
-	Link
+	Endpoint
+	LinkSettings
 
 	// Receive blocks until a message is available or until the Receiver is closed
 	// and has no more buffered messages.
@@ -64,108 +65,88 @@ type Receiver interface {
 	Capacity() int
 }
 
-// Flow control policy for a receiver.
-type policy interface {
-	// Called at the start of Receive() to adjust credit before fetching a message.
-	Pre(*receiver)
-	// Called after Receive() has received a message from Buffer() before it returns.
-	// Non-nil error means no message was received because of an error.
-	Post(*receiver, error)
-}
-
-type prefetchPolicy struct{}
-
-func (p prefetchPolicy) Flow(r *receiver) {
-	r.engine().Inject(func() {
-		if r.Error() != nil {
-			return
-		}
-		_, _, max := r.credit()
-		if max > 0 {
-			r.eLink.Flow(max)
-		}
-	})
-}
-func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
-func (p prefetchPolicy) Post(r *receiver, err error) {
-	if err == nil {
-		p.Flow(r)
-	}
-}
-
-type noPrefetchPolicy struct{ waiting int }
-
-func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
-	r.engine().Inject(func() {
-		if r.Error() != nil {
-			return
-		}
-		len, credit, max := r.credit()
-		add := p.waiting - (len + credit)
-		if add > max {
-			add = max // Don't overflow
-		}
-		if add > 0 {
-			r.eLink.Flow(add)
-		}
-	})
-}
-func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) }
-func (p noPrefetchPolicy) Post(r *receiver, err error) {
-	p.waiting--
-	if err == nil {
-		p.Flow(r)
-	}
-}
-
 // Receiver implementation
 type receiver struct {
 	link
-	buffer chan ReceivedMessage
-	policy policy
+	buffer  chan ReceivedMessage
+	callers int
 }
 
+func (r *receiver) Capacity() int  { return cap(r.buffer) }
+func (r *receiver) Prefetch() bool { return r.prefetch }
+
 // Call in proton goroutine
-func newReceiver(l link) *receiver {
-	r := &receiver{link: l}
+func newReceiver(ls linkSettings) *receiver {
+	r := &receiver{link: link{linkSettings: ls}}
+	r.endpoint.init(r.link.eLink.String())
 	if r.capacity < 1 {
 		r.capacity = 1
 	}
-	if r.prefetch {
-		r.policy = &prefetchPolicy{}
-	} else {
-		r.policy = &noPrefetchPolicy{}
-	}
 	r.buffer = make(chan ReceivedMessage, r.capacity)
 	r.handler().addLink(r.eLink, r)
-	r.link.open()
+	r.link.eLink.Open()
+	if r.prefetch {
+		r.flow(r.maxFlow())
+	}
 	return r
 }
 
-// call in proton goroutine.
-func (r *receiver) credit() (buffered, credit, max int) {
-	return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
+// Call in proton gorotine. Max additional credit we can request.
+func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.eLink.Credit()
}
+
+func (r *receiver) flow(credit int) {
+	if credit > 0 {
+		r.eLink.Flow(credit)
+	}
 }
 
-func (r *receiver) Capacity() int  { return cap(r.buffer) }
-func (r *receiver) Prefetch() bool { return r.prefetch }
+// Inject flow check per-caller call when prefetch is off.
+// Called with inc=1 at start of call, inc = -1 at end
+func (r *receiver) caller(inc int) {
+	r.engine().Inject(func() {
+		r.callers += inc
+		need := r.callers - (len(r.buffer) + r.eLink.Credit())
+		max := r.maxFlow()
+		if need > max {
+			need = max
+		}
+		r.flow(need)
+	})
+}
 
+// Inject flow top-up if prefetch is enabled
+func (r *receiver) flowTopUp() {
+	if r.prefetch {
+		r.engine().Inject(func() { r.flow(r.maxFlow()) })
+	}
+}
+
+// Not claled
 func (r *receiver) Receive() (rm ReceivedMessage, err error) {
 	return r.ReceiveTimeout(Forever)
 }
 
-func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error)
{
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) {
 	assert(r.buffer != nil, "Receiver is not open: %s", r)
-	r.policy.Pre(r)
-	defer func() { r.policy.Post(r, err) }()
+	select { // Check for immediate availability
+	case rm := <-r.buffer:
+		r.flowTopUp()
+		return rm, nil
+	default:
+	}
+	if !r.prefetch { // Per-caller flow control
+		r.caller(+1)
+		defer r.caller(-1)
+	}
 	rmi, err := timedReceive(r.buffer, timeout)
 	switch err {
-	case Timeout:
-		return ReceivedMessage{}, Timeout
+	case nil:
+		r.flowTopUp()
+		return rmi.(ReceivedMessage), err
 	case Closed:
 		return ReceivedMessage{}, r.Error()
 	default:
-		return rmi.(ReceivedMessage), nil
+		return ReceivedMessage{}, err
 	}
 }
 
@@ -192,11 +173,11 @@ func (r *receiver) message(delivery proton.Delivery) {
 	}
 }
 
-func (r *receiver) closed(err error) {
-	r.link.closed(err)
+func (r *receiver) closed(err error) error {
 	if r.buffer != nil {
 		close(r.buffer)
 	}
+	return r.link.closed(err)
 }
 
 // ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
@@ -240,5 +221,5 @@ func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch =
prefetch
 
 // Accept accepts an incoming receiver endpoint
 func (in *IncomingReceiver) Accept() Endpoint {
-	return in.accept(func() Endpoint { return newReceiver(in.link) })
+	return in.accept(func() Endpoint { return newReceiver(in.linkSettings) })
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 834eb75..2f0e965 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -38,7 +38,8 @@ import (
 // Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
 //
 type Sender interface {
-	Link
+	Endpoint
+	LinkSettings
 
 	// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
 	// Returns an Outcome, which may contain an error if the message could not be sent.
@@ -83,6 +84,12 @@ type Outcome struct {
 	Value interface{}
 }
 
+func (o Outcome) send(ack chan<- Outcome) {
+	if ack != nil {
+		ack <- o
+	}
+}
+
 // SentStatus indicates the status of a sent message.
 type SentStatus int
 
@@ -144,44 +151,37 @@ type sender struct {
 func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t
time.Duration) {
 	// wait for credit
 	if _, err := timedReceive(s.credit, t); err != nil {
-		if err == Closed && s.Error != nil {
+		if err == Closed && s.Error() != nil {
 			err = s.Error()
 		}
-		ack <- Outcome{Unsent, err, v}
+		Outcome{Unsent, err, v}.send(ack)
 		return
 	}
 	// Send a message in handler goroutine
 	err := s.engine().Inject(func() {
 		if s.Error() != nil {
-			if ack != nil {
-				ack <- Outcome{Unsent, s.Error(), v}
-			}
+			Outcome{Unsent, s.Error(), v}.send(ack)
 			return
 		}
-		if delivery, err := s.eLink.Send(m); err == nil {
-			if ack != nil { // We must report an outcome
-				if s.SndSettle() == SndSettled {
-					delivery.Settle() // Pre-settle if required
-					ack <- Outcome{Accepted, nil, v}
-				} else {
-					s.handler().sentMessages[delivery] = sentMessage{ack, v}
-				}
-			} else { // ack == nil, can't report outcome
-				if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to.
-					delivery.Settle()
-				}
-			}
-		} else { // err != nil
-			if ack != nil {
-				ack <- Outcome{Unsent, err, v}
+
+		delivery, err2 := s.eLink.Send(m)
+		switch {
+		case err2 != nil:
+			Outcome{Unsent, err2, v}.send(ack)
+		case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
+			if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy
+				delivery.Settle()
 			}
+			Outcome{Accepted, nil, v}.send(ack) // Assume accepted
+		default:
+			s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler
 		}
 		if s.eLink.Credit() > 0 { // Signal there is still credit
 			s.sendable()
 		}
 	})
-	if err != nil && ack != nil {
-		ack <- Outcome{Unsent, err, v}
+	if err != nil {
+		Outcome{Unsent, err, v}.send(ack)
 	}
 }
 
@@ -237,15 +237,16 @@ func (s *sender) SendSync(m amqp.Message) Outcome {
 }
 
 // handler goroutine
-func (s *sender) closed(err error) {
-	s.link.closed(err)
+func (s *sender) closed(err error) error {
 	close(s.credit)
+	return s.link.closed(err)
 }
 
-func newSender(l link) *sender {
-	s := &sender{link: l, credit: make(chan struct{}, 1)}
+func newSender(ls linkSettings) *sender {
+	s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)}
+	s.endpoint.init(s.link.eLink.String())
 	s.handler().addLink(s.eLink, s)
-	s.link.open()
+	s.link.eLink.Open()
 	return s
 }
 
@@ -263,7 +264,7 @@ type IncomingSender struct {
 
 // Accept accepts an incoming sender endpoint
 func (in *IncomingSender) Accept() Endpoint {
-	return in.accept(func() Endpoint { return newSender(in.link) })
+	return in.accept(func() Endpoint { return newSender(in.linkSettings) })
 }
 
 // Call in injected functions to check if the sender is valid.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 18d8bc8..1bbc52c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -53,8 +53,8 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption)
*ses
 	s := &session{
 		connection: c,
 		eSession:   es,
-		endpoint:   makeEndpoint(es.String()),
 	}
+	s.endpoint.init(es.String())
 	for _, set := range setting {
 		set(s)
 	}
@@ -81,7 +81,7 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error)
{
 		if s.Error() != nil {
 			return s.Error()
 		}
-		l, err := localLink(s, true, setting...)
+		l, err := makeLocalLink(s, true, setting...)
 		if err == nil {
 			snd = newSender(l)
 		}
@@ -95,7 +95,7 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error)
{
 		if s.Error() != nil {
 			return s.Error()
 		}
-		l, err := localLink(s, false, setting...)
+		l, err := makeLocalLink(s, false, setting...)
 		if err == nil {
 			rcv = newReceiver(l)
 		}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 13d44b8..eecda7a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -33,9 +33,9 @@ import "C"
 
 import (
 	"fmt"
-	"io"
 	"net"
 	"sync"
+	"time"
 	"unsafe"
 )
 
@@ -172,7 +172,7 @@ func (eng *Engine) String() string {
 }
 
 func (eng *Engine) Id() string {
-	return fmt.Sprintf("%eng", &eng)
+	return fmt.Sprintf("%p", eng)
 }
 
 func (eng *Engine) Error() error {
@@ -223,19 +223,35 @@ func (eng *Engine) InjectWait(f func() error) error {
 //
 func (eng *Engine) Server() { eng.transport.SetServer() }
 
-// Close the engine's connection, returns when the engine has exited.
+func (eng *Engine) disconnect() {
+	eng.transport.CloseHead()
+	eng.transport.CloseTail()
+	eng.conn.Close()
+	eng.dispatch()
+}
+
+// Close the engine's connection.
+// If err != nil pass it to the remote end as the close condition.
+// Returns when the remote end closes or disconnects.
 func (eng *Engine) Close(err error) {
-	eng.err.Set(err)
-	eng.Inject(func() {
-		CloseError(eng.connection, err)
-	})
+	eng.Inject(func() { CloseError(eng.connection, err) })
 	<-eng.running
 }
 
-// Disconnect the engine's connection without and AMQP close, returns when the engine has
exited.
+// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
+func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
+	eng.Inject(func() { CloseError(eng.connection, err) })
+	select {
+	case <-eng.running:
+	case <-time.After(timeout):
+		eng.Disconnect(err)
+	}
+}
+
+// Disconnect the engine's connection immediately without an AMQP close.
+// Process any termination events before returning.
 func (eng *Engine) Disconnect(err error) {
-	eng.err.Set(err)
-	eng.conn.Close()
+	eng.Inject(func() { eng.transport.Condition().SetError(err); eng.disconnect() })
 	<-eng.running
 }
 
@@ -283,7 +299,7 @@ func (eng *Engine) Run() error {
 
 	wbuf := eng.write.buffer()[:0]
 
-	for eng.err.Get() == nil {
+	for !eng.transport.Closed() {
 		if len(wbuf) == 0 {
 			eng.pop(&wbuf)
 		}
@@ -305,12 +321,19 @@ func (eng *Engine) Run() error {
 				f()
 			}
 		case err := <-readErr:
-			eng.netError(err)
+			eng.transport.Condition().SetError(err)
+			eng.transport.CloseTail()
 		case err := <-writeErr:
-			eng.netError(err)
+			eng.transport.Condition().SetError(err)
+			eng.transport.CloseHead()
+		}
+		eng.dispatch()
+		if eng.connection.State().RemoteClosed() && eng.connection.State().LocalClosed()
{
+			eng.disconnect()
 		}
-		eng.process()
 	}
+	eng.err.Set(EndpointError(eng.connection))
+	eng.err.Set(eng.transport.Condition().Error())
 	close(eng.write.buffers)
 	eng.conn.Close() // Make sure connection is closed
 	wait.Wait()
@@ -334,12 +357,6 @@ func (eng *Engine) Run() error {
 	return eng.err.Get()
 }
 
-func (eng *Engine) netError(err error) {
-	eng.err.Set(err)
-	eng.transport.CloseHead()
-	eng.transport.CloseTail()
-}
-
 func minInt(a, b int) int {
 	if a < b {
 		return a
@@ -378,18 +395,13 @@ func (eng *Engine) push(buf []byte) {
 	}
 }
 
-func (eng *Engine) handle(e Event) {
-	for _, h := range eng.handlers {
-		h.HandleEvent(e)
-	}
-	if e.Type() == ETransportClosed {
-		eng.err.Set(io.EOF)
-	}
-}
+func (eng *Engine) peek() *C.pn_event_t { return C.pn_collector_peek(eng.collector) }
 
-func (eng *Engine) process() {
-	for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector)
{
-		eng.handle(makeEvent(ce, eng))
+func (eng *Engine) dispatch() {
+	for ce := eng.peek(); ce != nil; ce = eng.peek() {
+		for _, h := range eng.handlers {
+			h.HandleEvent(makeEvent(ce, eng))
+		}
 		C.pn_collector_pop(eng.collector)
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index cd547ed..70611d3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -334,7 +334,7 @@ func (c Condition) Error() error {
 	if c.IsNil() || !c.IsSet() {
 		return nil
 	}
-	return amqp.Error{c.Name(), c.Description()}
+	return amqp.Error{Name: c.Name(), Description: c.Description()}
 }
 
 // Set a Go error into a condition.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message