qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-827: go binding: partial implementation of concurrent messaging API with examples.
Date Tue, 05 May 2015 23:26:27 GMT
PROTON-827: go binding: partial implementation of concurrent messaging API with examples.

Please see proton-c/bindings/go/README.md for details of this update.

Implemented a good chunk of the concurrent Go messaging API with send.go and receive.go examples.
The examples work with event/broker.go (which uses the event API) and with the python
broker.py, simple_send.py and simple_recv.py examples.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0c11d11c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0c11d11c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0c11d11c

Branch: refs/heads/master
Commit: 0c11d11cdb5f1185219b907d1ad9ba22451512cc
Parents: 8d1d20e
Author: Alan Conway <aconway@redhat.com>
Authored: Mon Apr 27 16:09:26 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Tue May 5 19:23:21 2015 -0400

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 examples/go/event/broker.go                     | 188 +++---
 examples/go/example.go                          |  54 --
 examples/go/receive.go                          |  99 +++-
 examples/go/send.go                             | 115 +++-
 proton-c/bindings/go/README.md                  | 106 ++--
 proton-c/bindings/go/src/Makefile               |  17 +-
 proton-c/bindings/go/src/genwrap.go             |  43 +-
 .../go/src/qpid.apache.org/proton/doc.go        |  18 +-
 .../go/src/qpid.apache.org/proton/dummy.go      |  82 ---
 .../go/src/qpid.apache.org/proton/error.go      | 111 ----
 .../go/src/qpid.apache.org/proton/event/doc.go  |  12 +-
 .../src/qpid.apache.org/proton/event/error.go   |  77 ---
 .../qpid.apache.org/proton/event/handlers.go    | 145 +++--
 .../src/qpid.apache.org/proton/event/message.go |  75 +++
 .../go/src/qpid.apache.org/proton/event/pump.go | 347 +++++++----
 .../qpid.apache.org/proton/event/wrappers.go    | 115 +++-
 .../proton/event/wrappers_gen.go                | 584 +++++++++++++------
 .../qpid.apache.org/proton/internal/error.go    | 125 ++++
 .../src/qpid.apache.org/proton/interop_test.go  |   8 +-
 .../go/src/qpid.apache.org/proton/marshal.go    |  15 +-
 .../go/src/qpid.apache.org/proton/message.go    |  66 +--
 .../src/qpid.apache.org/proton/messaging/doc.go |  28 +
 .../proton/messaging/example_test.go            | 268 +++++++++
 .../qpid.apache.org/proton/messaging/handler.go |  70 +++
 .../proton/messaging/messaging.go               | 250 ++++++++
 .../go/src/qpid.apache.org/proton/uid.go        |  40 ++
 .../go/src/qpid.apache.org/proton/unmarshal.go  |  30 +-
 .../go/src/qpid.apache.org/proton/url.go        |   5 +-
 proton-c/bindings/python/proton/handlers.py     |   2 +-
 30 files changed, 2158 insertions(+), 940 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2af3e68..2b509c6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,6 @@ eclipse-classes
 
 # The usual location for proton-c build files
 proton-c/build
+
+# Executables built by go binding tests
+proton-c/bindings/go/bin

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/event/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go
index 086043f..8b227bc 100644
--- a/examples/go/event/broker.go
+++ b/examples/go/event/broker.go
@@ -36,15 +36,29 @@ import (
 	"log"
 	"net"
 	"os"
+	"path"
 	"qpid.apache.org/proton"
 	"qpid.apache.org/proton/event"
+	"sync"
 )
 
-// panicIf is simplistic error handling for example code, not recommended practice.
-func panicIf(err error) {
-	if err != nil {
-		panic(err)
+// Command-line flags
+var addr = flag.String("addr", ":amqp", "Listening address")
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
+var full = flag.Bool("full", false, "Print full message not just body.")
+
+func main() {
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+		flag.PrintDefaults()
 	}
+	flag.Parse()
+	b := newBroker()
+	err := b.listen(*addr)
+	fatalIf(err)
 }
 
 // queue is a structure representing a queue.
@@ -54,18 +68,20 @@ type queue struct {
 	consumers map[event.Link]bool // Set of consumer links
 }
 
-func newQueue(name string) *queue {
-	debug.Printf("Create queue %s\n", name)
-	return &queue{name, list.New(), make(map[event.Link]bool)}
+type logLink event.Link // Wrapper to print links in format for logging
+
+func (ll logLink) String() string {
+	l := event.Link(ll)
+	return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
 }
 
 func (q *queue) subscribe(link event.Link) {
-	debug.Printf("Subscribe to %s\n", q.name)
+	debug.Printf("link %s subscribed to queue %s", logLink(link), q.name)
 	q.consumers[link] = true
 }
 
 func (q *queue) unsubscribe(link event.Link) {
-	debug.Printf("Unsubscribe from %s\n", q.name)
+	debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name)
 	delete(q.consumers, link)
 }
 
@@ -73,30 +89,37 @@ func (q *queue) empty() bool {
 	return len(q.consumers) == 0 && q.messages.Len() == 0
 }
 
-func (q *queue) publish(message proton.Message) {
-	debug.Printf("Push to %s: %#v\n", q.name, message)
+func (q *queue) push(context *event.Pump, message proton.Message) {
 	q.messages.PushBack(message)
-	q.dispatch()
+	q.pop(context)
 }
 
-func (q *queue) dispatchTo(link event.Link) bool {
+func (q *queue) popTo(context *event.Pump, link event.Link) bool {
 	if q.messages.Len() != 0 && link.Credit() > 0 {
-		message := q.messages.Front().Value.(proton.Message)
-		debug.Printf("Pop from %s: %#v\n", q.name, message)
+		message := q.messages.Remove(q.messages.Front()).(proton.Message)
+		debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message})
 		// The first return parameter is an event.Delivery.
 		// The Deliver can be used to track message status, e.g. so we can re-delver on failure.
 		// This demo broker doesn't do that.
-		_, err := message.Send(link)
-		panicIf(err)
-		q.messages.Remove(q.messages.Front())
+		linkPump := link.Session().Connection().Pump()
+		if context == linkPump {
+			if context == nil {
+				log.Fatal("pop in nil context")
+			}
+			link.Send(message) // link is in the current pump, safe to call Send() direct
+		} else {
+			linkPump.Inject <- func() { // Inject to link's pump
+				link.Send(message) // FIXME aconway 2015-05-04: error handlig
+			}
+		}
 		return true
 	}
 	return false
 }
 
-func (q *queue) dispatch() (dispatched bool) {
+func (q *queue) pop(context *event.Pump) (popped bool) {
 	for c, _ := range q.consumers {
-		dispatched = dispatched || q.dispatchTo(c)
+		popped = popped || q.popTo(context, c)
 	}
 	return
 }
@@ -104,54 +127,79 @@ func (q *queue) dispatch() (dispatched bool) {
 // broker implements event.MessagingHandler and reacts to events by moving messages on or off queues.
 type broker struct {
 	queues map[string]*queue
-	pumps  map[*event.Pump]struct{} // Set of running event pumps (i.e. connections)
+	lock   sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming...
 }
 
 func newBroker() *broker {
-	return &broker{queues: make(map[string]*queue), pumps: make(map[*event.Pump]struct{})}
+	return &broker{queues: make(map[string]*queue)}
 }
 
 func (b *broker) getQueue(name string) *queue {
 	q := b.queues[name]
 	if q == nil {
-		q = newQueue(name)
+		debug.Printf("Create queue %s", name)
+		q = &queue{name, list.New(), make(map[event.Link]bool)}
 		b.queues[name] = q
 	}
 	return q
 }
 
+func (b *broker) unsubscribe(l event.Link) {
+	if l.IsSender() {
+		q := b.queues[l.RemoteSource().Address()]
+		if q != nil {
+			q.unsubscribe(l)
+			if q.empty() {
+				debug.Printf("Delete queue %s", q.name)
+				delete(b.queues, q.name)
+			}
+		}
+	}
+}
+
 func (b *broker) Handle(t event.MessagingEventType, e event.Event) error {
+	// FIXME aconway 2015-05-04: locking is un-golike, better example coming soon.
+	// Needed because Handle is called for multiple connections concurrently
+	// and the queue data structures are not thread safe.
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
 	switch t {
 
 	case event.MLinkOpening:
 		if e.Link().IsSender() {
-			// FIXME aconway 2015-03-23: handle dynamic consumers
-			b.getQueue(e.Link().RemoteSource().Address()).subscribe(e.Link())
+			q := b.getQueue(e.Link().RemoteSource().Address())
+			q.subscribe(e.Link())
 		}
 
 	case event.MLinkClosing:
-		if e.Link().IsSender() {
-			q := b.getQueue(e.Link().RemoteSource().Address())
-			q.unsubscribe(e.Link())
-			if q.empty() {
-				delete(b.queues, q.name)
-			}
+		b.unsubscribe(e.Link())
+
+	case event.MDisconnected:
+		fallthrough
+	case event.MConnectionClosing:
+		c := e.Connection()
+		for l := c.LinkHead(event.SRemoteActive); !l.IsNil(); l = l.Next(event.SRemoteActive) {
+			b.unsubscribe(l)
 		}
 
 	case event.MSendable:
-		b.getQueue(e.Link().RemoteSource().Address()).dispatchTo(e.Link())
+		q := b.getQueue(e.Link().RemoteSource().Address())
+		q.popTo(e.Connection().Pump(), e.Link())
 
 	case event.MMessage:
-		m, err := proton.EventMessage(e)
-		panicIf(err)
-		b.getQueue(e.Link().RemoteTarget().Address()).publish(m)
+		m, err := event.DecodeMessage(e)
+		fatalIf(err)
+		qname := e.Link().RemoteTarget().Address()
+		debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m})
+		b.getQueue(qname).push(e.Connection().Pump(), m)
 	}
 	return nil
 }
 
 func (b *broker) listen(addr string) (err error) {
 	// Use the standard Go "net" package to listen for connections.
-	info.Printf("Listening on %s\n", addr)
+	info.Printf("Listening on %s", addr)
 	listener, err := net.Listen("tcp", addr)
 	if err != nil {
 		return err
@@ -160,46 +208,56 @@ func (b *broker) listen(addr string) (err error) {
 	for {
 		conn, err := listener.Accept()
 		if err != nil {
-			info.Printf("Accept error: %s\n", err)
+			info.Printf("Accept error: %s", err)
 			continue
 		}
-		info.Printf("Accepted connection %s<-%s\n", conn.LocalAddr(), conn.RemoteAddr())
 		pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
-		panicIf(err)
+		fatalIf(err)
+		info.Printf("Accepted %s[%p]", pump, pump)
 		pump.Server()
-		b.pumps[pump] = struct{}{}
-		go pump.Run()
+		go func() {
+			pump.Run()
+			if pump.Error == nil {
+				info.Printf("Closed %s", pump)
+			} else {
+				info.Printf("Closed %s: %s", pump, pump.Error)
+			}
+		}()
 	}
 }
 
-var addr = flag.String("addr", ":amqp", "Listening address")
-var quiet = flag.Bool("quiet", false, "Don't print informational messages")
-var debugFlag = flag.Bool("debug", false, "Print debugging messages")
-var info, debug *log.Logger
-
-func output(enable bool) io.Writer {
-	if enable {
-		return os.Stdout
-	} else {
-		return ioutil.Discard
+// Logging
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+	if *verbose >= level {
+		return log.New(w, prefix, 0)
 	}
+	return log.New(ioutil.Discard, "", 0)
 }
 
-func main() {
-	flag.Usage = func() {
-		fmt.Fprintf(os.Stderr, `
-Usage: %s [queue ...]
-A simple broker. Queues are created automatically for sender or receiver addrsses.
-`, os.Args[0])
-		flag.PrintDefaults()
-	}
+var info, debug *log.Logger
+
+func init() {
 	flag.Parse()
-	debug = log.New(output(*debugFlag), "debug: ", log.Ltime)
-	info = log.New(output(!*quiet), "info: ", log.Ltime)
-	b := newBroker()
-	err := b.listen(*addr)
+	name := path.Base(os.Args[0])
+	log.SetFlags(0)
+	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
+	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
+	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+}
+
+// Simple error handling for demo.
+func fatalIf(err error) {
 	if err != nil {
-		fmt.Println(err)
-		os.Exit(1)
+		log.Fatal(err)
+	}
+}
+
+type formatMessage struct{ m proton.Message }
+
+func (fm formatMessage) String() string {
+	if *full {
+		return fmt.Sprintf("%#v", fm.m)
+	} else {
+		return fmt.Sprintf("%#v", fm.m.Body())
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/example.go
----------------------------------------------------------------------
diff --git a/examples/go/example.go b/examples/go/example.go
deleted file mode 100644
index 08ecfeb..0000000
--- a/examples/go/example.go
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
-	"fmt"
-	"qpid.apache.org/proton"
-	"sync"
-)
-
-func receive(c proton.Connection, addr string, wait *sync.WaitGroup) {
-	defer wait.Done()
-	r := c.Receiver(addr)
-	defer r.Close()
-	for m := range r.Receive { // r.Receive is a chan Message
-		fmt.Println("received: ", addr, m.Body(), m.Subject())
-		if m.Subject() == "stop" {
-			return
-		}
-	}
-}
-
-func main() {
-	var c1, c2 proton.Connection
-	c1.Open("amqp://foo:amqp")
-	defer c1.Close()
-	c2.Open("amqp://localhost:4567")
-	defer c2.Close()
-
-	var wait sync.WaitGroup
-	wait.Add(2)
-
-	go receive(c1, "foo", &wait)
-	go receive(c2, "bar", &wait)
-
-	wait.Wait()
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
index 231e0ce..fc1c85a 100644
--- a/examples/go/receive.go
+++ b/examples/go/receive.go
@@ -22,25 +22,24 @@ package main
 import (
 	"flag"
 	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
 	"math"
 	"net"
 	"os"
+	"path"
 	"qpid.apache.org/proton"
+	"qpid.apache.org/proton/messaging"
 	"sync"
 	"time"
 )
 
-// Simplistic error handling for demo. Not recommended.
-func panicIf(err error) {
-	if err != nil {
-		panic(err)
-	}
-}
-
 // Command-line flags
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
 var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.")
 var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.")
-var short = flag.Bool("short", false, "Short format of message: body only")
+var full = flag.Bool("full", false, "Print full message not just body.")
 
 func main() {
 	// Parse flags and arguments, print usage message on error.
@@ -76,29 +75,44 @@ Receive messages from all the listed URLs concurrently and print them.
 
 	wait.Add(len(urls)) // Wait for one goroutine per URL.
 
-	for _, urlStr := range urls {
-		// Start a goroutine to receive from urlStr
+	// Arrange to close all connections on exit
+	connections := make([]*messaging.Connection, len(urls))
+	defer func() {
+		for _, c := range connections {
+			if c != nil {
+				c.Close()
+			}
+		}
+	}()
+
+	for i, urlStr := range urls {
+		debug.Printf("Connecting to %s", urlStr)
 		go func(urlStr string) {
 			defer wait.Done()                   // Notify main() that this goroutine is done.
 			url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			panicIf(err)
+			fatalIf(err)
 
 			// Open a standard Go net.Conn for the AMQP connection
 			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			panicIf(err)
-			defer conn.Close() // Close conn on goroutine exit.
+			fatalIf(err)
 
-			pc, err := proton.Connect(conn) // This is our AMQP connection.
-			panicIf(err)
-			// We could 'defer pc.Close()' but conn.close() will automatically close the proton connection.
+			pc, err := messaging.Connect(conn) // This is our AMQP connection.
+			fatalIf(err)
+			connections[i] = pc
 
 			// For convenience a proton.Connection provides a DefaultSession()
 			// pc.Receiver() is equivalent to pc.DefaultSession().Receiver()
 			r, err := pc.Receiver(url.Path)
-			panicIf(err)
+			fatalIf(err)
 
-			for m := range r.Receive { // r.Receive is a channel to receive messages.
-				select {
+			for {
+				var m proton.Message
+				select { // Receive a message or stop.
+				case m = <-r.Receive:
+				case <-stop: // The program is stopping.
+					return
+				}
+				select { // Send m to main() or stop
 				case messages <- m: // Send m to main()
 				case <-stop: // The program is stopping.
 					return
@@ -106,24 +120,57 @@ Receive messages from all the listed URLs concurrently and print them.
 			}
 		}(urlStr)
 	}
+	info.Printf("Listening")
 
 	// time.After() returns a channel that will close when the timeout is up.
 	timer := time.After(duration)
 
 	// main() prints each message and checks for count or timeout being exceeded.
-	for i := *count; i > 0; i-- {
+	for i := int64(0); i < *count; i++ {
 		select {
 		case m := <-messages:
-			if *short {
-				fmt.Println(m.Body())
-			} else {
-				fmt.Printf("%#v\n\n", m)
-			}
+			debug.Print(formatMessage{m})
 		case <-timer: // Timeout has expired
 			i = 0
 		}
 	}
-
+	info.Printf("Received %d messages", *count)
 	close(stop) // Signal all goroutines to stop.
 	wait.Wait() // Wait for all goroutines to finish.
 }
+
+// Logging
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+	if *verbose >= level {
+		return log.New(w, prefix, 0)
+	}
+	return log.New(ioutil.Discard, "", 0)
+}
+
+var info, debug *log.Logger
+
+func init() {
+	flag.Parse()
+	name := path.Base(os.Args[0])
+	log.SetFlags(0)                                               // Use default logger for errors.
+	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
+	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
+	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+}
+
+// Simple error handling for demo.
+func fatalIf(err error) {
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+type formatMessage struct{ m proton.Message }
+
+func (fm formatMessage) String() string {
+	if *full {
+		return fmt.Sprintf("%#v", fm.m)
+	} else {
+		return fmt.Sprintf("%#v", fm.m.Body())
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
index 4a7f947..3c18466 100644
--- a/examples/go/send.go
+++ b/examples/go/send.go
@@ -22,23 +22,28 @@ package main
 import (
 	"flag"
 	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
 	"math"
 	"net"
 	"os"
+	"path"
 	"qpid.apache.org/proton"
+	"qpid.apache.org/proton/messaging"
 	"sync"
 )
 
-// Simplistic error handling for demo. Not recommended.
-func panicIf(err error) {
-	if err != nil {
-		panic(err)
-	}
-}
-
 // Command-line flags
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
 var count = flag.Int64("count", 0, "Send this may messages per address. 0 means unlimited.")
 
+// Ack associates an info string with an acknowledgement
+type Ack struct {
+	ack  messaging.Acknowledgement
+	info string
+}
+
 func main() {
 	// Parse flags and arguments, print usage message on error.
 	flag.Usage = func() {
@@ -53,45 +58,101 @@ To each URL, send the string "path-n" where n is the message number.
 	urls := flag.Args() // Non-flag arguments are URLs to receive from
 	if len(urls) == 0 {
 		flag.Usage()
-		fmt.Fprintf(os.Stderr, "No URL provided")
+		fmt.Fprintf(os.Stderr, "No URL provided\n")
 		os.Exit(1)
 	}
 	if *count == 0 {
 		*count = math.MaxInt64
 	}
 
+	// Create a channel to receive all the acknowledgements
+	acks := make(chan Ack)
+
 	// Create a goroutine for each URL that sends messages.
 	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
 	wait.Add(len(urls))     // Wait for one goroutine per URL.
 
-	for _, urlStr := range urls {
-		// Start a goroutine to receive from urlStr
-		go func(urlStr string) {
-			defer wait.Done()                   // Notify main() that this goroutine is done.
-			url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			panicIf(err)
+	// Arrange to close all connections on exit
+	connections := make([]*messaging.Connection, len(urls))
+	defer func() {
+		for _, c := range connections {
+			c.Close()
+		}
+	}()
+
+	for i, urlStr := range urls {
+		url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+		fatalIf(err)
+		debug.Printf("Connecting to %v", url)
 
-			// Open a standard Go net.Conn for the AMQP connection
-			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			panicIf(err)
-			defer conn.Close() // Close conn on goroutine exit.
+		// Open a standard Go net.Conn for the AMQP connection
+		conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+		fatalIf(err)
 
-			pc, err := proton.Connect(conn) // This is our AMQP connection.
-			panicIf(err)
-			// We could 'defer pc.Close()' but conn.close() will automatically close the proton connection.
+		pc, err := messaging.Connect(conn) // This is our AMQP connection using conn.
+		fatalIf(err)
+		connections[i] = pc
 
-			// For convenience a proton.Connection provides a DefaultSession()
-			// pc.Sender() is equivalent to pc.DefaultSession().Sender()
+		// Start a goroutine to send to urlStr
+		go func(urlStr string) {
+			defer wait.Done() // Notify main() that this goroutine is done.
+
+			// FIXME aconway 2015-04-29: sessions, default sessions, senders...
+			// Create a sender using the path of the URL as the AMQP target address
 			s, err := pc.Sender(url.Path)
-			panicIf(err)
+			fatalIf(err)
 
 			for i := int64(0); i < *count; i++ {
 				m := proton.NewMessage()
-				m.SetBody(fmt.Sprintf("%v-%v", url.Path, i))
-				err := s.Send(m)
-				panicIf(err)
+				body := fmt.Sprintf("%v-%v", url.Path, i)
+				m.SetBody(body)
+				ack, err := s.Send(m)
+				fatalIf(err)
+				acks <- Ack{ack, body}
 			}
 		}(urlStr)
 	}
+
+	// Wait for all the acknowledgements
+	expect := int(*count) * len(urls)
+	debug.Printf("Started senders, expect %v acknowledgements", expect)
+	for i := 0; i < expect; i++ {
+		ack, ok := <-acks
+		if !ok {
+			info.Fatalf("acks channel closed after only %d acks\n", i)
+		}
+		d := <-ack.ack
+		debug.Printf("acknowledgement[%v] %v", i, ack.info)
+		if d != messaging.Accepted {
+			info.Printf("Unexpected disposition %v", d)
+		}
+	}
+	info.Printf("Received all %v acknowledgements", expect)
 	wait.Wait() // Wait for all goroutines to finish.
 }
+
+// Logging
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+	if *verbose >= level {
+		return log.New(w, prefix, 0)
+	}
+	return log.New(ioutil.Discard, "", 0)
+}
+
+var info, debug *log.Logger
+
+func init() {
+	flag.Parse()
+	name := path.Base(os.Args[0])
+	log.SetFlags(0)                                               // Use default logger for errors.
+	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
+	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
+	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+}
+
+// Simple error handling for demo.
+func fatalIf(err error) {
+	if err != nil {
+		log.Fatal(err)
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
index fc04d79..597481c 100644
--- a/proton-c/bindings/go/README.md
+++ b/proton-c/bindings/go/README.md
@@ -30,55 +30,33 @@ There are two types of developer we want to support
 
 ## Status
 
-There are two Go modules so far. See the documentation using
+Package proton encodes and decodes AMQP messages and data as Go types.
 
-    godoc apache.org/proton
-    godoc apache.org/proton/event
-
-The proton module maps between AMQP and Go types and has a Go representation of
-an AMQP message. It is the beginning of the "real" Go API. For examples of what
-this API will look like see:
-
-- [receive.go](../../../examples/go/receive.go) uses channels and goroutines to receive concurrently.
-- [send.go](../../../examples/go/send.go) less interesting but there for symmetry.
+Sub-packages 'event' and 'messaging' provide two alternative ways to write
+AMQP clients and servers. 'messaging' is easier for general purpose use. 'event'
+gives complete low-level control of the underlying proton C engine.
 
-The event module is a port of the proton C and python MessagingHandler APIs. It
-provides low-level, goroutine-unsafe but (mostly) complete access to proton. It
-is the foundation for building the Go API and may be useful for advanced AMQP
-projects or cross-langauge proton development in future.
+The event package is fairly complete, with the exception of the proton
+reactor. It's unclear if the reactor is important for go.
 
-The event API is functional but not completely complete. The Go API doesn't
-exist yet, there is some dummy code so the examples will compile and run.
-
-## The event driven API
+The messaging package is just starting. The examples work but anything else might not.
 
-The event module contains
+### Examples
 
-- Go Proton events (AMQP events only, no reactor events yet)
-- Go MessagingHandler events (equivalent to python MessagingHandler.)
-- Pump to feed data between a Go net.Conn connection and a proton event loop.
+messaging API:
 
-The Pump uses 3 goroutines per connection, one to read, one to write and one to
-run the proton event loop. Proton's thread-unsafe data is never used outside the
-event loop goroutine.
+- [receive.go](../../../examples/go/receive.go) receive from many connections concurrently
+- [send.go](../../../examples/go/send.go) send to many connections concurrently
 
-This API provides direct access to proton events, equivalent to C or python
-event API. It does not yet support reactor events or allow multiple connections
-to be handled in a single event loop goroutine, these are temporary limitations.
+event API:
+- [broker.go](../../../examples/go/event/broker.go) simple mini-broker
 
-There is one example: examples/go/broker.go. It is a port of
-examples/python/broker.py and can be used with the python `simple_send.py` and
-`simple_recv.py` clients.
+The examples work with each other and with the python `broker.py`,
+`simple_send.py` and `simple_recv.py`.
 
-The broker example works for simple tests but is concurrency-unsafe. It uses a
-single `broker`, implementing MessagingHandler, with multiple pumps. The proton
-event loops are safe in their separate goroutines but the `broker` state (queues
-etc.) is not. We can fix this by funneling multiple connections into a single
-event loop as mentioned above.
+## The event driven API
 
-However this API is not the end of the story. It will be the foundation to build
-a more Go-like API that allows *any* goroutine to send or receive messages
-without having to know anything about event loops or pumps.
+See the package documentation for details.
 
 ## The Go API
 
@@ -87,15 +65,49 @@ AMQP messages and other information (acknowledgments, flow control instructions
 etc.) using channels. There will be no user-visible locks and no need to run
 user code in special goroutines, e.g. as handlers in a proton event loop.
 
-There is a (trivial, speculative, incomplete) example in examples/go/example.go
-of what part of it might look like. It shows receiving messages concurrently
-from two different connections in a single goroutine (it omits sessions). 
+See the package documentation for emerging details.
+
+Currently using a channel to receive messages, a function to send them (channels
+internally) and a channel as a "future" for acknowledgements to senders. This
+may change.
+
+## Design Questions
+
+
+1. Error reporting and handling, esp. async. errors:
+
+What are common patterns for handling errors across channels?  I.e. the thing at
+one end of the channel blows up, how do you tell the other end?
+
+readers: you can close the channel, but there's no error info. You could pass a
+struct { data, error } or use a second channel. Pros & cons?
+
+writers: you can't close without a panic so you need a second channel.  Is this
+a normal pattern:
+
+    select {
+        case sendChan <- data: sentit()
+        case err := <-errChan: oops(err)
+    }
+
+2. Use of channels:
+
+I recently saw an interesting Go tip: "Make your API synchronous because in Go
+it is simple to make a sync call async by putting it in a goroutine."
+
+What are the tradeoffs of exposing channels directly in the API vs. hiding them
+behind methods? Exposing lets users select directly, less overhead than starting
+a goroutine, creating MORE channels and selecting those. Hiding lets us wrap
+patterns like the 'select {data, err}' pattern above, which is easier and less
+error prone than asking users to do it themselves.
+
+The standard net.Conn uses blocking methods, not channels. I did as the tip says
+and wrapped them in goroutines and channels. The library does expose *read*
+channels e.g. time.After. Are there any *write* channels in the standard
+library? I note that time.After has no errors, and suspect that may be a key
+factor in the decision.
 
-There is a tempting analogy between Go channels and AMQP links, but Go channels
-are much simpler beasts than AMQP links. It is likely a link will be implemented
-by a Go type that uses more than one channel. E.g. there will probably be
-separate channels for messages and acknowledgments, perhaps also for flow
-control status.
+3. The "future" pattern for acknowledgements: super easy in Go but depends on 1. and 2. above.
 
 ## Why a separate API for Go?
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/Makefile
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/Makefile b/proton-c/bindings/go/src/Makefile
index bbcaded..98baa4c 100644
--- a/proton-c/bindings/go/src/Makefile
+++ b/proton-c/bindings/go/src/Makefile
@@ -6,20 +6,11 @@
 GENERATED=qpid.apache.org/proton/event/wrappers_gen.go
 
 test: $(GENERATED)
-	go test $(GOFLAGS) qpid.apache.org/proton
-	go test $(GOFLAGS) qpid.apache.org/proton/event
+	go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton
+	go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/event
+	go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/messaging
 
-rebuild: $(GENERATED)
-	go build -a $(GOFLAGS) qpid.apache.org/proton
-	go build -a $(GOFLAGS) qpid.apache.org/proton/event
-	go test $(GOFLAGS) -c -a qpid.apache.org/proton
-	go test $(GOFLAGS) -c -a qpid.apache.org/proton/event
-
-qpid.apache.org/proton/event/wrappers_gen.go: genwrap.go ../../../include/proton/*.h
+$(GENERATED): genwrap.go ../../../include/proton/*.h
 	go run genwrap.go
 
-broker: test
-	go build $(GOFLAGS) ~/proton/examples/go/event/broker.go
-	./broker
-
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/genwrap.go b/proton-c/bindings/go/src/genwrap.go
index 83eb34c..e269367 100644
--- a/proton-c/bindings/go/src/genwrap.go
+++ b/proton-c/bindings/go/src/genwrap.go
@@ -44,7 +44,11 @@ func mixedCase(s string) string {
 	return result
 }
 
-var templateFuncs = template.FuncMap{"mixedCase": mixedCase}
+func mixedCaseTrim(s, prefix string) string {
+	return mixedCase(strings.TrimPrefix(s, prefix))
+}
+
+var templateFuncs = template.FuncMap{"mixedCase": mixedCase, "mixedCaseTrim": mixedCaseTrim}
 
 func doTemplate(out io.Writer, data interface{}, tmpl string) {
 	panicIf(template.Must(template.New("").Funcs(templateFuncs).Parse(tmpl)).Execute(out, data))
@@ -73,7 +77,7 @@ const ({{range $values}}
 func (e {{mixedCase $enumName}}) String() string {
 	switch e {
 {{range $values}}
-	case C.{{.}}: return "{{mixedCase .}}" {{end}}
+	case C.{{.}}: return "{{mixedCaseTrim . "PN_"}}" {{end}}
 	}
 	return "unknown"
 }
@@ -131,7 +135,7 @@ type eventType struct {
 func newEventType(cName string) eventType {
 	var etype eventType
 	etype.Cname = cName
-	etype.Name = mixedCase(strings.TrimPrefix(cName, "PN_"))
+	etype.Name = mixedCaseTrim(cName, "PN_")
 	etype.Fname = "On" + etype.Name
 	etype.Iname = etype.Fname + "Interface"
 	return etype
@@ -141,7 +145,7 @@ var (
 	enumDefRe   = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;")
 	enumValRe   = regexp.MustCompile("PN_[A-Z_]+")
 	skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER")
-	skipFnRe    = regexp.MustCompile("attach|context|class|collect|^recv$|^send$")
+	skipFnRe    = regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport")
 )
 
 // Generate event wrappers.
@@ -183,6 +187,7 @@ type genType struct {
 	Ctype, Gotype string
 	ToGo          func(value string) string
 	ToC           func(value string) string
+	Assign        func(value string) string
 }
 
 func (g genType) printBody(out io.Writer, value string) {
@@ -237,19 +242,24 @@ func mapType(ctype string) (g genType) {
 	case "C.uint64_t":
 		g.Gotype = "uint64"
 	case "C.uint32_t":
 		g.Gotype = "uint32"
+	case "C.uint16_t":
+		g.Gotype = "uint16"
 	case "C.const char *":
-		g.Gotype = "string"
-		g.Ctype = "C.CString"
+		fallthrough
 	case "C.char *":
 		g.Gotype = "string"
 		g.Ctype = "C.CString"
+		g.ToC = func(v string) string { return fmt.Sprintf("%sC", v) }
+		g.Assign = func(v string) string {
+			return fmt.Sprintf("%sC := C.CString(%s)\n defer C.free(unsafe.Pointer(%sC))\n", v, v, v)
+		}
 	case "C.pn_seconds_t":
 		g.Gotype = "time.Duration"
 		g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Second)", v) }
 	case "C.pn_error_t *":
 		g.Gotype = "error"
-		g.ToGo = func(v string) string { return fmt.Sprintf("pnError(%s)", v) }
+		g.ToGo = func(v string) string { return fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) }
 	default:
 		pnId := regexp.MustCompile(" *pn_([a-z_]+)_t *\\*? *")
 		match := pnId.FindStringSubmatch(g.Ctype)
@@ -285,6 +295,7 @@ func splitArgs(argstr string) []genArg {
 	}
 	args := make([]genArg, 0)
 	for _, item := range strings.Split(argstr, ",") {
+		item = strings.Trim(item, " \n")
 		typeName := typeNameRe.FindStringSubmatch(item)
 		if typeName == nil {
 			panic(fmt.Errorf("Can't split argument type/name %#v", item))
@@ -318,6 +329,16 @@ func cArgs(args []genArg) string {
 	return l
 }
 
+func cAssigns(args []genArg) string {
+	l := "\n"
+	for _, arg := range args {
+		if arg.Assign != nil {
+			l += fmt.Sprintf("%s\n", arg.Assign(arg.Name))
+		}
+	}
+	return l
+}
+
 // Return the go name of the function or "" to skip the function.
 func goFnName(api, fname string) string {
 	// Skip class, context and attachment functions.
@@ -328,13 +349,13 @@ func goFnName(api, fname string) string {
 	case "link.get_drain":
 		return "IsDrain"
 	default:
-		return mixedCase(strings.TrimPrefix(fname, "get_"))
+		return mixedCaseTrim(fname, "get_")
 	}
 }
 
 func apiWrapFns(api, header string, out io.Writer) {
 	fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api)
-	fmt.Fprintf(out, "func (%c %s) isNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0])
+	fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0])
 	fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) *pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api))
 	for _, m := range fn.FindAllStringSubmatch(header, -1) {
 		rtype, fname, argstr := mapType(m[1]), m[2], m[3]
@@ -345,6 +366,7 @@ func apiWrapFns(api, header string, out io.Writer) {
 		args := splitArgs(argstr)
 		fmt.Fprintf(out, "func (%c %s) %s", api[0], mixedCase(api), gname)
 		fmt.Fprintf(out, "(%s) %s { ", goArgs(args), rtype.Gotype)
+		fmt.Fprint(out, cAssigns(args))
 		rtype.printBody(out, fmt.Sprintf("C.pn_%s_%s(%c.pn%s)", api, fname, api[0], cArgs(args)))
 		fmt.Fprintf(out, "}\n")
 	}
@@ -363,10 +385,13 @@ package event
 
 import (
 	"time"
+  "unsafe"
+  "qpid.apache.org/proton/internal"
 )
 
 // #include <proton/types.h>
 // #include <proton/event.h>
+// #include <stdlib.h>
 `)
 	for _, api := range apis {
 		fmt.Fprintf(out, "// #include <proton/%s.h>\n", api)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
index 40e1f7c..38c2d00 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
@@ -18,15 +18,17 @@ under the License.
 */
 
 /*
-Package proton is a Go binding for the proton AMQP protocol engine.
+Package proton encodes and decodes AMQP messages and data as Go types.
 
-It alows you to construct and parse AMQP messages, and to implement AMQP
-clients, servers and intermediaries that can exchange messages with any
-AMQP 1.0 compliant endpoint.
+It follows the standard 'encoding' libraries pattern. The mapping between AMQP
+and Go types is described in the documentation of the Marshal and Unmarshal
+functions.
 
-Encoding and decoding AMQP data follows the pattern of the standard
-encoding/json and encoding/xml packages.The mapping between AMQP and Go types is
-described in the documentation of the Marshal and Unmarshal functions.
+The sub-packages 'event' and 'messaging' provide two alternative ways to write
+AMQP clients and servers. 'messaging' is easier for general purpose use. 'event'
+gives complete low-level control of the underlying proton C engine.
+
+AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
 */
 package proton
 
@@ -34,3 +36,5 @@ package proton
 import "C"
 
 // This file is just for the package comment.
+
+// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go b/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
deleted file mode 100644
index 2f83760..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package proton
-
-import (
-	"fmt"
-	"net"
-)
-
-// Placeholder definitions to allow examples to compile.
-
-type Connection struct {
-	Server bool // Server connection does protocol negotiation
-	// FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc.
-}
-
-// Map an AMQP connection using conn
-func (c Connection) Connect(conn net.Conn) error { return nil }
-func (c Connection) Close() error                { return nil }
-
-func (c Connection) Receiver(addr string) (*Receiver, error) {
-	// FIXME aconway 2015-04-10: dummy implementation to test examples, returns endless messages.
-	r := &Receiver{make(chan Message), make(chan struct{})}
-	go func() {
-		for i := 0; ; i++ {
-			m := NewMessage()
-			m.SetBody(fmt.Sprintf("%v-%v", addr, i))
-			select {
-			case r.Receive <- m:
-			case <-r.closed:
-				return
-			}
-		}
-	}()
-	return r, nil
-}
-
-func (c Connection) Sender(addr string) (*Sender, error) {
-	return &Sender{}, nil
-}
-
-type Receiver struct {
-	Receive chan Message
-	closed  chan struct{}
-}
-
-func (r Receiver) Close() error { return nil }
-
-type Sender struct{}
-
-func (s Sender) Send(m Message) error { fmt.Println(m.Body()); return nil }
-func (s Sender) Close() error         { return nil }
-
-// Connect makes a default client connection using conn.
-//
-// For more control do:
-//     c := Connection{}
-//     // set parameters on c
-//     c.Connect(conn)
-//
-func Connect(conn net.Conn) (Connection, error) {
-	c := Connection{}
-	c.Connect(conn)
-	return c, nil
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
deleted file mode 100644
index 95927bc..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package proton
-
-// FIXME aconway 2015-04-08: consolidate with event/errors.go
-
-// #include <proton/error.h>
-// #include <proton/codec.h>
-import "C"
-
-import (
-	"fmt"
-	"reflect"
-	"runtime"
-)
-
-var pnErrorNames = map[int]string{
-	C.PN_EOS:        "end of data",
-	C.PN_ERR:        "error",
-	C.PN_OVERFLOW:   "overflow",
-	C.PN_UNDERFLOW:  "underflow",
-	C.PN_STATE_ERR:  "bad state",
-	C.PN_ARG_ERR:    "invalid argument",
-	C.PN_TIMEOUT:    "timeout",
-	C.PN_INTR:       "interrupted",
-	C.PN_INPROGRESS: "in progress",
-}
-
-func pnErrorName(code int) string {
-	name := pnErrorNames[code]
-	if name != "" {
-		return name
-	} else {
-		return fmt.Sprintf("unknown-error(%s)", code)
-	}
-}
-
-// pnError converst a pn_error_t to a Go error. Returns nil if e has 0 error status
-func pnError(prefix string, e *C.pn_error_t) error {
-	if e == nil || int(C.pn_error_code(e)) == 0 {
-		return nil
-	}
-	code := int(C.pn_error_code(e))
-	return errorf("%s %s: %s", pnErrorName(code), prefix,
-		C.GoString(C.pn_error_text(e)))
-}
-
-type BadUnmarshal struct {
-	AMQPType string
-	GoType   reflect.Type
-}
-
-func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal {
-	return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)}
-}
-
-func (e BadUnmarshal) Error() string {
-	if e.GoType.Kind() != reflect.Ptr {
-		return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
-	} else {
-		return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
-	}
-}
-
-// errorf creates an error with a formatted message
-func errorf(format string, a ...interface{}) error {
-	return fmt.Errorf("proton: %s", fmt.Sprintf(format, a...))
-}
-
-// doRecover is called to recover from internal panics
-func doRecover(err *error) {
-	r := recover()
-	switch r := r.(type) {
-	case nil:
-		return
-	case runtime.Error:
-		panic(r)
-	case error:
-		*err = r
-	default:
-		panic(r)
-	}
-}
-
-// panicIf panics if condition is true, the panic value is errorf(fmt, args...)
-func panicIf(condition bool, fmt string, args ...interface{}) {
-	if condition {
-		panic(errorf(fmt, args...))
-	}
-}
-
-func dataError(prefix string, data *C.pn_data_t) error {
-	return pnError(prefix, C.pn_data_error(data))
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
index d55dc7d..6a1c8ac 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
@@ -18,7 +18,17 @@ under the License.
 */
 
 /*
-Package event provides an event-oriented API to the  proton AMQP engine.
+Package event provides a low-level API to the proton AMQP engine.
+
+For most tasks, consider instead package qpid.apache.org/proton/messaging.
+It provides a higher-level, concurrent API that is easier to use.
+
+The API is event based. There are two alternative styles of handler. CoreHandler
+provides the core proton events. MessagingHandler provides a slightly simplified
+view of the event stream and automates some common tasks.
+
+See type Pump documentation for more details of the interaction between proton
+events and goroutines.
 */
 package event
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go
deleted file mode 100644
index 42382a4..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package event // FIXME aconway 2015-03-26: duplicated from package proton, clean up
-
-// #include <proton/error.h>
-// #include <proton/codec.h>
-import "C"
-
-import (
-	"fmt"
-	"runtime"
-)
-
-var pnErrorNames = map[int]string{
-	C.PN_EOS:        "end of data",
-	C.PN_ERR:        "error",
-	C.PN_OVERFLOW:   "overflow",
-	C.PN_UNDERFLOW:  "underflow",
-	C.PN_STATE_ERR:  "bad state",
-	C.PN_ARG_ERR:    "invalid argument",
-	C.PN_TIMEOUT:    "timeout",
-	C.PN_INTR:       "interrupted",
-	C.PN_INPROGRESS: "in progress",
-}
-
-func pnErrorName(code int) string {
-	name := pnErrorNames[code]
-	if name != "" {
-		return name
-	} else {
-		return "unknown"
-	}
-}
-
-func pnError(e *C.pn_error_t) error {
-	if e == nil || C.pn_error_code(e) == 0 {
-		return nil
-	}
-	return errorf("%s: %s", pnErrorName(int(C.pn_error_code(e))), C.GoString(C.pn_error_text(e)))
-}
-
-// errorf creates an error with a formatted message
-func errorf(format string, a ...interface{}) error {
-	return fmt.Errorf("proton: %s", fmt.Sprintf(format, a...))
-}
-
-// doRecover is called to recover from internal panics
-func doRecover(err *error) {
-	r := recover()
-	switch r := r.(type) {
-	case nil:
-		return
-	case runtime.Error:
-		panic(r)
-	case error:
-		*err = r
-	default:
-		panic(r)
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
index d1ce953..a9e1468 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
@@ -23,6 +23,10 @@ package event
 // #include <proton/handlers.h>
 import "C"
 
+import (
+	"qpid.apache.org/proton/internal"
+)
+
 // CoreHandler handles core proton events.
 type CoreHandler interface {
 	// Handle is called with an event.
@@ -40,24 +44,18 @@ func (h cHandler) Handle(e Event) error {
 	return nil // FIXME aconway 2015-03-31: error handling
 }
 
-func HandShaker() CoreHandler {
-	return cHandler{C.pn_handshaker()}
-}
-
-func FlowController(prefetch int) CoreHandler {
-	return cHandler{C.pn_flowcontroller(C.int(prefetch))}
-}
-
-// MessagingHandler provides a higher-level, easier-to-use interface for writing
-// applications that send and receive messages.
+// MessagingHandler provides an alternative interface to CoreHandler,
+// it is easier to use for most applications that send and receive messages.
+//
+// Implement this interface and then wrap your value with a MessagingDelegator.
+// MessagingDelegator implements CoreHandler and can be registered with a Pump.
 //
-// You can implement this interface and wrap it with a MessagingHandlerDelegator
 type MessagingHandler interface {
 	Handle(MessagingEventType, Event) error
 }
 
-// MessagingEventType provides an easier set of event types to work with
-// that the core proton EventType.
+// MessagingEventType provides a set of events that are easier to work with than the
+// core events defined by EventType
 //
 type MessagingEventType int
 
@@ -101,13 +99,13 @@ const (
 	// The peer initiates the closing of the link.
 	MLinkClosing
 
-	// The connection is closed.
+	// Both ends of the connection are closed.
 	MConnectionClosed
 
-	// The session is closed.
+	// Both ends of the session are closed.
 	MSessionClosed
 
-	// The link is closed.
+	// Both ends of the link are closed.
 	MLinkClosed
 
 	// The socket is disconnected.
@@ -137,7 +135,60 @@ const (
 	MMessage
 )
 
-// Capture common patterns for endpoints opening/closing
+func (t MessagingEventType) String() string {
+	switch t {
+	case MStart:
+		return "Start"
+	case MConnectionError:
+		return "ConnectionError"
+	case MSessionError:
+		return "SessionError"
+	case MLinkError:
+		return "LinkError"
+	case MConnectionOpening:
+		return "ConnectionOpening"
+	case MSessionOpening:
+		return "SessionOpening"
+	case MLinkOpening:
+		return "LinkOpening"
+	case MConnectionOpened:
+		return "ConnectionOpened"
+	case MSessionOpened:
+		return "SessionOpened"
+	case MLinkOpened:
+		return "LinkOpened"
+	case MConnectionClosing:
+		return "ConnectionClosing"
+	case MSessionClosing:
+		return "SessionClosing"
+	case MLinkClosing:
+		return "LinkClosing"
+	case MConnectionClosed:
+		return "ConnectionClosed"
+	case MSessionClosed:
+		return "SessionClosed"
+	case MLinkClosed:
+		return "LinkClosed"
+	case MDisconnected:
+		return "Disconnected"
+	case MSendable:
+		return "Sendable"
+	case MAccepted:
+		return "Accepted"
+	case MRejected:
+		return "Rejected"
+	case MReleased:
+		return "Released"
+	case MSettled:
+		return "Settled"
+	case MMessage:
+		return "Message"
+	default:
+		return "Unknown"
+	}
+}
+
+// endpointDelegator captures common patterns for endpoints opening/closing
 type endpointDelegator struct {
 	remoteOpen, remoteClose, localOpen, localClose EventType
 	opening, opened, closing, closed, error        MessagingEventType
@@ -145,52 +196,63 @@ type endpointDelegator struct {
 	delegate                                       MessagingHandler
 }
 
-func (d endpointDelegator) Handle(e Event) error {
+// Handle handles an open/close event for an endpoint in a generic way.
+func (d endpointDelegator) Handle(e Event) (err error) {
 	endpoint := d.endpoint(e)
 	state := endpoint.State()
 
 	switch e.Type() {
 
 	case d.localOpen:
-		if state.RemoteOpen() {
-			return d.delegate.Handle(d.opened, e)
+		if state.Is(SRemoteActive) {
+			err = d.delegate.Handle(d.opened, e)
 		}
 
 	case d.remoteOpen:
 		switch {
-		case state.LocalOpen():
-			return d.delegate.Handle(d.opened, e)
-		case state.LocalUninitialized():
-			err := d.delegate.Handle(d.opening, e)
+		case state.Is(SLocalActive):
+			err = d.delegate.Handle(d.opened, e)
+		case state.Is(SLocalUninit):
+			err = d.delegate.Handle(d.opening, e)
 			if err == nil {
 				endpoint.Open()
 			}
-			return err
 		}
 
 	case d.remoteClose:
-		switch {
-		case endpoint.RemoteCondition().IsSet():
-			d.delegate.Handle(d.error, e)
-		case state.LocalClosed():
-			d.delegate.Handle(d.closed, e)
-		default:
-			d.delegate.Handle(d.closing, e)
+		var err1 error
+		if endpoint.RemoteCondition().IsSet() {
+			err1 = d.delegate.Handle(d.error, e)
+			if err1 == nil {
+				err1 = endpoint.RemoteCondition().Error()
+			}
+		}
+		if state.Is(SLocalClosed) {
+			err = d.delegate.Handle(d.closed, e)
+		} else {
+			err = d.delegate.Handle(d.closing, e)
+			endpoint.Close()
+		}
+		if err1 != nil {
+			err = err1
 		}
-		endpoint.Close()
 
 	case d.localClose:
-		// Nothing to do
+		if state.Is(SRemoteClosed) {
+			err = d.delegate.Handle(d.closed, e)
+		}
 
 	default:
-		panic("internal error") // We shouldn't be called with any other event type.
+		// We shouldn't be called with any other event type.
+		panic(internal.Errorf("internal error, not an open/close event: %s", e))
 	}
-	return nil
+
+	return err
 }
 
 // MessagingDelegator implments a CoreHandler and delegates to a MessagingHandler.
 // You can modify the exported fields before you pass the MessagingDelegator to
-// a Pump
+// a Pump.
 type MessagingDelegator struct {
 	delegate                   MessagingHandler
 	connection, session, link  endpointDelegator
@@ -231,7 +293,6 @@ func NewMessagingDelegator(h MessagingHandler) CoreHandler {
 			func(e Event) Endpoint { return e.Link() },
 			h,
 		},
-		handshaker:     HandShaker(),
 		flowcontroller: nil,
 		AutoSettle:     true,
 		AutoAccept:     true,
@@ -248,13 +309,12 @@ func handleIf(h CoreHandler, e Event) error {
 }
 
 func (d *MessagingDelegator) Handle(e Event) error {
-	handleIf(d.handshaker, e)
 	handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling.
 
 	switch e.Type() {
 
 	case EConnectionInit:
-		d.flowcontroller = FlowController(d.Prefetch)
+		d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))}
 		d.delegate.Handle(MStart, e)
 
 	case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose:
@@ -277,6 +337,9 @@ func (d *MessagingDelegator) Handle(e Event) error {
 		} else {
 			d.outgoing(e)
 		}
+
+	case ETransportTailClosed:
+		d.delegate.Handle(MDisconnected, e)
 	}
 	return nil
 }
@@ -284,7 +347,7 @@ func (d *MessagingDelegator) Handle(e Event) error {
 func (d *MessagingDelegator) incoming(e Event) (err error) {
 	delivery := e.Delivery()
 	if delivery.Readable() && !delivery.Partial() {
-		if e.Link().State().LocalClosed() {
+		if e.Link().State().Is(SLocalClosed) {
 			e.Link().Advance()
 			if d.AutoAccept {
 				delivery.Release(false)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go
new file mode 100644
index 0000000..bd7dddd
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package event
+
+// #include <proton/types.h>
+// #include <proton/message.h>
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"qpid.apache.org/proton"
+	"qpid.apache.org/proton/internal"
+)
+
+// DecodeMessage decodes the message contained in a delivery event.
+func DecodeMessage(e Event) (m proton.Message, err error) {
+	defer internal.DoRecover(&err)
+	delivery := e.Delivery()
+	if !delivery.Readable() || delivery.Partial() {
+		return nil, internal.Errorf("attempting to get incomplete message")
+	}
+	data := make([]byte, delivery.Pending())
+	result := delivery.Link().Recv(data)
+	if result != len(data) {
+		return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result))
+	}
+	return proton.DecodeMessage(data)
+}
+
+// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link.
+var tags proton.UidCounter
+
+// Send sends a proton.Message over a Link.
+// Returns a Delivery that can be used to determine the outcome of the message.
+func (link Link) Send(m proton.Message) (Delivery, error) {
+	if !link.IsSender() {
+		return Delivery{}, internal.Errorf("attempt to send message on receiving link")
+	}
+	// FIXME aconway 2015-04-08: buffering, error handling
+	delivery := link.Delivery(tags.Next())
+	bytes, err := m.Encode(nil)
+	if err != nil {
+		return Delivery{}, internal.Errorf("cannot send mesage %s", err)
+	}
+	result := link.SendBytes(bytes)
+	link.Advance()
+	if result != len(bytes) {
+		if result < 0 {
+			return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result))
+		} else {
+			return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes))
+		}
+	}
+	if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names
+		delivery.Settle()
+	}
+	return delivery, nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
index 17257b5..480c994 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
@@ -25,104 +25,160 @@ package event
 // #include <proton/reactor.h>
 // #include <proton/handlers.h>
 // #include <proton/transport.h>
+// #include <proton/session.h>
 // #include <memory.h>
+// #include <stdlib.h>
 //
+// PN_HANDLE(REMOTE_ADDR)
 import "C"
 
 import (
+	"fmt"
+	"io"
 	"net"
+	"qpid.apache.org/proton/internal"
 	"sync"
 	"unsafe"
 )
 
-// FIXME aconway 2015-04-09: We should allow data from multiple connections to be pumped
-// into a single event loop (using the proton Reactor)
-// That would allow the user to decide if they want an event-loop goroutine per connection
-// or if they want to handle several connections in one event loop.
+// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel.
+type bufferChan struct {
+	buffers    chan []byte
+	buf1, buf2 []byte
+}
 
-// Pump reads from a net.Conn, decodes AMQP events and calls the appropriate
-// Handler functions. Actions taken by Handler functions (such as sending messages)
-// are encoded and written to the net.Conn.
-//
+// newBufferChan returns a bufferChan with an unbuffered channel and two
+// buffers of the given size.
+func newBufferChan(size int) *bufferChan {
+	return &bufferChan{
+		buffers: make(chan []byte),
+		buf1:    make([]byte, size),
+		buf2:    make([]byte, size),
+	}
+}
+
+// buffer swaps the two buffers and returns the one that is now buf1,
+// resliced to its full capacity.
+func (b *bufferChan) buffer() []byte {
+	b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers.
+	return b.buf1[:cap(b.buf1)]
+}
+
+// FIXME aconway 2015-05-04: direct sending to Inject may block user goroutines if
+// the pump stops. Make this a function that selects on running.
+
+// FIXME aconway 2015-05-05: for consistency should Pump be called Driver?
+
+/*
+Pump reads from a net.Conn, decodes AMQP events and calls the appropriate
+Handler functions. Actions taken by Handler functions (such as sending messages)
+are encoded and written to the net.Conn.
+
+The proton protocol engine is single threaded (per connection). The Pump runs
+proton in the goroutine that calls Pump.Run() and creates goroutines to feed
+data to/from a net.Conn. You can create multiple Pumps to handle multiple
+connections concurrently.
+
+Methods in this package can only be called in the goroutine that executes the
+corresponding Pump.Run(). You implement the CoreHandler or MessagingHandler
+interfaces and provide those values to NewPump(). Their Handle method will be
+called in the Pump goroutine, in typical event-driven style.
+
+Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
+other goroutines, store them, or use them as map indexes. Effectively they are
+just C pointers.  Other goroutines cannot call their methods directly but they
+can create function closures that call their methods and send those closures
+to the Pump.Inject channel. They will execute safely in the pump
+goroutine. Injected functions, or your handlers, can set up channels to get
+results back to other goroutines.
+
+You are responsible for ensuring you don't use an event value after the C object
+has been deleted. The handler methods will tell you when a value is no longer
+valid. For example after a MethodHandler handles a LinkClosed event, that link
+is no longer valid. If you do Link.Close() yourself (in a handler or injected
+function) the link remains valid until the corresponding LinkClosed event is
+received by the handler.
+
+Pump.Close() will take care of cleaning up any remaining values and types when
+you are done with the Pump. All values associated with a pump become invalid
+when you call Pump.Close()
+
+The qpid.apache.org/proton/messaging package will do all this for you, so unless
+you are doing something fairly low-level it is probably a better choice.
+
+*/
 type Pump struct {
+	// Error is set on exit from Run() if there was an error.
+	Error error
+	// Channel to inject functions to be executed in the Pump's proton event loop.
+	Inject chan func()
+
 	conn       net.Conn
 	transport  *C.pn_transport_t
 	connection *C.pn_connection_t
 	collector  *C.pn_collector_t
-	read       chan []byte    // Read buffers channel. Will close when pump closes.
-	write      chan []byte    // Write buffer channel. Must be closed when read closes.
-	waiter     sync.WaitGroup // Wait for read and write goroutines to complete.
-	handlers   []CoreHandler  // Handlers for proton events.
-
-	inject chan func()   // Functions inject into the loop
-	closed chan struct{} // This channel will be closed when the remote end closes.
+	read       *bufferChan   // Read buffers channel.
+	write      *bufferChan   // Write buffers channel.
+	handlers   []CoreHandler // Handlers for proton events.
+	running    chan struct{} // This channel will be closed when the goroutines are done.
 }
 
 const bufferSize = 4096
 
-// NewPump initializes a pump with a connection and handlers.. Call `go Run()` to start it running.
+var pumps map[*C.pn_connection_t]*Pump
+
+func init() {
+	pumps = make(map[*C.pn_connection_t]*Pump)
+}
+
+// NewPump initializes a pump with a connection and handlers. To start it running:
+//    p := NewPump(...)
+//    go p.Run()
+// The goroutine will exit when the pump is closed or disconnected.
+// You can check for errors on Pump.Error.
+//
 func NewPump(conn net.Conn, handlers ...CoreHandler) (*Pump, error) {
+	// Save the connection ID for Connection.String()
 	p := &Pump{
+		Inject:     make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack
 		conn:       conn,
 		transport:  C.pn_transport(),
 		connection: C.pn_connection(),
 		collector:  C.pn_collector(),
 		handlers:   handlers,
-		read:       make(chan []byte),
-		write:      make(chan []byte),
-		inject:     make(chan func()),
-		closed:     make(chan struct{}),
+		read:       newBufferChan(bufferSize),
+		write:      newBufferChan(bufferSize),
+		running:    make(chan struct{}),
 	}
-	p.handlers = append(p.handlers, handlers...)
-
 	if p.transport == nil || p.connection == nil || p.collector == nil {
-		return nil, errorf("failed to allocate pump")
+		return nil, internal.Errorf("failed to allocate pump")
 	}
 	pnErr := int(C.pn_transport_bind(p.transport, p.connection))
 	if pnErr != 0 {
-		return nil, errorf("cannot setup pump: %s", pnErrorName(pnErr))
+		return nil, internal.Errorf("cannot setup pump: %s", internal.PnErrorCode(pnErr))
 	}
 	C.pn_connection_collect(p.connection, p.collector)
 	C.pn_connection_open(p.connection)
+	pumps[p.connection] = p
 	return p, nil
 }
 
-// Server puts the Pump in server mode, meaning it will auto-detect security settings on
-// the incoming connnection such as use of SASL and SSL.
-// Must be called before Run()
-func (p *Pump) Server() {
-	C.pn_transport_set_server(p.transport)
+func (p *Pump) String() string {
+	return fmt.Sprintf("(%s-%s)", p.conn.LocalAddr(), p.conn.RemoteAddr())
 }
 
-// Run the pump. Normally called in a goroutine as: go pump.Run()
-func (p *Pump) Run() {
-	go p.run()
+// Id returns a string identifying this pump, formatted from the Pump's address.
+func (p *Pump) Id() string {
+	// Format p itself: &p is the address of the receiver variable, not of the Pump.
+	return fmt.Sprintf("%p", p)
 }
 
-// Pump handles the connction close event to close itself.
-func (p *Pump) Handle(e Event) error {
-	switch e.Type() {
-	case EConnectionLocalClose:
-		return p.close()
+// setError sets error only if not already set
+func (p *Pump) setError(e error) {
+	if p.Error == nil {
+		p.Error = e
 	}
-	return nil
 }
 
-// Closing the pump will also close the net.Conn and stop associated goroutines.
-func (p *Pump) Close() error {
-	// FIXME aconway 2015-04-08: broken
-
-	// Note this is called externally, outside the proton event loop.
-	// Polite AMQP close
-	p.inject <- func() { C.pn_connection_close(p.connection) }
-	_, _ = <-p.closed // Wait for remote close
-	return p.close()
+// Server puts the Pump in server mode, meaning it will auto-detect security settings on
+// the incoming connection such as use of SASL and SSL.
+// Must be called before Run()
+//
+func (p *Pump) Server() {
+	C.pn_transport_set_server(p.transport)
 }
 
-// close private implementation, call in the event loop.
-func (p *Pump) close() error {
-	p.conn.Close()
-	p.waiter.Wait()
+func (p *Pump) free() {
 	if p.connection != nil {
 		C.pn_connection_free(p.connection)
 	}
@@ -138,74 +194,102 @@ func (p *Pump) close() error {
 			C.pn_handler_free(h.pn)
 		}
 	}
-	return nil // FIXME aconway 2015-03-31: error handling
 }
 
-// Start goroutines to feed the pn_transport_t from the pump.
-func (c *Pump) run() error {
-	// FIXME aconway 2015-03-17: error handling
-	c.waiter.Add(2)
-	var readError, writeError error
+// Close closes the AMQP connection, the net.Conn, and stops associated goroutines.
+// It will cause Run() to return. Run() may return earlier if the network disconnects
+// but you must still call Close() to clean everything up.
+//
+// Methods on values associated with the pump (Connections, Sessions, Links) will panic
+// if called after Close()
+//
+// Returns p.Error, which is set on exit from Run() if there was an error.
+//
+// NOTE(review): the package-level pumps map is modified here without any
+// visible locking - confirm Close() is never called concurrently with
+// NewPump() or with Close() on another Pump.
+//
+func (p *Pump) Close() error {
+	// If the pump is still running, inject a close. Either way wait for it to finish.
+	select {
+	case p.Inject <- func() { C.pn_connection_close(p.connection) }:
+		<-p.running // Wait to finish
+	case <-p.running: // Wait for goroutines to finish
+	}
+	delete(pumps, p.connection) // Unregister: Connection.Pump() no longer finds this pump.
+	p.free()
+	return p.Error
+}
 
-	go func() { // Read
-		rbuf, rbuf2 := make([]byte, bufferSize), make([]byte, bufferSize)
+// Run the pump. Normally called in a goroutine as: go pump.Run()
+// An error during Run is stored on p.Error.
+//
+func (p *Pump) Run() {
+	// Signal errors from the read/write goroutines. Don't block if we don't
+	// read all the errors, we only care about the first.
+	error := make(chan error, 2)
+	// FIXME aconway 2015-05-04: 	stop := make(chan struct{}) // Closed to signal that read/write should stop.
+
+	wait := sync.WaitGroup{}
+	wait.Add(2)
+
+	go func() { // Read goroutine
+		defer wait.Done()
 		for {
-			rbuf = rbuf[:cap(rbuf)]
-			n, err := c.conn.Read(rbuf)
+			rbuf := p.read.buffer()
+			n, err := p.conn.Read(rbuf)
 			if n > 0 {
-				c.read <- rbuf[:n]
+				p.read.buffers <- rbuf[:n]
+			} else if err != nil {
+				close(p.read.buffers)
+				error <- err
+				return
 			}
-			if err != nil {
-				readError = err
-				break
-			}
-			rbuf, rbuf2 = rbuf2, rbuf // Swap the buffers, fill the one not in use.
 		}
-		close(c.read)
-		c.waiter.Done()
 	}()
 
-	go func() { // Write
-		for wbuf := range c.write {
-			_, err := c.conn.Write(wbuf)
+	go func() { // Write goroutine
+		defer wait.Done()
+		for {
+			wbuf, ok := <-p.write.buffers
+			if !ok {
+				return
+			}
+			_, err := p.conn.Write(wbuf)
 			if err != nil {
-				writeError = err
-				break
+				error <- err
+				return
 			}
 		}
-		c.waiter.Done()
 	}()
 
-	// Proton driver loop
-	wbuf, wbuf2 := make([]byte, bufferSize), make([]byte, bufferSize)
-	wbuf = c.pop(wbuf) // First write buffer
-	for {              // handle pn_transport_t
-		select {
-		case buf, ok := <-c.read: // Read a buffer
-			if !ok { // Read channel closed
-				break
-			}
-			c.push(buf)
-
-		case c.write <- wbuf: // Write a buffer
-			wbuf, wbuf2 = wbuf2, wbuf // Swap the buffers, fill the unused one.
-			wbuf = c.pop(wbuf)        // Next buffer to write
+	wbuf := p.write.buffer()[:0]
+loop:
+	for {
+		if len(wbuf) == 0 {
+			p.pop(&wbuf)
+		}
+		// Don't set wchan unless there is something to write.
+		var wchan chan []byte
+		if len(wbuf) > 0 {
+			wchan = p.write.buffers
+		}
 
-		case f := <-c.inject: // Function injected from another goroutine
+		select {
+		case buf := <-p.read.buffers: // Read a buffer
+			p.push(buf)
+		case wchan <- wbuf: // Write a buffer
+			wbuf = p.write.buffer()[:0]
+		case f := <-p.Inject: // Function injected from another goroutine
 			f()
+		case err := <-error: // Read or write error
+			p.setError(err)
+			C.pn_transport_close_tail(p.transport)
+			C.pn_transport_close_head(p.transport)
+		}
+		if err := p.process(); err != nil {
+			p.setError(err)
+			break loop
 		}
-		c.process() // FIXME aconway 2015-03-17: error handling
-	}
-
-	close(c.write)
-	c.waiter.Wait() // Wait for read/write goroutines to finish
-	switch {
-	case readError != nil:
-		return readError
-	case writeError != nil:
-		return writeError
 	}
-	return nil
+	close(p.write.buffers)
+	p.conn.Close()
+	wait.Wait()
+	close(p.running) // Signal goroutines have exited and Error is set.
 }
 
 func minInt(a, b int) int {
@@ -216,47 +300,58 @@ func minInt(a, b int) int {
 	}
 }
 
-func (c *Pump) pop(buf []byte) []byte {
-	pending := int(C.pn_transport_pending(c.transport))
-	if pending == int(C.PN_EOS) {
-		return nil
-	}
-	if pending < 0 {
-		panic(errorf(pnErrorName(pending)))
+func (p *Pump) pop(buf *[]byte) {
+	pending := int(C.pn_transport_pending(p.transport))
+	switch {
+	case pending == int(C.PN_EOS):
+		*buf = (*buf)[:]
+		return
+	case pending < 0:
+		panic(internal.Errorf("%s", internal.PnErrorCode(pending)))
 	}
-	size := minInt(pending, cap(buf))
-	buf = buf[:size]
+	size := minInt(pending, cap(*buf))
+	*buf = (*buf)[:size]
 	if size == 0 {
-		return buf
+		return
 	}
-	C.memcpy(unsafe.Pointer(&buf[0]), unsafe.Pointer(C.pn_transport_head(c.transport)), C.size_t(size))
-	C.pn_transport_pop(c.transport, C.size_t(size))
-	return buf
+	C.memcpy(unsafe.Pointer(&(*buf)[0]), unsafe.Pointer(C.pn_transport_head(p.transport)), C.size_t(size))
+	C.pn_transport_pop(p.transport, C.size_t(size))
 }
 
-func (c *Pump) push(buf []byte) {
+func (p *Pump) push(buf []byte) {
 	buf2 := buf
 	for len(buf2) > 0 {
-		n := int(C.pn_transport_push(c.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2))))
+		n := int(C.pn_transport_push(p.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2))))
 		if n <= 0 {
-			panic(errorf("error in transport: %s", pnErrorName(n)))
+			panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n)))
 		}
 		buf2 = buf2[n:]
 	}
 }
 
-func (c *Pump) process() {
-	for ce := C.pn_collector_peek(c.collector); ce != nil; ce = C.pn_collector_peek(c.collector) {
-		e := Event{ce}
-		for _, h := range c.handlers {
-			h.Handle(e) // FIXME aconway 2015-03-18: error handling
+func (p *Pump) handle(e Event) error {
+	for _, h := range p.handlers {
+		if err := h.Handle(e); err != nil {
+			return err
 		}
-		C.pn_collector_pop(c.collector)
 	}
+	if e.Type() == ETransportClosed {
+		return io.EOF
+	}
+	return nil
 }
 
-func (c *Pump) AddHandlers(handlers ...CoreHandler) {
-	c.inject <- func() {
-		c.handlers = append(c.handlers, handlers...)
+func (p *Pump) process() error {
+	// FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump
+	for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) {
+		e := Event{ce}
+		if err := p.handle(e); err != nil {
+			return err
+		}
+		C.pn_collector_pop(p.collector)
 	}
+	return nil
 }
+
+// Connection gets the Pump's connection value.
+func (p *Pump) Connection() Connection { return Connection{p.connection} }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
index c3c0a7d..3584311 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
@@ -21,17 +21,26 @@ package event
 
 //#include <proton/codec.h>
 //#include <proton/connection.h>
+//#include <proton/session.h>
+//#include <proton/session.h>
 //#include <proton/delivery.h>
 //#include <proton/event.h>
 //#include <proton/transport.h>
 //#include <proton/link.h>
+//#include <stdlib.h>
 import "C"
 
 import (
+	"fmt"
+	"qpid.apache.org/proton/internal"
 	"unsafe"
 )
 
-// Data holds a pointer to decoded AMQP data, proton.marshal/unmarshal to access it as Go data.
+// FIXME aconway 2015-05-05: Documentation for generated types.
+
+// Data holds a pointer to decoded AMQP data.
+// Use proton.marshal/unmarshal to access it as Go data types.
+//
 type Data struct{ pn *C.pn_data_t }
 
 func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} }
@@ -40,20 +49,24 @@ func (d Data) Free()                   { C.pn_data_free(d.pn) }
 func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) }
 func (d Data) Clear()                  { C.pn_data_clear(d.pn) }
 func (d Data) Rewind()                 { C.pn_data_rewind(d.pn) }
-func (d Data) Error() error            { return pnError(C.pn_data_error(d.pn)) }
+func (d Data) Error() error {
+	return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn)))
+}
 
 // State holds the state flags for an AMQP endpoint.
 type State byte
 
-func (s State) LocalUninitialized() bool { return s&C.PN_LOCAL_UNINIT != 0 }
-func (s State) LocalActive() bool        { return s&C.PN_LOCAL_ACTIVE != 0 }
-func (s State) LocalOpen() bool          { return s&C.PN_LOCAL_ACTIVE != 0 }
-func (s State) LocalClosed() bool        { return s&C.PN_LOCAL_CLOSED != 0 }
+// Endpoint state flags. The type State is repeated on every line: a constant
+// with its own "= value" does not inherit the type of the preceding line, so
+// without it only the first constant would be a State.
+const (
+	SLocalUninit  State = C.PN_LOCAL_UNINIT
+	SLocalActive  State = C.PN_LOCAL_ACTIVE
+	SLocalClosed  State = C.PN_LOCAL_CLOSED
+	SRemoteUninit State = C.PN_REMOTE_UNINIT
+	SRemoteActive State = C.PN_REMOTE_ACTIVE
+	SRemoteClosed State = C.PN_REMOTE_CLOSED
+)
 
-func (s State) RemoteUninitialized() bool { return s&C.PN_REMOTE_UNINIT != 0 }
-func (s State) RemoteActive() bool        { return s&C.PN_REMOTE_ACTIVE != 0 }
-func (s State) RemoteOpen() bool          { return s&C.PN_REMOTE_ACTIVE != 0 }
-func (s State) RemoteClosed() bool        { return s&C.PN_REMOTE_CLOSED != 0 }
+// Is returns true if any of the given bits are set in s.
+func (s State) Is(bits State) bool { return s&bits != 0 }
 
 // Return a State containig just the local flags
 func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) }
@@ -63,14 +76,18 @@ func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) }
 
 // Endpoint is the common interface for Connection, Link and Session.
 type Endpoint interface {
+	// State is the open/closed state.
 	State() State
+	// Open an endpoint.
 	Open()
+	// Close an endpoint.
 	Close()
+	// Condition holds a local error condition.
 	Condition() Condition
+	// RemoteCondition holds a remote error condition.
 	RemoteCondition() Condition
 }
 
-// Disposition types
 const (
 	Received uint64 = C.PN_RECEIVED
 	Accepted        = C.PN_ACCEPTED
@@ -102,8 +119,11 @@ func (d Delivery) Release(delivered bool) {
 	}
 }
 
-type Transport struct{ pn *C.pn_transport_t }
-type DeliveryTag struct{ pn C.pn_delivery_tag_t } // FIXME aconway 2015-03-25: convert to string
+// FIXME aconway 2015-05-05: don't expose DeliveryTag as a C pointer, just as a String?
+
+type DeliveryTag struct{ pn C.pn_delivery_tag_t }
+
+func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) }
 
 func (l Link) Recv(buf []byte) int {
 	if len(buf) == 0 {
@@ -112,7 +132,7 @@ func (l Link) Recv(buf []byte) int {
 	return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
 }
 
-func (l Link) Send(bytes []byte) int {
+func (l Link) SendBytes(bytes []byte) int {
 	return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes)))
 }
 
@@ -135,3 +155,70 @@ func cPtr(b []byte) *C.char {
 func cLen(b []byte) C.size_t {
 	return C.size_t(len(b))
 }
+
+// Sender returns a Link wrapping the result of pn_sender for this session
+// and the given link name.
+func (s Session) Sender(name string) Link {
+	cname := C.CString(name)
+	defer C.free(unsafe.Pointer(cname)) // Release the C copy of name on return.
+	return Link{C.pn_sender(s.pn, cname)}
+}
+
+// Receiver returns a Link wrapping the result of pn_receiver for this session
+// and the given link name.
+func (s Session) Receiver(name string) Link {
+	cname := C.CString(name)
+	defer C.free(unsafe.Pointer(cname)) // Release the C copy of name on return.
+	return Link{C.pn_receiver(s.pn, cname)}
+}
+
+func joinId(a, b interface{}) string {
+	return fmt.Sprintf("%s/%s", a, b)
+}
+
+// Pump associated with this connection.
+func (c Connection) Pump() *Pump { return pumps[c.pn] }
+
+// Unique (per process) string identifier for a connection, useful for debugging.
+// If no Pump is registered for the connection (for example after Pump.Close()
+// has removed it from the pumps map) the connection's pointer value is used,
+// rather than calling String() on a nil *Pump.
+func (c Connection) String() string {
+	if p := pumps[c.pn]; p != nil {
+		return p.String()
+	}
+	return fmt.Sprintf("%p", c.pn)
+}
+
+// Head functions don't follow the normal naming conventions so missed by the generator.
+
+func (c Connection) LinkHead(s State) Link {
+	return Link{C.pn_link_head(c.pn, C.pn_state_t(s))}
+}
+
+func (c Connection) SessionHead(s State) Session {
+	return Session{C.pn_session_head(c.pn, C.pn_state_t(s))}
+}
+
+// Unique (per process) string identifier for a session, including connection identifier.
+func (s Session) String() string {
+	return joinId(s.Connection(), fmt.Sprintf("%p", s.pn))
+}
+
+// Unique (per process) string identifier for a link, including session identifier.
+func (l Link) String() string {
+	return joinId(l.Session(), l.Name())
+}
+
+// Error converts the Condition to a Go error: nil when the condition IsNil(),
+// otherwise an error formatted as "name: description".
+func (c Condition) Error() error {
+	if !c.IsNil() {
+		return fmt.Errorf("%s: %s", c.Name(), c.Description())
+	}
+	return nil
+}
+
+// SetIfUnset fills in name and description, leaving a condition that
+// already IsSet() untouched.
+func (c Condition) SetIfUnset(name, description string) {
+	if c.IsSet() {
+		return // Keep the existing condition.
+	}
+	c.SetName(name)
+	c.SetDescription(description)
+}
+
+// Session wraps the result of pn_session for this connection. If the
+// resulting Session IsNil() it is returned together with the connection's Error().
+func (c Connection) Session() (Session, error) {
+	session := Session{C.pn_session(c.pn)}
+	if !session.IsNil() {
+		return session, nil
+	}
+	return session, c.Error()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message