qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-proton git commit: NO-JIRA: go: Bug fixes and improved examples.
Date Thu, 22 Oct 2015 22:16:18 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 6a306163f -> bc0a242e4


NO-JIRA: go: Bug fixes and improved examples.

package proton:
- Injecter() provided by event rather than connection. Allow different event-loop strategies.
- Add access to proton refcounts, may be useful for some apps.

package electron:
- simplified sender logic using credit flag.
- consistent link, session and connection options.

examples: simplified & improved

proton/broker: concurrent broker using handler per connection.
electron/broker: cleaned up for comparison to proton/broker.

examples/README.md discussion of brokers.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bc0a242e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bc0a242e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bc0a242e

Branch: refs/heads/master
Commit: bc0a242e4cc335f2aa496b98bf06d033904ca23d
Parents: 6a30616
Author: Alan Conway <aconway@redhat.com>
Authored: Tue Oct 13 10:31:01 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Thu Oct 22 18:14:17 2015 -0400

----------------------------------------------------------------------
 examples/go/CMakeLists.txt                      |   9 +-
 examples/go/README.md                           |  31 ++
 examples/go/electron/broker.go                  |  67 ++--
 examples/go/electron/receive.go                 |   5 +-
 examples/go/electron/send.go                    |   2 -
 examples/go/example_test.go                     |  57 ++-
 examples/go/proton/broker.go                    | 389 ++++++++++---------
 examples/go/util/util.go                        |   2 +-
 proton-c/bindings/go/CMakeLists.txt             |   7 +-
 .../src/qpid.apache.org/electron/connection.go  | 151 ++++---
 .../src/qpid.apache.org/electron/container.go   |   6 +-
 .../go/src/qpid.apache.org/electron/handler.go  | 102 +++--
 .../go/src/qpid.apache.org/electron/link.go     |  67 ++--
 .../go/src/qpid.apache.org/electron/receiver.go |  60 +--
 .../go/src/qpid.apache.org/electron/sender.go   | 120 +++---
 .../go/src/qpid.apache.org/electron/session.go  |  49 ++-
 .../go/src/qpid.apache.org/electron/time.go     |  19 +-
 .../go/src/qpid.apache.org/proton/doc.go        |  29 +-
 .../go/src/qpid.apache.org/proton/engine.go     |  71 ++--
 .../go/src/qpid.apache.org/proton/handlers.go   |  18 +-
 .../go/src/qpid.apache.org/proton/wrappers.go   |  98 +++--
 .../qpid/proton/amqp/messaging/Terminus.java    |   2 +-
 22 files changed, 736 insertions(+), 625 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
index c345523..32be548 100644
--- a/examples/go/CMakeLists.txt
+++ b/examples/go/CMakeLists.txt
@@ -20,6 +20,7 @@
 if(BUILD_GO)
 
   set(examples electron/broker electron/receive electron/send proton/broker)
+  file(GLOB_RECURSE example_source ${CMAKE_CURRENT_SOURCE_DIR}/*.go)
 
   # Build example exes
   foreach(example ${examples})
@@ -28,7 +29,8 @@ if(BUILD_GO)
     add_custom_command(
       OUTPUT ${target}
       COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${target} ${source}
-      DEPENDS  ${source} ${GO_TARGETS})
+      DEPENDS  ${example_source} ${GO_TARGETS}
+      WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
     list(APPEND example_exes ${target})
   endforeach()
 
@@ -36,8 +38,9 @@ if(BUILD_GO)
   set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/example_test)
   add_custom_command(
     OUTPUT ${test_exe}
-    DEPENDS ${example_exes}
-    COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go)
+    DEPENDS ${example_exes} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go
+    COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go
+    WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 
   add_custom_target(go-test-exe ALL DEPENDS ${test_exe})
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index 0114d0e..ce9206b 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -87,3 +87,34 @@ Or use the Go broker and the python clients:
 
     python ../python/simple_send.py
     python ../python/simple_recv.py
+
+
+## A tale of two brokers.
+
+The `proton` and `electron` packages provide two alternate APIs for AMQP applications.
+See [the proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md) for a discussion
+of why there are two APIs.
+
+The examples `proton/broker.go` and `electron/broker.go` both implement the same
+simple broker-like functionality using each of the two APIs. They both handle
+multiple connections concurrently and store messages on bounded queues
+implemented by Go channels.
+
+However the `electron/broker` is less than half as long as the `proton/broker`
+illustrating why it is better suited for most Go applications.
+
+`proton/broker` must explicitly handle proton events, which are processed in a
+single goroutine per connection since proton is not concurrent safe. Each
+connection uses channels to exchange messages between the event-handling
+goroutine and the shared queues that are accessible to all connections. Sending
+messages is particularly tricky since we must monitor the queue for available
+messages and the sending link for available credit in order to send messages.
+
+
+`electron/broker` takes advantage of the `electron` package, which hides all the
+event handling and passing of messages between goroutines beind behind
+straightforward interfaces for sending and receiving messages. The electron
+broker can implement links as simple goroutines that loop popping messages from
+a queue and sending them or receiving messages and pushing them to a queue.
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index 1e4a931..f1dce17 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -52,21 +52,20 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
 func main() {
 	flag.Usage = usage
 	flag.Parse()
-	if err := newBroker().run(); err != nil {
+	b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+	if err := b.run(); err != nil {
 		log.Fatal(err)
 	}
 }
 
+// State for the broker
 type broker struct {
 	queues    util.Queues
 	container electron.Container
 }
 
-func newBroker() *broker {
-	return &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
-}
-
-func (b *broker) run() (err error) {
+// Listens for connections and starts an electron.Connection for each one.
+func (b *broker) run() error {
 	listener, err := net.Listen("tcp", *addr)
 	if err != nil {
 		return err
@@ -76,46 +75,29 @@ func (b *broker) run() (err error) {
 	for {
 		conn, err := listener.Accept()
 		if err != nil {
-			util.Debugf("Accept error: %s", err)
+			util.Debugf("Accept error: %v", err)
 			continue
 		}
-		if err := b.connection(conn); err != nil {
-			if err != nil {
-				util.Debugf("Connection error: %s", err)
-				continue
-			}
+		c, err := b.container.Connection(conn, electron.Server(), electron.Accepter(b.accept))
+		if err != nil {
+			util.Debugf("Connection error: %v", err)
+			continue
 		}
+		util.Debugf("Accepted %v", c)
 	}
 }
 
-// connection creates a new AMQP connection for a net.Conn.
-func (b *broker) connection(conn net.Conn) error {
-	c, err := b.container.Connection(conn)
-	if err != nil {
-		return err
-	}
-	c.Server()         // Enable server-side protocol negotiation.
-	c.Listen(b.accept) // Call accept() for remotely-opened endpoints.
-	if err := c.Open(); err != nil {
-		return err
-	}
-	util.Debugf("Accepted %s", c)
-	return nil
-}
-
 // accept remotely-opened endpoints (Session, Sender and Receiver)
 // and start goroutines to service them.
-func (b *broker) accept(ep electron.Endpoint) error {
-	switch ep := ep.(type) {
-	case electron.Sender:
-		util.Debugf("%s opened", ep)
-		go b.sender(ep)
-	case electron.Receiver:
-		util.Debugf("%s opened", ep)
-		ep.SetCapacity(100, true) // Pre-fetch 100 messages
-		go b.receiver(ep)
+func (b *broker) accept(i electron.Incoming) {
+	switch i := i.(type) {
+	case *electron.IncomingSender:
+		go b.sender(i.AcceptSender())
+	case *electron.IncomingReceiver:
+		go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 messages
+	default:
+		i.Accept()
 	}
-	return nil
 }
 
 // sender pops messages from a queue and sends them.
@@ -127,17 +109,16 @@ func (b *broker) sender(sender electron.Sender) {
 			return
 		}
 		if err := sender.SendForget(m); err == nil {
-			util.Debugf("send %s: %s", sender, util.FormatMessage(m))
+			util.Debugf("%s send: %s", sender, util.FormatMessage(m))
 		} else {
-			util.Debugf("send error %s: %s", sender, err)
+			util.Debugf("%s error: %s", sender, err)
 			q <- m // Put it back on the queue.
-			break
+			return
 		}
 	}
 }
 
-// receiver receives messages and pushes to the queue named by the receivers's
-// Target address
+// receiver receives messages and pushes to a queue.
 func (b *broker) receiver(receiver electron.Receiver) {
 	q := b.queues.Get(receiver.Target())
 	for {
@@ -146,7 +127,7 @@ func (b *broker) receiver(receiver electron.Receiver) {
 			q <- rm.Message
 			rm.Accept()
 		} else {
-			util.Debugf("%s: error %s", receiver, err)
+			util.Debugf("%s error: %s", receiver, err)
 			break
 		}
 	}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index e450a75..f7d41fa 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -77,8 +77,6 @@ func main() {
 			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
 			util.ExitIf(err)
 			c, err := container.Connection(conn)
-			util.ExitIf(err)
-			util.ExitIf(c.Open())
 			connections <- c // Save connection so we can Close() when main() ends
 
 			// Create a Receiver using the path of the URL as the source address
@@ -106,9 +104,8 @@ func main() {
 
 	// print each message until the count is exceeded.
 	for i := uint64(0); i < *count; i++ {
-		util.Debugf("pre (%d/%d)\n", i, *count)
 		m := <-messages
-		util.Debugf("%s (%d/%d)\n", util.FormatMessage(m), i, *count)
+		util.Debugf("%s\n", util.FormatMessage(m))
 	}
 	fmt.Printf("Received %d messages\n", *count)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 6b7aec1..c9bdbc9 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -80,8 +80,6 @@ func main() {
 			util.ExitIf(err)
 			c, err := container.Connection(conn)
 			util.ExitIf(err)
-			err = c.Open()
-			util.ExitIf(err)
 			connections <- c // Save connection so we can Close() when main() ends
 
 			// Create a Sender using the path of the URL as the AMQP address

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index 1e497b9..006e17c 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -28,6 +28,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"log"
 	"math/rand"
 	"net"
 	"os"
@@ -35,7 +36,6 @@ import (
 	"path"
 	"path/filepath"
 	"reflect"
-	"strings"
 	"testing"
 	"time"
 )
@@ -108,27 +108,8 @@ func checkEqual(want interface{}, got interface{}) error {
 	return fmt.Errorf("%#v != %#v", want, got)
 }
 
-// 'go build' uses the installed copy of the proton Go libraries, which may be out of date.
-func checkStaleLibs(t *testing.T) {
-	var stale []string
-	pp := "qpid.apache.org"
-	for _, p := range []string{pp + "/proton", pp + "/amqp", pp + "/electron"} {
-		out, err := exec.Command("go", "list", "-f", "{{.Stale}}", p).CombinedOutput()
-		if err != nil {
-			t.Fatalf("failed to execute 'go list': %v\n%v", err, string(out))
-		}
-		if string(out) != "false\n" {
-			stale = append(stale, p)
-		}
-	}
-	if len(stale) > 0 {
-		t.Fatalf("Stale libraries, run 'go install %s'", strings.Trim(fmt.Sprint(stale), "[]"))
-	}
-}
-
 // exampleCommand returns an exec.Cmd to run an example.
 func exampleCommand(t *testing.T, prog string, arg ...string) (cmd *exec.Cmd) {
-	checkStaleLibs(t)
 	args := []string{}
 	if *debug {
 		args = append(args, "-debug=true")
@@ -230,6 +211,7 @@ func goReceiveWant(t *testing.T, errchan chan<- error, want string, arg ...strin
 		errchan <- ready
 		buf := bytes.Buffer{}
 		io.Copy(&buf, out) // Collect the rest of the output
+		cmd.Wait()
 		errchan <- checkEqual(want, buf.String())
 		close(errchan)
 	}()
@@ -242,26 +224,30 @@ func TestExampleReceiveSend(t *testing.T) {
 		t.Skip("Skip demo tests in short mode")
 	}
 	testBroker.start(t)
-	recvErr := make(chan error)
-	recvCmd := goReceiveWant(
-		t, recvErr,
-		fmt.Sprintf("Received %d messages\n", expected),
-		exampleArgs(fmt.Sprintf("-count=%d", expected))...)
-	defer func() {
-		recvCmd.Process.Kill()
-		recvCmd.Wait()
-	}()
-	if err := <-recvErr; err != ready { // Wait for receiver ready
+
+	// Start receiver, wait for "listening" message on stdout
+	recvCmd := exampleCommand(t, "receive", exampleArgs(fmt.Sprintf("-count=%d", expected))...)
+	pipe, err := recvCmd.StdoutPipe()
+	if err != nil {
 		t.Fatal(err)
 	}
-	err := runExampleWant(t,
+	recvCmd.Start()
+	out := bufio.NewReader(pipe)
+	line, err := out.ReadString('\n')
+	if err := checkEqual("Listening on 3 connections\n", line); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := runExampleWant(t,
 		fmt.Sprintf("Received all %d acknowledgements\n", expected),
 		"send",
-		exampleArgs("-count", fmt.Sprintf("%d", *count))...)
-	if err != nil {
+		exampleArgs("-count", fmt.Sprintf("%d", *count))...); err != nil {
 		t.Fatal(err)
 	}
-	if err := <-recvErr; err != nil {
+
+	buf := bytes.Buffer{}
+	io.Copy(&buf, out)
+	if err := checkEqual(fmt.Sprintf("Received %d messages\n", expected), buf.String()); err != nil {
 		t.Fatal(err)
 	}
 }
@@ -276,6 +262,9 @@ var dir = flag.String("dir", "", "Directory containing example sources or binari
 var expected int
 
 func TestMain(m *testing.M) {
+	if out, err := exec.Command("go", "install", "qpid.apache.org/...").CombinedOutput(); err != nil {
+		log.Fatalf("go install failed: %s\n%s", err, out)
+	}
 	expected = (*count) * (*connections)
 	rand.Seed(time.Now().UTC().UnixNano())
 	testBroker = &broker{} // Broker is started on-demand by tests.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 75f14f5..3eb5880 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -30,7 +30,7 @@ import (
 	"./util"
 	"flag"
 	"fmt"
-	"io"
+	"log"
 	"net"
 	"os"
 	"qpid.apache.org/amqp"
@@ -53,247 +53,276 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
 func main() {
 	flag.Usage = usage
 	flag.Parse()
+	b := &broker{util.MakeQueues(*qsize)}
+	if err := b.run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// State for the broker
+type broker struct {
+	queues util.Queues
+}
 
-	b := newBroker()
+// Listens for connections and starts a proton.Engine for each one.
+func (b *broker) run() error {
 	listener, err := net.Listen("tcp", *addr)
-	util.ExitIf(err)
+	if err != nil {
+		return err
+	}
 	defer listener.Close()
 	fmt.Printf("Listening on %s\n", listener.Addr())
-
-	// Loop accepting new connections.
 	for {
 		conn, err := listener.Accept()
 		if err != nil {
-			util.Debugf("Accept error: %s", err)
+			util.Debugf("Accept error: %v", err)
 			continue
 		}
-		if err := b.connection(conn); err != nil {
-			if err != nil {
-				util.Debugf("Connection error: %s", err)
-				continue
-			}
+		adapter := proton.NewMessagingAdapter(newHandler(&b.queues))
+		// We want to accept messages when they are enqueued, not just when they
+		// are received, so we turn off auto-accept and prefetch by the adapter.
+		adapter.Prefetch = 0
+		adapter.AutoAccept = false
+		engine, err := proton.NewEngine(conn, adapter)
+		if err != nil {
+			util.Debugf("Connection error: %v", err)
+			continue
 		}
+		engine.Server() // Enable server-side protocol negotiation.
+		util.Debugf("Accepted connection %s", engine)
+		go func() { // Start goroutine to run the engine event loop
+			engine.Run()
+			util.Debugf("Closed %s", engine)
+		}()
 	}
 }
 
-type broker struct {
-	queues util.Queues
-}
-
-func newBroker() *broker {
-	return &broker{util.MakeQueues(*qsize)}
-}
-
-// connection creates a new AMQP connection for a net.Conn.
-func (b *broker) connection(conn net.Conn) error {
-	delegator := proton.NewMessagingDelegator(newHandler(&b.queues, *credit))
-	// We want to accept messages when they are enqueued, not just when they
-	// are received, so we turn off auto-accept and prefetch by the handler.
-	delegator.Prefetch = 0
-	delegator.AutoAccept = false
-	engine, err := proton.NewEngine(conn, delegator)
-	if err != nil {
-		return err
-	}
-	engine.Server() // Enable server-side protocol negotiation.
-	go func() {     // Start goroutine to run the engine event loop
-		engine.Run()
-		util.Debugf("Closed %s", engine)
-	}()
-	util.Debugf("Accepted %s", engine)
-	return nil
-}
-
-// receiver is a channel to buffer messages waiting to go on the queue.
-type receiver chan receivedMessage
-
-// receivedMessage is a message and the corresponding delivery for acknowledgement.
-type receivedMessage struct {
-	delivery proton.Delivery
-	message  amqp.Message
-}
-
-// sender is a signal channel, closed when we are done sending.
-type sender chan struct{}
-
 // handler handles AMQP events. There is one handler per connection.  The
-// handler does not need to be concurrent-safe as proton will serialize all
-// calls to a handler. We will use channels to communicate from the handler
-// to goroutines sending and receiving messages.
+// handler does not need to be concurrent-safe as proton.Engine will serialize
+// all calls to the handler. We use channels to communicate between the handler
+// goroutine and other goroutines sending and receiving messages.
 type handler struct {
 	queues    *util.Queues
-	credit    int // Credit window for receiver flow control.
-	receivers map[proton.Link]receiver
-	senders   map[proton.Link]sender
+	receivers map[proton.Link]*receiver
+	senders   map[proton.Link]*sender
+	injecter  proton.Injecter
 }
 
-func newHandler(queues *util.Queues, credit int) *handler {
+func newHandler(queues *util.Queues) *handler {
 	return &handler{
-		queues,
-		credit,
-		make(map[proton.Link]receiver),
-		make(map[proton.Link]sender),
+		queues:    queues,
+		receivers: make(map[proton.Link]*receiver),
+		senders:   make(map[proton.Link]*sender),
 	}
 }
 
-// Handle an AMQP event.
+// HandleMessagingEvent handles an event, called in the handler goroutine.
 func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
 	switch t {
 
+	case proton.MStart:
+		h.injecter = e.Injecter()
+
 	case proton.MLinkOpening:
-		l := e.Link()
-		var err error
-		if l.IsReceiver() {
-			err = h.receiver(l)
-		} else { // IsSender()
-			err = h.sender(l)
-		}
-		if err == nil {
-			util.Debugf("%s opened", l)
+		if e.Link().IsReceiver() {
+			h.startReceiver(e)
 		} else {
-			util.Debugf("%s open error: %s", l, err)
-			proton.CloseError(l, err)
+			h.startSender(e)
 		}
 
-	case proton.MLinkClosing:
-		l := e.Link()
-		if r, ok := h.receivers[l]; ok {
-			close(r)
-			delete(h.receivers, l)
-		} else if s, ok := h.senders[l]; ok {
-			close(s)
-			delete(h.senders, l)
-		}
-		util.Debugf("%s closed", l)
+	case proton.MLinkClosed:
+		h.linkClosed(e.Link(), e.Link().RemoteCondition().Error())
 
 	case proton.MSendable:
-		l := e.Link()
-		q := h.queues.Get(l.RemoteSource().Address())
-		if n, err := h.sendAll(e.Link(), q); err == nil && n > 0 {
-			// Still have credit, start a watcher.
-			go h.sendWatch(e.Link(), q)
+		if s, ok := h.senders[e.Link()]; ok {
+			s.sendable() // Signal the send goroutine that we have credit.
+		} else {
+			proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s sender not found", e.Link()))
 		}
 
 	case proton.MMessage:
-		l := e.Link()
-		d := e.Delivery()
-		m, err := d.Message() // Must decode message immediately before link state changes.
+		m, err := e.Delivery().Message() // Message() must be called while handling the MMessage event.
 		if err != nil {
-			util.Debugf("%s error decoding message: %s", e.Link(), err)
-			proton.CloseError(l, err)
-		} else {
-			// This will not block, AMQP credit prevents us from overflowing the buffer.
-			h.receivers[l] <- receivedMessage{d, m}
-			util.Debugf("%s received %s", l, util.FormatMessage(m))
+			proton.CloseError(e.Link(), err)
+			break
 		}
+		r, ok := h.receivers[e.Link()]
+		if !ok {
+			proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s receiver not found", e.Link()))
+			break
+		}
+		// This will not block as AMQP credit is set to the buffer capacity.
+		r.buffer <- receivedMessage{e.Delivery(), m}
+		util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m))
 
-	case proton.MConnectionClosing, proton.MDisconnected:
-		for l, r := range h.receivers {
-			close(r)
-			delete(h.receivers, l)
+	case proton.MConnectionClosed, proton.MDisconnected:
+		for l, _ := range h.receivers {
+			h.linkClosed(l, nil)
 		}
-		for l, s := range h.senders {
-			close(s)
-			delete(h.senders, l)
+		for l, _ := range h.senders {
+			h.linkClosed(l, nil)
 		}
 	}
 }
 
-// receiver is called by the handler when a receiver link opens.
+// linkClosed is called when a link has been closed by both ends.
+// It removes the link from the handlers maps and stops its goroutine.
+func (h *handler) linkClosed(l proton.Link, err error) {
+	if s, ok := h.senders[l]; ok {
+		s.stop()
+		delete(h.senders, l)
+	} else if r, ok := h.receivers[l]; ok {
+		r.stop()
+		delete(h.receivers, l)
+	}
+}
+
+// link has some common data and methods that are used by the sender and receiver types.
 //
-// It sets up data structures in the handler and then starts a goroutine
-// to receive messages and put them on a queue.
-func (h *handler) receiver(l proton.Link) error {
-	q := h.queues.Get(l.RemoteTarget().Address())
-	buffer := make(receiver, h.credit)
-	h.receivers[l] = buffer
-	l.Flow(cap(buffer)) // credit==cap(buffer) so we won't overflow the buffer.
-	go h.runReceive(l, buffer, q)
-	return nil
+// An active link is represented by a sender or receiver value and a goroutine
+// running its run() method. The run() method communicates with the handler via
+// channels.
+type link struct {
+	l proton.Link
+	q util.Queue
+	h *handler
+}
+
+func makeLink(l proton.Link, q util.Queue, h *handler) link {
+	lnk := link{l: l, q: q, h: h}
+	return lnk
+}
+
+// receiver has a channel to buffer messages that have been received by the
+// handler and are waiting to go on the queue. AMQP credit ensures that the
+// handler does not overflow the buffer and block.
+type receiver struct {
+	link
+	buffer chan receivedMessage
 }
 
-// runReceive moves messages from buffer to queue
-func (h *handler) runReceive(l proton.Link, buffer receiver, q util.Queue) {
-	for rm := range buffer {
-		q <- rm.message
-		rm2 := rm // Save in temp var for injected closure
-		err := l.Connection().Injecter().Inject(func() {
-			rm2.delivery.Accept()
-			l.Flow(1)
+// receivedMessage holds a message and a Delivery so that the message can be
+// acknowledged when it is put on the queue.
+type receivedMessage struct {
+	delivery proton.Delivery
+	message  amqp.Message
+}
+
+// startReceiver creates a receiver and a goroutine for its run() method.
+func (h *handler) startReceiver(e proton.Event) {
+	q := h.queues.Get(e.Link().RemoteTarget().Address())
+	r := &receiver{
+		link:   makeLink(e.Link(), q, h),
+		buffer: make(chan receivedMessage, *credit),
+	}
+	h.receivers[r.l] = r
+	r.l.Flow(cap(r.buffer)) // Give credit to fill the buffer to capacity.
+	go r.run()
+}
+
+// run runs in a separate goroutine. It moves messages from the buffer to the
+// queue for a receiver link, and injects a handler function to acknowledge the
+// message and send a credit.
+func (r *receiver) run() {
+	for rm := range r.buffer {
+		r.q <- rm.message
+		d := rm.delivery
+		// We are not in the handler goroutine so we Inject the accept function as a closure.
+		r.h.injecter.Inject(func() {
+			// Check that the receiver is still open, it may have been closed by the remote end.
+			if r == r.h.receivers[r.l] {
+				d.Accept()  // Accept the delivery
+				r.l.Flow(1) // Add one credit
+			}
 		})
-		if err != nil {
-			util.Debugf("%s receive error: %s", l, err)
-			proton.CloseError(l, err)
-		}
 	}
 }
 
-// sender is called by the handler when a sender link opens.
-// It sets up a sender structures in the handler.
-func (h *handler) sender(l proton.Link) error {
-	h.senders[l] = make(sender)
-	return nil
+// stop closes the buffer channel and waits for the run() goroutine to stop.
+func (r *receiver) stop() {
+	close(r.buffer)
 }
 
-// send one message in handler context, assumes we have credit.
-func (h *handler) send(l proton.Link, m amqp.Message, q util.Queue) error {
-	delivery, err := l.Send(m)
-	if err != nil {
-		h.closeSender(l, err)
-		return err
+// sender has a channel that is used to signal when there is credit to send messages.
+type sender struct {
+	link
+	credit chan struct{} // Channel to signal availability of credit.
+}
+
+// startSender creates a sender and starts a goroutine for sender.run()
+func (h *handler) startSender(e proton.Event) {
+	q := h.queues.Get(e.Link().RemoteSource().Address())
+	s := &sender{
+		link:   makeLink(e.Link(), q, h),
+		credit: make(chan struct{}, 1), // Capacity of 1 for signalling.
 	}
-	delivery.Settle() // Pre-settled, unreliable.
-	util.Debugf("%s sent %s", l, util.FormatMessage(m))
-	return nil
+	h.senders[e.Link()] = s
+	go s.run()
 }
 
-// sendAll sends as many messages as possible without blocking, call in handler context.
-// Returns the number of credits left, >0 means we ran out of messages.
-func (h *handler) sendAll(l proton.Link, q util.Queue) (int, error) {
-	for l.Credit() > 0 {
-		select {
-		case m, ok := <-q:
-			if ok { // Got a message
-				if err := h.send(l, m, q); err != nil {
-					return 0, err
-				}
-			} else { // Queue is closed
-				l.Close()
-				return 0, io.EOF
-			}
-		default: // Queue empty
-			return l.Credit(), nil
-		}
+// stop closes the credit channel and waits for the run() goroutine to stop.
+func (s *sender) stop() {
+	close(s.credit)
+}
+
+// sendable signals that the sender has credit, it does not block.
+// sender.credit has capacity 1, if it is already full we carry on.
+func (s *sender) sendable() {
+	select { // Non-blocking
+	case s.credit <- struct{}{}:
+	default:
 	}
-	return l.Credit(), nil
 }
 
-// sendWatch watches the queue for more messages and re-runs sendAll.
-// Run in a separate goroutine, so must inject handler functions.
-func (h *handler) sendWatch(l proton.Link, q util.Queue) {
-	select {
-	case m, ok := <-q:
-		l.Connection().Injecter().Inject(func() {
-			if ok {
-				if h.send(l, m, q) != nil {
+// run runs in a separate goroutine. It monitors the queue for messages and injects
+// a function to send them when there is credit
+func (s *sender) run() {
+	var q chan amqp.Message // q is nil initially as we have no credit.
+	for {
+		select {
+		case _, ok := <-s.credit:
+			if !ok { // sender closed
+				return
+			}
+			q = s.q // We have credit, enable selecting on the queue.
+
+		case m, ok := <-q: // q is only enabled when we have credit.
+			if !ok { // queue closed
+				return
+			}
+			q = nil                      // Assume all credit will be used used, will be signaled otherwise.
+			s.h.injecter.Inject(func() { // Inject handler function to actually send
+				if s.h.senders[s.l] != s { // The sender has been closed by the remote end.
+					go func() { q <- m }() // Put the message back on the queue but don't block
 					return
 				}
-				if n, err := h.sendAll(l, q); err != nil {
+				if s.sendOne(m) != nil {
 					return
-				} else if n > 0 {
-					go h.sendWatch(l, q) // Start a new watcher.
 				}
-			}
-		})
-	case <-h.senders[l]: // Closed
-		return
+				// Send as many more messages as we can without blocking
+				for s.l.Credit() > 0 {
+					select { // Non blocking receive from q
+					case m, ok := <-s.q:
+						if ok {
+							s.sendOne(m)
+						}
+					default: // Queue is empty but we have credit, signal the run() goroutine.
+						s.sendable()
+					}
+				}
+			})
+		}
 	}
 }
 
-// closeSender closes a sender link and signals goroutines processing that sender.
-func (h *handler) closeSender(l proton.Link, err error) {
-	util.Debugf("%s sender closed: %s", l, err)
-	proton.CloseError(l, err)
-	close(h.senders[l])
-	delete(h.senders, l)
+// sendOne runs in the handler goroutine. It sends a single message.
+func (s *sender) sendOne(m amqp.Message) error {
+	delivery, err := s.l.Send(m)
+	if err == nil {
+		delivery.Settle() // Pre-settled, unreliable.
+		util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
+	} else {
+		go func() { s.q <- m }() // Put the message back on the queue but don't block
+	}
+	return err
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
index 5118467..20f2192 100644
--- a/examples/go/util/util.go
+++ b/examples/go/util/util.go
@@ -64,5 +64,5 @@ func FormatMessage(m amqp.Message) string {
 func init() {
 	log.SetFlags(0)
 	_, prog := path.Split(os.Args[0])
-	log.SetPrefix(prog + ": ")
+	log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid()))
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt
index 51c2d86..74c7e13 100644
--- a/proton-c/bindings/go/CMakeLists.txt
+++ b/proton-c/bindings/go/CMakeLists.txt
@@ -75,14 +75,17 @@ foreach (pkg amqp proton electron)
   set(sources "${GoFiles}${CgoFiles}")
 
   # Build the package library
-  add_custom_command(OUTPUT ${lib} COMMAND ${GO_INSTALL} ${package} DEPENDS ${sources} ${cdepends})
+  add_custom_command(
+    OUTPUT ${lib} COMMAND ${GO_INSTALL} ${package} DEPENDS ${sources} ${cdepends}
+    WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
   set(target go-package-${pkg})
   add_custom_target(${target} ALL DEPENDS ${lib})
 
   # Package test
   go_sources(TestGoFiles)
   set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/${pkg}.test)
-  add_custom_command(OUTPUT ${test_exe} COMMAND ${GO_TEST} -c -o ${test_exe} ${package}
+  add_custom_command(
+    OUTPUT ${test_exe} COMMAND ${GO_TEST} -c -o ${test_exe} ${package}
     DEPENDS ${target} qpid-proton)
   add_custom_target(go-package-test-${pkg} ALL DEPENDS ${test_exe})
   add_test(NAME go_test_${pkg} COMMAND ${test_exe} WORKING_DIRECTORY ${dir})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index bef8c17..d6761d6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -28,131 +28,114 @@ import (
 	"qpid.apache.org/internal"
 	"qpid.apache.org/proton"
 	"sync"
+	"time"
 )
 
 // Connection is an AMQP connection, created by a Container.
 type Connection interface {
 	Endpoint
 
-	// Server puts the connection in server mode, must be called before Open().
-	//
-	// A server connection will do protocol negotiation to accept a incoming AMQP
-	// connection. Normally you would call this for a connection created by
-	// net.Listener.Accept()
-	//
-	Server()
-
-	// Listen arranges for endpoints opened by the remote peer to be passed to accept().
-	// Listen() must be called before Connection.Open().
-	//
-	// accept() is passed a Session, Sender or Receiver.  It can examine endpoint
-	// properties and set some properties (e.g. Receiver.SetCapacity()) Returning nil
-	// will accept the endpoint, returning an error will reject it.
-	//
-	// accept() must not block or use the endpoint other than to examine or set
-	// properties.  It can start a goroutine to process the Endpoint, or pass the
-	// Endpoint to another goroutine via a channel, and that goroutine can use
-	// the endpoint as normal.
-	//
-	// The default Listen function is RejectEndpoint which rejects all endpoints.
-	// You can call Listen(AcceptEndpoint) to accept all endpoints
-	Listen(accept func(Endpoint) error)
-
-	// Open the connection, ready for use.
-	Open() error
-
 	// Sender opens a new sender on the DefaultSession.
 	//
 	// v can be a string, which is used as the Target address, or a SenderSettings
 	// struct containing more details settings.
-	Sender(setting ...LinkSetting) (Sender, error)
+	Sender(...LinkSetting) (Sender, error)
 
 	// Receiver opens a new Receiver on the DefaultSession().
 	//
 	// v can be a string, which is used as the
 	// Source address, or a ReceiverSettings struct containing more details
 	// settings.
-	Receiver(setting ...LinkSetting) (Receiver, error)
+	Receiver(...LinkSetting) (Receiver, error)
 
 	// DefaultSession() returns a default session for the connection. It is opened
 	// on the first call to DefaultSession and returned on subsequent calls.
 	DefaultSession() (Session, error)
 
 	// Session opens a new session.
-	Session() (Session, error)
+	Session(...SessionSetting) (Session, error)
 
 	// Container for the connection.
 	Container() Container
 
 	// Disconnect the connection abruptly with an error.
 	Disconnect(error)
-}
 
-// AcceptEndpoint pass to Connection.Listen to accept all endpoints
-func AcceptEndpoint(Endpoint) error { return nil }
+	// Wait waits for the connection to be disconnected.
+	Wait() error
 
-// RejectEndpoint pass to Connection.Listen to reject all endpoints
-func RejectEndpoint(Endpoint) error {
-	return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
+	// WaitTimeout is like Wait but returns Timeout if the timeout expires.
+	WaitTimeout(time.Duration) error
+}
+
+// ConnectionSetting can be passed when creating a connection.
+// See functions that return ConnectionSetting for details
+type ConnectionSetting func(*connection)
+
+// Server setting puts the connection in server mode.
+//
+// A server connection will do protocol negotiation to accept a incoming AMQP
+// connection. Normally you would call this for a connection created by
+// net.Listener.Accept()
+//
+func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } }
+
+// Accepter provides a function to be called when a connection receives an incoming
+// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
+//
+// The accept() function must not block or use the accepted endpoint.
+// It can pass the endpoint to another goroutine for processing.
+//
+// By default all incoming endpoints are rejected.
+func Accepter(accept func(Incoming)) ConnectionSetting {
+	return func(c *connection) { c.accept = accept }
 }
 
 type connection struct {
 	endpoint
 	listenOnce, defaultSessionOnce, closeOnce sync.Once
 
-	// Set before Open()
-	container *container
-	conn      net.Conn
-	accept    func(Endpoint) error
-
-	// Set by Open()
+	container   *container
+	conn        net.Conn
+	accept      func(Incoming)
 	handler     *handler
 	engine      *proton.Engine
 	err         internal.ErrorHolder
 	eConnection proton.Connection
 
 	defaultSession Session
+	done           chan struct{}
 }
 
-func newConnection(conn net.Conn, cont *container) (*connection, error) {
-	c := &connection{container: cont, conn: conn, accept: RejectEndpoint}
+func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) {
+	c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
 	c.handler = newHandler(c)
 	var err error
 	c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 	if err != nil {
 		return nil, err
 	}
+	for _, set := range setting {
+		set(c)
+	}
 	c.str = c.engine.String()
 	c.eConnection = c.engine.Connection()
+	go func() { c.engine.Run(); close(c.done) }()
 	return c, nil
 }
 
-func (c *connection) Server() { c.engine.Server() }
-
-func (c *connection) Listen(accept func(Endpoint) error) { c.accept = accept }
-
-func (c *connection) Open() error {
-	go c.engine.Run()
-	return nil
-}
-
-func (c *connection) Close(err error) { c.engine.Close(err) }
+func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
 
-func (c *connection) Disconnect(err error) { c.engine.Disconnect(err) }
+func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
 
-func (c *connection) closed(err error) {
-	// Call from another goroutine to initiate close without deadlock.
-	go c.Close(err)
-}
-
-func (c *connection) Session() (Session, error) {
+func (c *connection) Session(setting ...SessionSetting) (Session, error) {
 	var s Session
 	err := c.engine.InjectWait(func() error {
 		eSession, err := c.engine.Connection().Session()
 		if err == nil {
 			eSession.Open()
 			if err == nil {
-				s = newSession(c, eSession)
+				s = newSession(c, eSession, setting...)
 			}
 		}
 		return err
@@ -189,3 +172,47 @@ func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
 }
 
 func (c *connection) Connection() Connection { return c }
+
+func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
+func (c *connection) WaitTimeout(timeout time.Duration) error {
+	_, err := timedReceive(c.done, timeout)
+	if err == Timeout {
+		return Timeout
+	}
+	return c.Error()
+}
+
+// Incoming is the interface for incoming requests to open an endpoint.
+// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
+type Incoming interface {
+	// Accept the endpoint with default settings.
+	//
+	// You must not use the returned endpoint in the accept() function that
+	// receives the Incoming value, but you can pass it to other goroutines.
+	//
+	// Implementing types provide type-specific Accept functions that take additional settings.
+	Accept() Endpoint
+
+	// Reject the endpoint with an error
+	Reject(error)
+
+	error() error
+}
+
+type incoming struct {
+	err      error
+	accepted bool
+}
+
+func (i *incoming) Reject(err error) { i.err = err }
+
+func (i *incoming) error() error {
+	switch {
+	case i.err != nil:
+		return i.err
+	case !i.accepted:
+		return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
+	default:
+		return nil
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index 06a9a14..7bbc4b0 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -39,7 +39,7 @@ type Container interface {
 	// setting any Connection properties you need to set. Note the net.Conn
 	// can be an outgoing connection (e.g. made with net.Dial) or an incoming
 	// connection (e.g. made with net.Listener.Accept())
-	Connection(conn net.Conn) (Connection, error)
+	Connection(net.Conn, ...ConnectionSetting) (Connection, error)
 }
 
 type container struct {
@@ -66,6 +66,6 @@ func (cont *container) nextLinkName() string {
 	return cont.id + "@" + cont.linkNames.Next()
 }
 
-func (cont *container) Connection(conn net.Conn) (Connection, error) {
-	return newConnection(conn, cont)
+func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) {
+	return newConnection(conn, cont, setting...)
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1b07109..b518e42 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -27,7 +27,7 @@ import (
 // NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
 
 type handler struct {
-	delegator    *proton.MessagingDelegator
+	delegator    *proton.MessagingAdapter
 	connection   *connection
 	links        map[proton.Link]Link
 	sentMessages map[proton.Delivery]*sentMessage
@@ -41,8 +41,8 @@ func newHandler(c *connection) *handler {
 		sentMessages: make(map[proton.Delivery]*sentMessage),
 		sessions:     make(map[proton.Session]*session),
 	}
-	h.delegator = proton.NewMessagingDelegator(h)
-	// Disable auto features of MessagingDelegator, we do these ourselves.
+	h.delegator = proton.NewMessagingAdapter(h)
+	// Disable auto features of MessagingAdapter, we do these ourselves.
 	h.delegator.Prefetch = 0
 	h.delegator.AutoAccept = false
 	h.delegator.AutoSettle = false
@@ -50,6 +50,10 @@ func newHandler(c *connection) *handler {
 	return h
 }
 
+func (h *handler) internalError(fmt string, arg ...interface{}) {
+	proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...))
+}
+
 func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
 	switch t {
 
@@ -57,9 +61,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		if r, ok := h.links[e.Link()].(*receiver); ok {
 			r.message(e.Delivery())
 		} else {
-			proton.CloseError(
-				h.connection.eConnection,
-				amqp.Errorf(amqp.InternalError, "no receiver for link %s", e.Link()))
+			h.internalError("no receiver for link %s", e.Link())
 		}
 
 	case proton.MSettled:
@@ -68,19 +70,18 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 
 	case proton.MSendable:
-		h.trySend(e.Link())
+		if s, ok := h.links[e.Link()].(*sender); ok {
+			s.sendable()
+		} else {
+			h.internalError("no receiver for link %s", e.Link())
+		}
 
 	case proton.MSessionOpening:
 		if e.Session().State().LocalUninit() { // Remotely opened
-			s := newSession(h.connection, e.Session())
-			if err := h.connection.accept(s); err != nil {
-				proton.CloseError(e.Session(), (err))
-			} else {
-				h.sessions[e.Session()] = s
-				if s.capacity > 0 {
-					e.Session().SetIncomingCapacity(s.capacity)
-				}
-				e.Session().Open()
+			incoming := &IncomingSession{h: h, pSession: e.Session()}
+			h.connection.accept(incoming)
+			if err := incoming.error(); err != nil {
+				proton.CloseError(e.Session(), err)
 			}
 		}
 
@@ -95,33 +96,24 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 
 	case proton.MLinkOpening:
 		l := e.Link()
-		if l.State().LocalUninit() { // Remotely opened
-			ss := h.sessions[l.Session()]
-			if ss == nil {
-				proton.CloseError(
-					l, amqp.Errorf(amqp.InternalError, ("no session for link")))
-				break
-			}
-			var link Link
-			if l.IsReceiver() {
-				r := &receiver{link: incomingLink(ss, l)}
-				link = r
-				r.inAccept = true
-				defer func() { r.inAccept = false }()
-			} else {
-				link = &sender{link: incomingLink(ss, l)}
-			}
-			if err := h.connection.accept(link); err != nil {
-				proton.CloseError(l, err)
-				break
-			}
-			link.open()
+		if l.State().LocalActive() { // Already opened locally.
+			break
 		}
-
-	case proton.MLinkOpened:
-		l := e.Link()
-		if l.IsSender() {
-			h.trySend(l)
+		ss := h.sessions[l.Session()]
+		if ss == nil {
+			h.internalError("no session for link %s", e.Link())
+			break
+		}
+		var incoming Incoming
+		if l.IsReceiver() {
+			incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
+		} else {
+			incoming = &IncomingSender{makeIncomingLink(ss, l)}
+		}
+		h.connection.accept(incoming)
+		if err := incoming.error(); err != nil {
+			proton.CloseError(l, err)
+			break
 		}
 
 	case proton.MLinkClosing:
@@ -130,7 +122,17 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 	case proton.MLinkClosed:
 		h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
 
+	case proton.MConnectionClosing:
+		h.connection.err.Set(e.Connection().RemoteCondition().Error())
+
+	case proton.MConnectionClosed:
+		h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
+
 	case proton.MDisconnected:
+		h.connection.err.Set(e.Transport().Condition().Error())
+		// If err not set at this point (e.g. to Closed) then this is unexpected.
+		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
+
 		err := h.connection.Error()
 		for l, _ := range h.links {
 			h.linkClosed(l, err)
@@ -154,19 +156,3 @@ func (h *handler) linkClosed(l proton.Link, err error) {
 func (h *handler) addLink(rl proton.Link, ll Link) {
 	h.links[rl] = ll
 }
-
-func (h *handler) trySend(l proton.Link) {
-	if l.Credit() <= 0 {
-		return
-	}
-	if s, ok := h.links[l].(*sender); ok {
-		for ch := s.popBlocked(); l.Credit() > 0 && ch != nil; ch = s.popBlocked() {
-			if snd, ok := <-ch; ok {
-				s.doSend(snd)
-			}
-		}
-	} else {
-		h.connection.closed(
-			amqp.Errorf(amqp.InternalError, "cannot find sender for link %s", l))
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index abc8431..4bef53b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -60,39 +60,39 @@ type Link interface {
 	open()
 }
 
-// LinkSetting is a function that sets a link property. Passed when creating
-// a Sender or Receiver, do not use at any other time.
-type LinkSetting func(Link)
+// LinkSetting can be passed when creating a sender or receiver.
+// See functions that return LinkSetting for details
+type LinkSetting func(*link)
 
 // Source sets address that messages are coming from.
-func Source(s string) LinkSetting { return func(l Link) { l.(*link).source = s } }
+func Source(s string) LinkSetting { return func(l *link) { l.source = s } }
 
 // Target sets address that messages are going to.
-func Target(s string) LinkSetting { return func(l Link) { l.(*link).target = s } }
+func Target(s string) LinkSetting { return func(l *link) { l.target = s } }
 
 // LinkName sets the link name.
-func LinkName(s string) LinkSetting { return func(l Link) { l.(*link).target = s } }
+func LinkName(s string) LinkSetting { return func(l *link) { l.target = s } }
 
 // SndSettle sets the send settle mode
-func SndSettle(m SndSettleMode) LinkSetting { return func(l Link) { l.(*link).sndSettle = m } }
+func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } }
 
 // RcvSettle sets the send settle mode
-func RcvSettle(m RcvSettleMode) LinkSetting { return func(l Link) { l.(*link).rcvSettle = m } }
+func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } }
 
 // SndSettleMode defines when the sending end of the link settles message delivery.
 type SndSettleMode proton.SndSettleMode
 
 // Capacity sets the link capacity
-func Capacity(n int) LinkSetting { return func(l Link) { l.(*link).capacity = n } }
+func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } }
 
 // Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
-func Prefetch(p bool) LinkSetting { return func(l Link) { l.(*link).prefetch = p } }
+func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } }
 
 // AtMostOnce sets "fire and forget" mode, messages are sent but no
 // acknowledgment is received, messages can be lost if there is a network
 // failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
 func AtMostOnce() LinkSetting {
-	return func(l Link) {
+	return func(l *link) {
 		SndSettle(SndSettled)(l)
 		RcvSettle(RcvFirst)(l)
 	}
@@ -104,7 +104,7 @@ func AtMostOnce() LinkSetting {
 // that the message will be received twice in this case.
 // Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 func AtLeastOnce() LinkSetting {
-	return func(l Link) {
+	return func(l *link) {
 		SndSettle(SndUnsettled)(l)
 		RcvSettle(RcvFirst)(l)
 	}
@@ -145,8 +145,6 @@ type link struct {
 	session *session
 	eLink   proton.Link
 	done    chan struct{} // Closed when link is closed
-
-	inAccept bool
 }
 
 func (l *link) Source() string           { return l.source }
@@ -163,8 +161,8 @@ func (l *link) engine() *proton.Engine { return l.session.connection.engine }
 func (l *link) handler() *handler      { return l.session.connection.handler }
 
 // Set up link fields and open the proton.Link
-func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error) {
-	l := &link{
+func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) {
+	l := link{
 		session:  sn,
 		isSender: isSender,
 		capacity: 1,
@@ -172,7 +170,7 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error
 		done:     make(chan struct{}),
 	}
 	for _, set := range setting {
-		set(l)
+		set(&l)
 	}
 	if l.linkName == "" {
 		l.linkName = l.session.connection.container.nextLinkName()
@@ -184,7 +182,7 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error
 	}
 	if l.eLink.IsNil() {
 		l.err.Set(internal.Errorf("cannot create link %s", l))
-		return nil, l.err.Get()
+		return l, l.err.Get()
 	}
 	l.eLink.Source().SetAddress(l.source)
 	l.eLink.Target().SetAddress(l.target)
@@ -195,20 +193,27 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error
 	return l, nil
 }
 
+type incomingLink struct {
+	incoming
+	link
+}
+
 // Set up a link from an incoming proton.Link.
-func incomingLink(sn *session, eLink proton.Link) link {
-	l := link{
-		session:   sn,
-		isSender:  eLink.IsSender(),
-		eLink:     eLink,
-		source:    eLink.RemoteSource().Address(),
-		target:    eLink.RemoteTarget().Address(),
-		linkName:  eLink.Name(),
-		sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
-		rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
-		capacity:  1,
-		prefetch:  false,
-		done:      make(chan struct{}),
+func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
+	l := incomingLink{
+		link: link{
+			session:   sn,
+			isSender:  eLink.IsSender(),
+			eLink:     eLink,
+			source:    eLink.RemoteSource().Address(),
+			target:    eLink.RemoteTarget().Address(),
+			linkName:  eLink.Name(),
+			sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
+			rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
+			capacity:  1,
+			prefetch:  false,
+			done:      make(chan struct{}),
+		},
 	}
 	l.str = eLink.String()
 	return l

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 92c0b90..59ac018 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -20,10 +20,9 @@ under the License.
 package electron
 
 import (
+	"qpid.apache.org/amqp"
 	"qpid.apache.org/internal"
 	"qpid.apache.org/proton"
-	"qpid.apache.org/amqp"
-	"sync"
 	"time"
 )
 
@@ -63,10 +62,6 @@ type Receiver interface {
 	// Capacity is the size (number of messages) of the local message buffer
 	// These are messages received but not yet returned to the application by a call to Receive()
 	Capacity() int
-
-	// SetCapacity sets Capacity and Prefetch of an accepted Receiver.
-	// May only be called in an accept() function, see Connection.Listen()
-	SetCapacity(capacity int, prefetch bool)
 }
 
 // Flow control policy for a receiver.
@@ -120,18 +115,12 @@ func (p noPrefetchPolicy) Post(r *receiver, err error) {
 // Receiver implementation
 type receiver struct {
 	link
-	buffer    chan ReceivedMessage
-	policy    policy
-	setupOnce sync.Once
-}
-
-func (r *receiver) SetCapacity(capacity int, prefetch bool) {
-	internal.Assert(r.inAccept, "Receiver.SetCapacity called outside of accept function")
-	r.capacity = capacity
-	r.prefetch = prefetch
+	buffer chan ReceivedMessage
+	policy policy
 }
 
-func (r *receiver) setup() {
+func newReceiver(l link) *receiver {
+	r := &receiver{link: l}
 	if r.capacity < 1 {
 		r.capacity = 1
 	}
@@ -141,6 +130,9 @@ func (r *receiver) setup() {
 		r.policy = &noPrefetchPolicy{}
 	}
 	r.buffer = make(chan ReceivedMessage, r.capacity)
+	r.handler().addLink(r.eLink, r)
+	r.link.open()
+	return r
 }
 
 // call in proton goroutine.
@@ -156,15 +148,14 @@ func (r *receiver) Receive() (rm ReceivedMessage, err error) {
 }
 
 func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
-	r.setupOnce.Do(r.setup)
 	internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
 	r.policy.Pre(r)
 	defer func() { r.policy.Post(r, err) }()
-	rmi, ok, timedout := timedReceive(r.buffer, timeout)
-	switch {
-	case timedout:
+	rmi, err := timedReceive(r.buffer, timeout)
+	switch err {
+	case Timeout:
 		return ReceivedMessage{}, Timeout
-	case !ok:
+	case Closed:
 		return ReceivedMessage{}, r.Error()
 	default:
 		return rmi.(ReceivedMessage), nil
@@ -194,12 +185,6 @@ func (r *receiver) message(delivery proton.Delivery) {
 	}
 }
 
-func (r *receiver) open() {
-	r.setupOnce.Do(r.setup)
-	r.link.open()
-	r.handler().addLink(r.eLink, r)
-}
-
 func (r *receiver) closed(err error) {
 	r.link.closed(err)
 	if r.buffer != nil {
@@ -230,3 +215,24 @@ func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
 
 // Reject is short for Acknowledge(Rejected)
 func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
+
+// IncomingReceiver is passed to the accept() function given to Connection.Listen()
+// when there is an incoming request for a receiver link.
+type IncomingReceiver struct {
+	incomingLink
+}
+
+// Link provides information about the incoming link.
+func (i *IncomingReceiver) Link() Link { return i }
+
+// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver.
+func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
+	i.capacity = capacity
+	i.prefetch = prefetch
+	return i.Accept().(Receiver)
+}
+
+func (i *IncomingReceiver) Accept() Endpoint {
+	i.accepted = true
+	return newReceiver(i.link)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 3124f74..68cfbb3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -23,10 +23,9 @@ package electron
 import "C"
 
 import (
-	"container/list"
+	"qpid.apache.org/amqp"
 	"qpid.apache.org/internal"
 	"qpid.apache.org/proton"
-	"qpid.apache.org/amqp"
 	"reflect"
 	"time"
 )
@@ -69,7 +68,7 @@ type sendMessage struct {
 
 type sender struct {
 	link
-	blocked list.List // Channel of sendMessage for blocked senders.
+	credit chan struct{} // Signal available credit.
 }
 
 // Disposition indicates the outcome of a settled message delivery.
@@ -102,31 +101,6 @@ func (d Disposition) String() string {
 	}
 }
 
-// Send a message, assumes there is credit
-func (s *sender) doSend(snd sendMessage) {
-	delivery, err := s.eLink.Send(snd.m)
-	switch sm := snd.sm.(type) {
-	case nil:
-		delivery.Settle()
-	case *sentMessage:
-		sm.delivery = delivery
-		if err != nil {
-			sm.settled(err)
-		} else {
-			s.handler().sentMessages[delivery] = sm
-		}
-	default:
-		internal.Assert(false, "bad SentMessage type %T", snd.sm)
-	}
-}
-
-func (s *sender) popBlocked() chan sendMessage {
-	if s.blocked.Len() > 0 {
-		return s.blocked.Remove(s.blocked.Front()).(chan sendMessage)
-	}
-	return nil
-}
-
 func (s *sender) Send(m amqp.Message) (SentMessage, error) {
 	return s.SendTimeout(m, Forever)
 }
@@ -152,52 +126,58 @@ func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error
 }
 
 func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
-	if s.Error() != nil {
-		return nil, s.Error()
-	}
-	var err error
-	if timeout == 0 {
-		err = s.engine().InjectWait(func() error {
-			if s.eLink.Credit() > 0 {
-				s.doSend(snd)
-				return nil
-			}
-			return Timeout
-		})
-	} else {
-		buf := make(chan sendMessage)
-		done := make(chan struct{})
-		defer close(buf)
-		s.engine().Inject(func() { // Runs concurrently
-			if s.eLink.Credit() > 0 {
-				s.doSend(snd)
-				close(done) // Signal already sent
-			} else {
-				s.blocked.PushBack(buf)
-			}
-		})
-		select {
-		case <-done: // Sent without blocking
-		case buf <- snd: // Sent via blocking channel
-		case <-s.done:
+	if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
+		if err == Closed {
 			err = s.Error()
-		case <-After(timeout):
-			err = Timeout
+			internal.Assert(err != nil)
 		}
+		return nil, err
 	}
-	if err != nil {
+	if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
 		return nil, err
 	}
 	return snd.sm, nil
 }
 
+// Send a message. Handler goroutine
+func (s *sender) doSend(snd sendMessage) {
+	delivery, err := s.eLink.Send(snd.m)
+	switch sm := snd.sm.(type) {
+	case nil:
+		delivery.Settle()
+	case *sentMessage:
+		sm.delivery = delivery
+		if err != nil {
+			sm.settled(err)
+		} else {
+			s.handler().sentMessages[delivery] = sm
+		}
+	default:
+		internal.Assert(false, "bad SentMessage type %T", snd.sm)
+	}
+	if s.eLink.Credit() > 0 {
+		s.sendable() // Signal credit.
+	}
+}
+
+// Signal the sender has credit. Any goroutine.
+func (s *sender) sendable() {
+	select { // Non-blocking
+	case s.credit <- struct{}{}: // Set the flag if not already set.
+	default:
+	}
+}
+
 func (s *sender) closed(err error) {
 	s.link.closed(err)
+	close(s.credit)
 }
 
-func (s *sender) open() {
-	s.link.open()
+func newSender(l link) *sender {
+	s := &sender{link: l, credit: make(chan struct{}, 1)}
 	s.handler().addLink(s.eLink, s)
+	s.link.open()
+	return s
 }
 
 // SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
@@ -285,7 +265,7 @@ func (sm *sentMessage) Disposition() (Disposition, error) {
 }
 
 func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
-	if _, _, timedout := timedReceive(sm.done, timeout); timedout {
+	if _, err := timedReceive(sm.done, timeout); err == Timeout {
 		return sm.disposition, Timeout
 	} else {
 		return sm.disposition, sm.err
@@ -317,3 +297,19 @@ func (sm *sentMessage) finish() {
 }
 
 func (sm *sentMessage) Error() error { return sm.err }
+
+// IncomingSender is passed to the accept() function given to Connection.Listen()
+// when there is an incoming request for a sender link.
+type IncomingSender struct {
+	incomingLink
+}
+
+// Link provides information about the incoming link.
+func (i *IncomingSender) Link() Link { return i }
+
+func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
+
+func (i *IncomingSender) Accept() Endpoint {
+	i.accepted = true
+	return newSender(i.link)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 612658a..3531da6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -24,7 +24,6 @@ import (
 )
 
 // Session is an AMQP session, it contains Senders and Receivers.
-//
 type Session interface {
 	Endpoint
 
@@ -36,10 +35,6 @@ type Session interface {
 	// Source address, or a ReceiverSettings struct containing more details
 	// settings.
 	Receiver(...LinkSetting) (Receiver, error)
-
-	// SetCapacity sets the session buffer capacity in bytes.
-	// Only has effect if called in an accept() function, see Connection.Listen()
-	SetCapacity(bytes uint)
 }
 
 type session struct {
@@ -49,13 +44,27 @@ type session struct {
 	capacity   uint
 }
 
+// SessionSetting can be passed when creating a sender or receiver.
+// See functions that return SessionSetting for details
+type SessionSetting func(*session)
+
+// IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer..
+func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } }
+
 // in proton goroutine
-func newSession(c *connection, es proton.Session) *session {
-	return &session{
+func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session {
+	s := &session{
 		connection: c,
 		eSession:   es,
 		endpoint:   endpoint{str: es.String()},
 	}
+	for _, set := range setting {
+		set(s)
+	}
+	c.handler.sessions[s.eSession] = s
+	s.eSession.SetIncomingCapacity(s.capacity)
+	s.eSession.Open()
+	return s
 }
 
 func (s *session) Connection() Connection     { return s.connection }
@@ -71,8 +80,7 @@ func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
 	err = s.engine().InjectWait(func() error {
 		l, err := localLink(s, true, setting...)
 		if err == nil {
-			snd = &sender{link: *l}
-			snd.(*sender).open()
+			snd = newSender(l)
 		}
 		return err
 	})
@@ -83,8 +91,7 @@ func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
 	err = s.engine().InjectWait(func() error {
 		l, err := localLink(s, false, setting...)
 		if err == nil {
-			rcv = &receiver{link: *l}
-			rcv.(*receiver).open()
+			rcv = newReceiver(l)
 		}
 		return err
 	})
@@ -96,3 +103,23 @@ func (s *session) closed(err error) {
 	s.err.Set(err)
 	s.err.Set(Closed)
 }
+
+// IncomingSession is passed to the accept() function given to Connection.Listen()
+// when there is an incoming session request.
+type IncomingSession struct {
+	incoming
+	h        *handler
+	pSession proton.Session
+	capacity uint
+}
+
+// AcceptCapacity sets the session buffer capacity of an incoming session in bytes.
+func (i *IncomingSession) AcceptSession(bytes uint) Session {
+	i.capacity = bytes
+	return i.Accept().(Session)
+}
+
+func (i *IncomingSession) Accept() Endpoint {
+	i.accepted = true
+	return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
index ee61332..3407b82 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -49,24 +49,25 @@ const Forever time.Duration = -1
 // timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
 // forever. Other values mean block up to the timeout.
 //
-func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) {
+// Returns error Timeout on timeout, Closed on channel close.
+func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) {
 	cases := []reflect.SelectCase{
 		reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
 	}
-	switch {
-	case timeout == 0: // Non-blocking receive
+	if timeout == 0 { // Non-blocking
 		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
-	case timeout == Forever: // Block forever, nothing to add
-	default: // Block up to timeout
+	} else { // Block up to timeout
 		cases = append(cases,
-			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))})
+			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
 	}
-	chosen, recv, recvOk := reflect.Select(cases)
+	chosen, value, ok := reflect.Select(cases)
 	switch {
+	case !ok:
+		return nil, Closed
 	case chosen == 0:
-		return recv.Interface(), recvOk, false
+		return value.Interface(), nil
 	default:
-		return nil, false, true
+		return nil, Timeout
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
index e9d6d6f..51f70f8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
@@ -23,9 +23,9 @@ You can write clients and servers using this library.
 
 This package is a port of the Proton C API into Go (see
 http://qpid.apache.org/proton) Go programmers may find the 'electron' package
-more convenient, it provides a concurrent-safe API that allows you to do
-procedural loops in goroutines rather than implementing event handlers that must
-run in a single goroutine.
+more convenient. qpid.apache.org/electron provides a concurrent-safe API that
+allows you to run procedural loops in arbitrary goroutines rather than
+implementing event handlers that must run in a single goroutine.
 
 Consult the C API documentation at http://qpid.apache.org/proton for more
 information about the types here. There is a 1-1 correspondence between C type
@@ -42,21 +42,24 @@ goroutine that feeds events to a proton.MessagingHandler, which you must impleme
 See the Engine documentation for more.
 
 MessagingHandler defines an event handling interface that you can implement to
-react to AMQP protocol events. (There is also a lower-level EventHandler, but
-MessagingHandler provides a simpler set of events and automates common tasks for you.)
+react to AMQP protocol events. There is also a lower-level EventHandler, but
+MessagingHandler provides a simpler set of events and automates common tasks for you,
+for most applications it will be more convenient.
 
-All events generated by proton are handled in the single event-loop goroutine
-associated with the Connection and Engine. You can use Engine.Inject() or
-Engine.InjectWait() to inject additional functions into the event loop. Only
-injected functions or handler functions can use proton types (such as Session,
-Link etc.) Handlers and injected functions can set up channels to communicate
-with other goroutines..
+NOTE: Methods on most types defined in this package (Sessions, Links etc.)  can
+*only* be called in the event handler goroutine of the relevant
+Connection/Engine, either by the HandleEvent method of a handler type or in a
+function injected into the goroutine via Inject() or InjectWait() Handlers and
+injected functions can set up channels to communicate with other goroutines.
+Note the Injecter associated with a handler available as part of the Event value
+passed to HandleEvent.
 
-Separate Connection and Engine instances are independent, and can run concurrently.
+Separate Engine instances are independent, and can run concurrently.
 
 The 'electron' package is built on the proton package but instead offers a
 concurrent-safe API that can use simple procedural loops rather than event
-handlers to express application logic. It may be easier to use w for some applications.
+handlers to express application logic. It is easier to use for most
+applications.
 
 */
 package proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 5dc8727..2cebb49 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -39,15 +39,26 @@ import (
 	"unsafe"
 )
 
-// Injecter allows functions to be "injected" into an event-processing loop.
+// Injecter allows functions to be "injected" into the event-processing loop, to
+// be called in the same goroutine as event handlers.
 type Injecter interface {
-	// Inject a function into an event-loop concurrency context.
+	// Inject a function into the engine goroutine.
 	//
-	// f() will be called in the same concurrency context as event handers, so it
-	// can safely use values that can used be used in that context. If f blocks it
-	// will block the event loop so be careful calling blocking functions in f.
+	// f() will be called in the same goroutine as event handlers, so it can safely
+	// use values belonging to event handlers without synchronization. f() should
+	// not block, no further events or injected functions can be processed until
+	// f() returns.
 	//
-	// Returns a non-nil error if the function could not be injected.
+	// Returns a non-nil error if the function could not be injected and will
+	// never be called. Otherwise the function will eventually be called.
+	//
+	// Note that proton values (Link, Session, Connection etc.) that existed when
+	// Inject(f) was called may have become invalid by the time f() is executed.
+	// Handlers should handle keep track of Closed events to ensure proton values
+	// are not used after they become invalid. One technique is to have map from
+	// proton values to application values. Check that the map has the correct
+	// proton/application value pair at the start of the injected function and
+	// delete the value from the map when handling a Closed event.
 	Inject(f func()) error
 
 	// InjectWait is like Inject but does not return till f() has completed.
@@ -72,19 +83,14 @@ func (b *bufferChan) buffer() []byte {
 }
 
 // Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
-// Handler functions in a single event-loop goroutine. Actions taken by Handler
-// functions (such as sending messages) are encoded and written to the
-// net.Conn. Create a engine with NewEngine()
-//
-// The Engine runs a proton event loop in the goroutine that calls Engine.Run()
-// and creates goroutines to feed data to/from a net.Conn. You can create
-// multiple Engines to handle multiple connections concurrently.
+// Handler functions sequentially in a single goroutine. Actions taken by
+// Handler functions (such as sending messages) are encoded and written to the
+// net.Conn. You can create multiple Engines to handle multiple connections
+// concurrently.
 //
-// Methods on proton values defined in this package (Sessions, Links etc.) can
-// only be called in the goroutine that executes the corresponding
-// Engine.Run(). You implement the EventHandler or MessagingHandler interfaces
-// and provide those values to NewEngine(). Their HandleEvent method will be
-// called in the Engine goroutine, in typical event-driven style.
+// You implement the EventHandler and/or MessagingHandler interfaces and provide
+// those values to NewEngine(). Their HandleEvent method will be called in the
+// event-handling goroutine.
 //
 // Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
 // other goroutines, store them, or use them as map indexes. Effectively they are
@@ -161,7 +167,7 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 	}
 	C.pn_connection_collect(eng.connection.pn, eng.collector)
 	eng.connection.Open()
-	connectionContexts.Put(eng.connection, connectionContext{eng, eng.String()})
+	connectionContexts.Put(eng.connection, connectionContext{eng.String()})
 	return eng, nil
 }
 
@@ -223,6 +229,7 @@ func (eng *Engine) Server() { eng.transport.SetServer() }
 
 // Close the engine's connection, returns when the engine has exited.
 func (eng *Engine) Close(err error) {
+	eng.err.Set(err)
 	eng.Inject(func() {
 		CloseError(eng.connection, err)
 	})
@@ -231,9 +238,7 @@ func (eng *Engine) Close(err error) {
 
 // Disconnect the engine's connection without and AMQP close, returns when the engine has exited.
 func (eng *Engine) Disconnect(err error) {
-	if err != nil {
-		eng.err.Set(err)
-	}
+	eng.err.Set(err)
 	eng.conn.Close()
 	<-eng.running
 }
@@ -281,8 +286,8 @@ func (eng *Engine) Run() error {
 	}()
 
 	wbuf := eng.write.buffer()[:0]
-loop:
-	for {
+
+	for eng.err.Get() == nil {
 		if len(wbuf) == 0 {
 			eng.pop(&wbuf)
 		}
@@ -309,13 +314,12 @@ loop:
 			eng.netError(err)
 		}
 		eng.process()
-		if eng.err.Get() != nil {
-			break loop
-		}
 	}
 	close(eng.write.buffers)
 	eng.conn.Close() // Make sure connection is closed
 	wait.Wait()
+	close(eng.running) // Signal goroutines have exited and Error is set.
+
 	connectionContexts.Delete(eng.connection)
 	if !eng.connection.IsNil() {
 		eng.connection.Free()
@@ -332,15 +336,10 @@ loop:
 			C.pn_handler_free(h.pn)
 		}
 	}
-	close(eng.running) // Signal goroutines have exited and Error is set.
 	return eng.err.Get()
 }
 
 func (eng *Engine) netError(err error) {
-	if err == nil {
-		err = internal.Errorf("unknown network error")
-	}
-	eng.conn.Close() // Make sure both sides are closed
 	eng.err.Set(err)
 	eng.transport.CloseHead()
 	eng.transport.CloseTail()
@@ -389,17 +388,13 @@ func (eng *Engine) handle(e Event) {
 		h.HandleEvent(e)
 	}
 	if e.Type() == ETransportClosed {
-		eng.err.Set(e.Connection().RemoteCondition().Error())
-		eng.err.Set(e.Connection().Transport().Condition().Error())
-		if eng.err.Get() == nil {
-			eng.err.Set(io.EOF)
-		}
+		eng.err.Set(io.EOF)
 	}
 }
 
 func (eng *Engine) process() {
 	for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
-		eng.handle(makeEvent(ce))
+		eng.handle(makeEvent(ce, eng))
 		C.pn_collector_pop(eng.collector)
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
index 8a5cbf8..53b744c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
@@ -191,7 +191,7 @@ type endpointDelegator struct {
 	remoteOpen, remoteClose, localOpen, localClose EventType
 	opening, opened, closing, closed, error        MessagingEvent
 	endpoint                                       func(Event) Endpoint
-	delegator                                      *MessagingDelegator
+	delegator                                      *MessagingAdapter
 }
 
 // HandleEvent handles an open/close event for an endpoint in a generic way.
@@ -240,10 +240,10 @@ func (d endpointDelegator) HandleEvent(e Event) {
 	}
 }
 
-// MessagingDelegator implments a EventHandler and delegates to a MessagingHandler.
-// You can modify the exported fields before you pass the MessagingDelegator to
+// MessagingAdapter implments a EventHandler and delegates to a MessagingHandler.
+// You can modify the exported fields before you pass the MessagingAdapter to
 // a Engine.
-type MessagingDelegator struct {
+type MessagingAdapter struct {
 	mhandler                  MessagingHandler
 	connection, session, link endpointDelegator
 	flowcontroller            EventHandler
@@ -261,8 +261,8 @@ type MessagingDelegator struct {
 	PeerCloseError bool
 }
 
-func NewMessagingDelegator(h MessagingHandler) *MessagingDelegator {
-	return &MessagingDelegator{
+func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter {
+	return &MessagingAdapter{
 		mhandler:       h,
 		flowcontroller: nil,
 		AutoSettle:     true,
@@ -281,7 +281,7 @@ func handleIf(h EventHandler, e Event) {
 
 // Handle a proton event by passing the corresponding MessagingEvent(s) to
 // the MessagingHandler.
-func (d *MessagingDelegator) HandleEvent(e Event) {
+func (d *MessagingAdapter) HandleEvent(e Event) {
 	handleIf(d.flowcontroller, e)
 
 	switch e.Type() {
@@ -352,7 +352,7 @@ func (d *MessagingDelegator) HandleEvent(e Event) {
 	}
 }
 
-func (d *MessagingDelegator) incoming(e Event) (err error) {
+func (d *MessagingAdapter) incoming(e Event) (err error) {
 	delivery := e.Delivery()
 	if delivery.HasMessage() {
 		d.mhandler.HandleMessagingEvent(MMessage, e)
@@ -368,7 +368,7 @@ func (d *MessagingDelegator) incoming(e Event) (err error) {
 	return
 }
 
-func (d *MessagingDelegator) outgoing(e Event) (err error) {
+func (d *MessagingAdapter) outgoing(e Event) (err error) {
 	delivery := e.Delivery()
 	if delivery.Updated() {
 		switch delivery.Remote().Type() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 7d40890..45a6722 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -24,20 +24,20 @@ package proton
 
 //#include <proton/codec.h>
 //#include <proton/connection.h>
-//#include <proton/session.h>
-//#include <proton/session.h>
 //#include <proton/delivery.h>
-//#include <proton/link.h>
 //#include <proton/event.h>
-//#include <proton/transport.h>
 //#include <proton/link.h>
+//#include <proton/link.h>
+//#include <proton/object.h>
+//#include <proton/session.h>
+//#include <proton/transport.h>
 //#include <stdlib.h>
 import "C"
 
 import (
 	"fmt"
-	"qpid.apache.org/internal"
 	"qpid.apache.org/amqp"
+	"qpid.apache.org/internal"
 	"reflect"
 	"time"
 	"unsafe"
@@ -45,34 +45,75 @@ import (
 
 // TODO aconway 2015-05-05: Documentation for generated types.
 
+// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the
+// Go type implementing this interface. For low level, at-your-own-risk use only.
+type CHandle interface {
+	// CPtr returns the unsafe C pointer, equivalent to a C void*.
+	CPtr() unsafe.Pointer
+}
+
+// Incref increases the refcount of a proton value, which prevents the
+// underlying C struct being freed until you call Decref().
+//
+// It can be useful to "pin" a proton value in memory while it is in use by
+// goroutines other than the event loop goroutine. For example if you Incref() a
+// Link, the underlying object is not freed when the link is closed, so means
+// other goroutines can continue to safely use it as an index in a map or inject
+// it into the event loop goroutine. There will of course be an error if you try
+// to use a link after it is closed, but not a segmentation fault.
+func Incref(c CHandle) {
+	if p := c.CPtr(); p != nil {
+		C.pn_incref(p)
+	}
+}
+
+// Decref decreases the refcount of a proton value, freeing the underlying C
+// struct if this is the last reference.  Only call this if you previously
+// called Incref() for this value.
+func Decref(c CHandle) {
+	if p := c.CPtr(); p != nil {
+		C.pn_decref(p)
+	}
+}
+
 // Event is an AMQP protocol event.
 type Event struct {
 	pn         *C.pn_event_t
 	eventType  EventType
 	connection Connection
+	transport  Transport
 	session    Session
 	link       Link
 	delivery   Delivery
+	injecter   Injecter
 }
 
-func makeEvent(pn *C.pn_event_t) Event {
+func makeEvent(pn *C.pn_event_t, injecter Injecter) Event {
 	return Event{
 		pn:         pn,
 		eventType:  EventType(C.pn_event_type(pn)),
 		connection: Connection{C.pn_event_connection(pn)},
+		transport:  Transport{C.pn_event_transport(pn)},
 		session:    Session{C.pn_event_session(pn)},
 		link:       Link{C.pn_event_link(pn)},
 		delivery:   Delivery{C.pn_event_delivery(pn)},
+		injecter:   injecter,
 	}
 }
 func (e Event) IsNil() bool            { return e.eventType == EventType(0) }
 func (e Event) Type() EventType        { return e.eventType }
 func (e Event) Connection() Connection { return e.connection }
+func (e Event) Transport() Transport   { return e.transport }
 func (e Event) Session() Session       { return e.session }
 func (e Event) Link() Link             { return e.link }
 func (e Event) Delivery() Delivery     { return e.delivery }
 func (e Event) String() string         { return e.Type().String() }
 
+// Injecter should not be used in a handler function, but it can be passed to
+// other goroutines (via a channel or to a goroutine started by handler
+// functions) to let them inject functions back into the handlers goroutine.
+func (e Event) Injecter() Injecter { return e.injecter }
+
 // Data holds a pointer to decoded AMQP data.
 // Use amqp.marshal/unmarshal to access it as Go data types.
 //
@@ -132,12 +173,14 @@ type Endpoint interface {
 	String() string
 }
 
-// CloseError sets an error condition on an endpoint and closes the endpoint.
+// CloseError sets an error condition on an endpoint and closes the endpoint (if not already closed)
 func CloseError(e Endpoint, err error) {
 	if err != nil {
 		e.Condition().SetError(err)
 	}
-	e.Close()
+	if e.State().RemoteActive() {
+		e.Close()
+	}
 }
 
 // EndpointError returns the remote error if there is one, the local error if not
@@ -204,6 +247,20 @@ func (l Link) Delivery(tag string) Delivery {
 	return Delivery{C.pn_delivery(l.pn, pnTag(tag))}
 }
 
+func (l Link) Connection() Connection { return l.Session().Connection() }
+
+// Human-readable link description including name, source, target and direction.
+func (l Link) String() string {
+	switch {
+	case l.IsNil():
+		return fmt.Sprintf("<nil-link>")
+	case l.IsSender():
+		return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
+	default:
+		return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
+	}
+}
+
 func cPtr(b []byte) *C.char {
 	if len(b) == 0 {
 		return nil
@@ -229,20 +286,11 @@ func (s Session) Receiver(name string) Link {
 
 // Context information per connection.
 type connectionContext struct {
-	injecter Injecter
-	str      string
+	str string
 }
 
 var connectionContexts = internal.MakeSafeMap()
 
-// Injecter for event-loop associated with this connection.
-func (c Connection) Injecter() Injecter {
-	if cc, ok := connectionContexts.Get(c).(connectionContext); ok {
-		return cc.injecter
-	}
-	return nil
-}
-
 // Unique (per process) string identifier for a connection, useful for debugging.
 func (c Connection) String() string {
 	if cc, ok := connectionContexts.Get(c).(connectionContext); ok {
@@ -279,20 +327,6 @@ func (s Session) String() string {
 	return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
 }
 
-func (l Link) Connection() Connection { return l.Session().Connection() }
-
-// Human-readable link description including name, source, target and direction.
-func (l Link) String() string {
-	switch {
-	case l.IsNil():
-		return fmt.Sprintf("<nil-link>")
-	case l.IsSender():
-		return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
-	default:
-		return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
-	}
-}
-
 // Error returns an instance of amqp.Error or nil.
 func (c Condition) Error() error {
 	if c.IsNil() || !c.IsSet() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
index ac28b32..ef09837 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
@@ -48,7 +48,7 @@ public abstract class Terminus
         _dynamic = other._dynamic;
         if (other._dynamicNodeProperties != null) {
             // TODO: Do we need to copy or can we make a simple reference?
-            _dynamicNodeProperties = new HashMap(other._dynamicNodeProperties); // FIXME
+            _dynamicNodeProperties = new HashMap(other._dynamicNodeProperties); // yFIXME
         }
         if (other._capabilities != null) {
             _capabilities = other._capabilities.clone(); // FIXME?


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message