qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [5/5] qpid-proton git commit: PROTON-1011: Go example of event driven broker. Package renaming and some new features.
Date Thu, 08 Oct 2015 04:30:37 GMT
PROTON-1011: Go example of event driven broker. Package renaming and some new features.

New pacakges names:

- qpid.apache.org/amqp - amqp/Go data mapping
- qpid.apache.org/proton - faithful wrapper of proton C library
- qpid.apache.org/electron - alternative, procedural, concurrent-safe Go API

Simplified broker examples, complete proton and electron brokers for comparison.

- Send blocks for credit, added SendTimeout.
- Fixed some shut-down issues
- Session flow control.
- Additional unit tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/478ba4ea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/478ba4ea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/478ba4ea

Branch: refs/heads/master
Commit: 478ba4ea1e7a2c5e60ac8f64d6756f2e6658663b
Parents: 2789615
Author: Alan Conway <aconway@redhat.com>
Authored: Tue Sep 29 17:29:22 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Thu Oct 8 00:30:09 2015 -0400

----------------------------------------------------------------------
 examples/go/CMakeLists.txt                      |  15 +-
 examples/go/README.md                           |  24 +-
 examples/go/broker.go                           | 161 ------
 examples/go/electron/broker.go                  | 148 +++++
 examples/go/electron/receive.go                 | 122 ++++
 examples/go/electron/send.go                    | 123 ++++
 examples/go/example_test.go                     |   6 +-
 examples/go/proton/broker.go                    | 299 ++++++++++
 examples/go/receive.go                          | 119 ----
 examples/go/send.go                             | 121 ----
 examples/go/util/queue.go                       |  89 +--
 examples/go/util/util.go                        |   2 +-
 proton-c/bindings/go/CMakeLists.txt             |   4 +-
 proton-c/bindings/go/README.md                  | 151 +----
 .../bindings/go/src/qpid.apache.org/README.md   | 105 ++++
 .../bindings/go/src/qpid.apache.org/amqp/doc.go |  34 ++
 .../go/src/qpid.apache.org/amqp/error.go        |  66 +++
 .../go/src/qpid.apache.org/amqp/interop         |   1 +
 .../go/src/qpid.apache.org/amqp/interop_test.go | 381 +++++++++++++
 .../go/src/qpid.apache.org/amqp/marshal.go      | 250 +++++++++
 .../go/src/qpid.apache.org/amqp/message.go      | 347 ++++++++++++
 .../go/src/qpid.apache.org/amqp/message_test.go | 166 ++++++
 .../go/src/qpid.apache.org/amqp/types.go        | 198 +++++++
 .../go/src/qpid.apache.org/amqp/unmarshal.go    | 556 +++++++++++++++++++
 .../bindings/go/src/qpid.apache.org/amqp/url.go |  96 ++++
 .../go/src/qpid.apache.org/amqp/url_test.go     |  51 ++
 .../src/qpid.apache.org/electron/connection.go  | 192 +++++++
 .../src/qpid.apache.org/electron/container.go   |  71 +++
 .../go/src/qpid.apache.org/electron/doc.go      |  57 ++
 .../go/src/qpid.apache.org/electron/endpoint.go |  68 +++
 .../go/src/qpid.apache.org/electron/handler.go  | 176 ++++++
 .../go/src/qpid.apache.org/electron/link.go     | 242 ++++++++
 .../qpid.apache.org/electron/messaging_test.go  | 412 ++++++++++++++
 .../go/src/qpid.apache.org/electron/receiver.go | 232 ++++++++
 .../go/src/qpid.apache.org/electron/sender.go   | 319 +++++++++++
 .../go/src/qpid.apache.org/electron/session.go  |  98 ++++
 .../go/src/qpid.apache.org/electron/time.go     |  81 +++
 .../go/src/qpid.apache.org/internal/error.go    | 118 ++++
 .../src/qpid.apache.org/internal/flexchannel.go |  82 +++
 .../internal/flexchannel_test.go                |  89 +++
 .../go/src/qpid.apache.org/internal/safemap.go  |  57 ++
 .../go/src/qpid.apache.org/internal/uuid.go     |  70 +++
 .../go/src/qpid.apache.org/proton/README.md     |  12 -
 .../go/src/qpid.apache.org/proton/amqp/doc.go   |  34 --
 .../go/src/qpid.apache.org/proton/amqp/error.go |  66 ---
 .../go/src/qpid.apache.org/proton/amqp/interop  |   1 -
 .../qpid.apache.org/proton/amqp/interop_test.go | 381 -------------
 .../src/qpid.apache.org/proton/amqp/marshal.go  | 250 ---------
 .../src/qpid.apache.org/proton/amqp/message.go  | 347 ------------
 .../qpid.apache.org/proton/amqp/message_test.go | 166 ------
 .../go/src/qpid.apache.org/proton/amqp/types.go | 198 -------
 .../qpid.apache.org/proton/amqp/unmarshal.go    | 556 -------------------
 .../go/src/qpid.apache.org/proton/amqp/url.go   |  96 ----
 .../src/qpid.apache.org/proton/amqp/url_test.go |  51 --
 .../proton/concurrent/connection.go             | 213 -------
 .../proton/concurrent/container.go              |  71 ---
 .../qpid.apache.org/proton/concurrent/doc.go    |  46 --
 .../proton/concurrent/endpoint.go               |  87 ---
 .../proton/concurrent/handler.go                | 137 -----
 .../qpid.apache.org/proton/concurrent/link.go   | 232 --------
 .../proton/concurrent/messaging_test.go         | 205 -------
 .../proton/concurrent/receiver.go               | 241 --------
 .../qpid.apache.org/proton/concurrent/sender.go | 190 -------
 .../proton/concurrent/session.go                | 114 ----
 .../qpid.apache.org/proton/concurrent/time.go   |  71 ---
 .../go/src/qpid.apache.org/proton/doc.go        |  49 +-
 .../go/src/qpid.apache.org/proton/engine.go     | 218 ++++----
 .../go/src/qpid.apache.org/proton/handlers.go   |  22 +-
 .../qpid.apache.org/proton/internal/error.go    | 121 ----
 .../proton/internal/flexchannel.go              |  82 ---
 .../proton/internal/flexchannel_test.go         |  89 ---
 .../qpid.apache.org/proton/internal/safemap.go  |  57 --
 .../src/qpid.apache.org/proton/internal/uuid.go |  70 ---
 .../go/src/qpid.apache.org/proton/message.go    |  17 +-
 .../go/src/qpid.apache.org/proton/wrappers.go   |  25 +-
 .../src/qpid.apache.org/proton/wrappers_gen.go  |   2 +-
 76 files changed, 5534 insertions(+), 4982 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
index 873180d..1b68ebe 100644
--- a/examples/go/CMakeLists.txt
+++ b/examples/go/CMakeLists.txt
@@ -18,10 +18,11 @@
 #
 
 if(BUILD_GO)
-  set(examples receive send broker)
+  set(examples electron/receive electron/send electron/broker proton/broker)
 
   foreach(example ${examples})
-    add_custom_target(go-example-${example} ALL
+    string(REPLACE / - target ${example})
+    add_custom_target(go-example-${target} ALL
       COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${CMAKE_CURRENT_BINARY_DIR}/${example} ${CMAKE_CURRENT_SOURCE_DIR}/${example}.go
       DEPENDS go-packages qpid-proton)
   endforeach()
@@ -30,8 +31,14 @@ if(BUILD_GO)
       DEPENDS go-packages qpid-proton)
 
   add_test(
-    NAME go_example_test
-    COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker broker)
+    NAME go_example_electron_test
+    COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker broker
+    WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/electron)
+
+  add_test(
+    NAME go_example_proton_test
+    COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker ../proton/broker
+    WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/electron)
 
   list(APPEND ADDITIONAL_MAKE_CLEAN_FILES ${examples})
 endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index c0bfd85..0114d0e 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -2,22 +2,30 @@
 
 There are 3 Go packages for proton:
 
-- qpid.apache.org/proton/concurrent: Easy-to-use, concurrent API for concurrent clients and servers.
-- qpid.apache.org/proton/amqp: Convert AMQP messages and data to and from Go data types.
+- qpid.apache.org/electron: Concurrent, procedural API for messaging clients and servers.
 - qpid.apache.org/proton: Direct access to the event-driven, concurrent-unsafe proton library.
+- qpid.apache.org/amqp: Convert AMQP messages and data to and from Go data types.
 
-Most applications should use the `concurrent` package. The `proton` package is
-for applications that need low-level access to the proton library.
+`proton` and `electron` are alternative APIs for sending messages. `proton` is a
+direct wrapping of the concurrent-unsafe, event-driven C proton API. `electron`
+is a procedural, concurrent-safe interface that may be more convenient and
+familiar for Go programmers. The examples `proton/broker.go` and
+`electron/broker.go` give an illustration of how the APIs differ.
 
 ## Example programs
 
-- [receive.go](receive.go) receive from many connections concurrently.
-- [send.go](send.go) send to many connections concurrently.
+electron
+- [receive.go](electron/receive.go) receive from many connections concurrently.
+- [send.go](electron/send.go) send to many connections concurrently.
+- [broker.go](electron/broker.go) a simple broker using the electron API
+
+proton
+- [broker.go](proton/broker.go) a simple broker using the proton API
 
 ## Using the Go packages
 
-Use `go get qpid.apache.org/proton/concurrent` or check out the proton
-repository and set your GOPATH environment variable to include
+Use `go get qpid.apache.org/electron` or check out the proton repository and set
+your GOPATH environment variable to include
 `/<path-to-proton>/proton-c/bindings/go`
 
 The proton Go packages include C code so the cgo compiler needs to be able to

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/broker.go b/examples/go/broker.go
deleted file mode 100644
index 47d0a76..0000000
--- a/examples/go/broker.go
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-//
-// This is a simple AMQP broker implemented using the concurrent interface.
-//
-// It maintains a set of named in-memory queues of messages. Clients can send
-// messages to queues or subscribe to receive messages from them.
-//
-//
-
-package main
-
-import (
-	"./util"
-	"flag"
-	"fmt"
-	"log"
-	"net"
-	"os"
-	"qpid.apache.org/proton/concurrent"
-)
-
-// Usage and command-line flags
-func usage() {
-	fmt.Fprintf(os.Stderr, `
-Usage: %s
-A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
-`, os.Args[0])
-	flag.PrintDefaults()
-}
-
-var addr = flag.String("addr", ":amqp", "Listening address")
-
-func main() {
-	flag.Usage = usage
-	flag.Parse()
-	b := newBroker()
-	err := b.listen(*addr)
-	util.ExitIf(err)
-}
-
-type broker struct {
-	container concurrent.Container
-	queues    util.QueueMap
-}
-
-func newBroker() *broker {
-	return &broker{
-		container: concurrent.NewContainer(""),
-		queues:    util.MakeQueueMap(),
-	}
-}
-
-// Listen for incoming connections
-func (b *broker) listen(addr string) (err error) {
-	listener, err := net.Listen("tcp", addr)
-	if err != nil {
-		return err
-	}
-	log.Printf("Listening on %s\n", listener.Addr())
-	defer listener.Close()
-	for {
-		conn, err := listener.Accept()
-		if err != nil {
-			return err
-		}
-		c, err := b.container.Connection(conn)
-		if err != nil {
-			return err
-		}
-		// Make this a server connection. Must be done before Open()
-		c.Server() // Server-side protocol negotiation.
-		c.Listen() // Enable remotely-opened endpoints.
-		if err := c.Open(); err != nil {
-			return err
-		}
-		util.Debugf("accept %s\n", c)
-		// Accept remotely-opened endpoints on the connection
-		go b.accept(c)
-	}
-}
-
-// accept remotely-opened endpoints (Session, Sender and Receiver)
-func (b *broker) accept(c concurrent.Connection) {
-	for ep, err := c.Accept(); err == nil; ep, err = c.Accept() {
-		switch ep := ep.(type) {
-		case concurrent.Session:
-			util.Debugf("accept session %s\n", ep)
-			ep.Open()
-		case concurrent.Sender:
-			util.Debugf("accept sender %s\n", ep)
-			ep.Open()
-			go b.sender(ep)
-		case concurrent.Receiver:
-			util.Debugf("accept receiver %s\n", ep)
-			ep.SetCapacity(100, true) // Pre-fetch 100 messages
-			ep.Open()
-			go b.receiver(ep)
-		}
-	}
-}
-
-// sender pops from a the queue in the sender's Source address and send messages.
-func (b *broker) sender(sender concurrent.Sender) {
-	qname := sender.Settings().Source
-	if qname == "" {
-		log.Printf("invalid consumer, no source address: %s", sender)
-		return
-	}
-	q := b.queues.Get(qname)
-	for {
-		m := <-q.Pop
-		if m == nil {
-			break
-		}
-		if sm, err := sender.Send(m); err == nil {
-			sm.Forget() // FIXME aconway 2015-09-24: Ignore acknowledgements
-			util.Debugf("send %s: %s\n", sender, util.FormatMessage(m))
-		} else {
-			util.Debugf("send error %s: %s\n", sender, err)
-			q.Putback <- m
-			break
-		}
-	}
-}
-
-func (b *broker) receiver(receiver concurrent.Receiver) {
-	qname := receiver.Settings().Target
-	if qname == "" {
-		log.Printf("invalid producer, no target address: %s", receiver)
-		return
-	}
-	q := b.queues.Get(qname)
-	for {
-		if rm, err := receiver.Receive(); err == nil {
-			util.Debugf("recv %s: %s\n", receiver, util.FormatMessage(rm.Message))
-			q.Push <- rm.Message
-			rm.Accept()
-		} else {
-			util.Debugf("recv error %s: %s\n", receiver, err)
-			break
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
new file mode 100644
index 0000000..4b877df
--- /dev/null
+++ b/examples/go/electron/broker.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the procedural electron package.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+
+package main
+
+import (
+	"../util"
+	"flag"
+	"fmt"
+	"net"
+	"os"
+	"qpid.apache.org/electron"
+)
+
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var addr = flag.String("addr", ":amqp", "Listening address")
+var credit = flag.Int("credit", 100, "Receiver credit window")
+var qsize = flag.Int("qsize", 1000, "Max queue size")
+
+func main() {
+	flag.Usage = usage
+	flag.Parse()
+
+	b := newBroker()
+	listener, err := net.Listen("tcp", *addr)
+	util.ExitIf(err)
+	defer listener.Close()
+	fmt.Printf("Listening on %s\n", listener.Addr())
+
+	// Loop accepting new connections.
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			util.Debugf("Accept error: %s", err)
+			continue
+		}
+		if err := b.connection(conn); err != nil {
+			if err != nil {
+				util.Debugf("Connection error: %s", err)
+				continue
+			}
+		}
+	}
+}
+
+type broker struct {
+	queues    util.Queues
+	container electron.Container
+}
+
+func newBroker() *broker {
+	return &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+}
+
+// connection creates a new AMQP connection for a net.Conn.
+func (b *broker) connection(conn net.Conn) error {
+	c, err := b.container.Connection(conn)
+	if err != nil {
+		return err
+	}
+	c.Server()         // Enable server-side protocol negotiation.
+	c.Listen(b.accept) // Call accept() for remotely-opened endpoints.
+	if err := c.Open(); err != nil {
+		return err
+	}
+	util.Debugf("Accepted %s", c)
+	return nil
+}
+
+// accept remotely-opened endpoints (Session, Sender and Receiver)
+// and start goroutines to service them.
+func (b *broker) accept(ep electron.Endpoint) error {
+	switch ep := ep.(type) {
+	case electron.Sender:
+		util.Debugf("%s opened", ep)
+		go b.sender(ep)
+	case electron.Receiver:
+		util.Debugf("%s opened", ep)
+		ep.SetCapacity(100, true) // Pre-fetch 100 messages
+		go b.receiver(ep)
+	}
+	return nil
+}
+
+// sender pops messages from a queue and sends them.
+func (b *broker) sender(sender electron.Sender) {
+	q := b.queues.Get(sender.Source())
+	for {
+		m, ok := <-q
+		if !ok { // Queue closed
+			return
+		}
+		if err := sender.SendForget(m); err == nil {
+			util.Debugf("send %s: %s", sender, util.FormatMessage(m))
+		} else {
+			util.Debugf("send error %s: %s", sender, err)
+			q <- m // Put it back on the queue.
+			break
+		}
+	}
+}
+
+// receiver receives messages and pushes to the queue named by the receivers's
+// Target address
+func (b *broker) receiver(receiver electron.Receiver) {
+	q := b.queues.Get(receiver.Target())
+	for {
+		if rm, err := receiver.Receive(); err == nil {
+			util.Debugf("%s: received %s", receiver, util.FormatMessage(rm.Message))
+			q <- rm.Message
+			rm.Accept()
+		} else {
+			util.Debugf("%s: error %s", receiver, err)
+			break
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
new file mode 100644
index 0000000..7639375
--- /dev/null
+++ b/examples/go/electron/receive.go
@@ -0,0 +1,122 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"../util"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"path"
+	"qpid.apache.org/amqp"
+	"qpid.apache.org/electron"
+	"sync"
+)
+
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
+
+func main() {
+	flag.Usage = usage
+	flag.Parse()
+
+	urls := flag.Args() // Non-flag arguments are URLs to receive from
+	if len(urls) == 0 {
+		log.Println("No URL provided")
+		usage()
+		os.Exit(1)
+	}
+
+	messages := make(chan amqp.Message) // Channel for messages from goroutines to main()
+	stop := make(chan struct{})         // Closing this channel means the program is stopping.
+	var wait sync.WaitGroup             // Used by main() to wait for all goroutines to end.
+	wait.Add(len(urls))                 // Wait for one goroutine per URL.
+
+	_, prog := path.Split(os.Args[0])
+	container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
+	connections := make(chan electron.Connection, len(urls)) // Connections to close on exit
+
+	// Start a goroutine to for each URL to receive messages and send them to the messages channel.
+	// main() receives and prints them.
+	for _, urlStr := range urls {
+		util.Debugf("Connecting to %s\n", urlStr)
+		go func(urlStr string) { // Start the goroutine
+
+			defer wait.Done()                 // Notify main() when this goroutine is done.
+			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+			util.ExitIf(err)
+
+			// Open a new connection
+			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			util.ExitIf(err)
+			c, err := container.Connection(conn)
+			util.ExitIf(err)
+			util.ExitIf(c.Open())
+			connections <- c // Save connection so we can Close() when main() ends
+
+			// Create a Receiver using the path of the URL as the source address
+			r, err := c.Receiver(electron.Source(url.Path))
+			util.ExitIf(err)
+
+			// Loop receiving messages and sending them to the main() goroutine
+			for {
+				rm, err := r.Receive()
+				if err == electron.Closed {
+					return
+				}
+				util.ExitIf(err)
+				select { // Send m to main() or stop
+				case messages <- rm.Message: // Send to main()
+				case <-stop: // The program is stopping.
+					return
+				}
+			}
+		}(urlStr)
+	}
+
+	// All goroutines are started, we are receiving messages.
+	fmt.Printf("Listening on %d connections\n", len(urls))
+
+	// print each message until the count is exceeded.
+	for i := uint64(0); i < *count; i++ {
+		m := <-messages
+		util.Debugf("%s\n", util.FormatMessage(m))
+	}
+	fmt.Printf("Received %d messages\n", *count)
+
+	// Close all connections, this will interrupt goroutines blocked in Receiver.Receive()
+	for i := 0; i < len(urls); i++ {
+		c := <-connections
+		util.Debugf("close %s", c)
+		c.Close(nil)
+	}
+	close(stop) // Signal all goroutines to stop.
+	wait.Wait() // Wait for all goroutines to finish.
+	close(messages)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
new file mode 100644
index 0000000..94a77e7
--- /dev/null
+++ b/examples/go/electron/send.go
@@ -0,0 +1,123 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"../util"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"path"
+	"qpid.apache.org/amqp"
+	"qpid.apache.org/electron"
+	"sync"
+)
+
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Send messages to each URL concurrently with body "<url-path>-<n>" where n is the message number.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var count = flag.Int64("count", 1, "Send this may messages per address.")
+
+type sent struct {
+	name        string
+	sentMessage electron.SentMessage
+}
+
+func main() {
+	flag.Usage = usage
+	flag.Parse()
+
+	urls := flag.Args() // Non-flag arguments are URLs to receive from
+	if len(urls) == 0 {
+		log.Println("No URL provided")
+		flag.Usage()
+		os.Exit(1)
+	}
+
+	sentChan := make(chan sent) // Channel to receive all the delivery receipts.
+	var wait sync.WaitGroup     // Used by main() to wait for all goroutines to end.
+	wait.Add(len(urls))         // Wait for one goroutine per URL.
+
+	_, prog := path.Split(os.Args[0])
+	container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
+	var connections []electron.Connection // Store connctions to close on exit
+
+	// Start a goroutine for each URL to send messages.
+	for _, urlStr := range urls {
+		util.Debugf("Connecting to %v\n", urlStr)
+		go func(urlStr string) {
+
+			defer wait.Done()                 // Notify main() that this goroutine is done.
+			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+			util.ExitIf(err)
+
+			// Open a new connection
+			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			util.ExitIf(err)
+			c, err := container.Connection(conn)
+			util.ExitIf(err)
+			err = c.Open()
+			util.ExitIf(err)
+			connections = append(connections, c) // Save connection so it will be closed when main() ends
+
+			// Create a Sender using the path of the URL as the AMQP address
+			s, err := c.Sender(electron.Target(url.Path))
+			util.ExitIf(err)
+
+			// Loop sending messages.
+			for i := int64(0); i < *count; i++ {
+				m := amqp.NewMessage()
+				body := fmt.Sprintf("%v-%v", url.Path, i)
+				m.Marshal(body)
+				sentMessage, err := s.Send(m)
+				util.ExitIf(err)
+				sentChan <- sent{body, sentMessage}
+			}
+		}(urlStr)
+	}
+
+	// Wait for all the acknowledgements
+	expect := int(*count) * len(urls)
+	util.Debugf("Started senders, expect %v acknowledgements\n", expect)
+	for i := 0; i < expect; i++ {
+		d := <-sentChan
+		disposition, err := d.sentMessage.Disposition()
+		if err != nil {
+			util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err)
+		} else {
+			util.Debugf("acknowledgement[%v]  %v (%v)\n", i, d.name, disposition)
+		}
+	}
+	fmt.Printf("Received all %v acknowledgements\n", expect)
+
+	wait.Wait()                     // Wait for all goroutines to finish.
+	for _, c := range connections { // Close all connections
+		if c != nil {
+			c.Close(nil)
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index 2afd95c..a4d7d80 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -110,14 +110,14 @@ func checkEqual(want interface{}, got interface{}) error {
 // 'go build' uses the installed copy of the proton Go libraries, which may be out of date.
 func checkStaleLibs(t *testing.T) {
 	var stale []string
-	pp := "qpid.apache.org/proton"
-	for _, p := range []string{pp, pp + "/amqp", pp + "/concurrent"} {
+	pp := "qpid.apache.org"
+	for _, p := range []string{pp + "/proton", pp + "/amqp", pp + "/electron"} {
 		out, err := exec.Command("go", "list", "-f", "{{.Stale}}", p).CombinedOutput()
 		if err != nil {
 			t.Fatalf("failed to execute 'go list': %v\n%v", err, string(out))
 		}
 		if string(out) != "false\n" {
-			stale = append(stale, pp)
+			stale = append(stale, p)
 		}
 	}
 	if len(stale) > 0 {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
new file mode 100644
index 0000000..dbb4a82
--- /dev/null
+++ b/examples/go/proton/broker.go
@@ -0,0 +1,299 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the event-driven proton package.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+
+package main
+
+import (
+	"../util"
+	"flag"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"qpid.apache.org/amqp"
+	"qpid.apache.org/proton"
+)
+
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var addr = flag.String("addr", ":amqp", "Listening address")
+var credit = flag.Int("credit", 100, "Receiver credit window")
+var qsize = flag.Int("qsize", 1000, "Max queue size")
+
+func main() {
+	flag.Usage = usage
+	flag.Parse()
+
+	b := newBroker()
+	listener, err := net.Listen("tcp", *addr)
+	util.ExitIf(err)
+	defer listener.Close()
+	fmt.Printf("Listening on %s\n", listener.Addr())
+
+	// Loop accepting new connections.
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			util.Debugf("Accept error: %s", err)
+			continue
+		}
+		if err := b.connection(conn); err != nil {
+			if err != nil {
+				util.Debugf("Connection error: %s", err)
+				continue
+			}
+		}
+	}
+}
+
+type broker struct {
+	queues util.Queues
+}
+
+func newBroker() *broker {
+	return &broker{util.MakeQueues(*qsize)}
+}
+
+// connection creates a new AMQP connection for a net.Conn.
+func (b *broker) connection(conn net.Conn) error {
+	delegator := proton.NewMessagingDelegator(newHandler(&b.queues, *credit))
+	// We want to accept messages when they are enqueued, not just when they
+	// are received, so we turn off auto-accept and prefetch by the handler.
+	delegator.Prefetch = 0
+	delegator.AutoAccept = false
+	engine, err := proton.NewEngine(conn, delegator)
+	if err != nil {
+		return err
+	}
+	engine.Server() // Enable server-side protocol negotiation.
+	go func() {     // Start goroutine to run the engine event loop
+		engine.Run()
+		util.Debugf("Closed %s", engine)
+	}()
+	util.Debugf("Accepted %s", engine)
+	return nil
+}
+
+// receiver is a channel to buffer messages waiting to go on the queue.
+type receiver chan receivedMessage
+
+// receivedMessage is a message and the corresponding delivery for acknowledgement.
+type receivedMessage struct {
+	delivery proton.Delivery
+	message  amqp.Message
+}
+
+// sender is a signal channel, closed when we are done sending.
+type sender chan struct{}
+
+// handler handles AMQP events. There is one handler per connection.  The
+// handler does not need to be concurrent-safe as proton will serialize all
+// calls to a handler. We will use channels to communicate from the handler
+// to goroutines sending and receiving messages.
+type handler struct {
+	queues    *util.Queues
+	credit    int // Credit window for receiver flow control.
+	receivers map[proton.Link]receiver
+	senders   map[proton.Link]sender
+}
+
+func newHandler(queues *util.Queues, credit int) *handler {
+	return &handler{
+		queues,
+		credit,
+		make(map[proton.Link]receiver),
+		make(map[proton.Link]sender),
+	}
+}
+
+// Handle an AMQP event.
+func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
+	switch t {
+
+	case proton.MLinkOpening:
+		l := e.Link()
+		var err error
+		if l.IsReceiver() {
+			err = h.receiver(l)
+		} else { // IsSender()
+			err = h.sender(l)
+		}
+		if err == nil {
+			util.Debugf("%s opened", l)
+		} else {
+			util.Debugf("%s open error: %s", l, err)
+			proton.CloseError(l, err)
+		}
+
+	case proton.MLinkClosing:
+		l := e.Link()
+		if r, ok := h.receivers[l]; ok {
+			close(r)
+			delete(h.receivers, l)
+		} else if s, ok := h.senders[l]; ok {
+			close(s)
+			delete(h.senders, l)
+		}
+		util.Debugf("%s closed", l)
+
+	case proton.MSendable:
+		l := e.Link()
+		q := h.queues.Get(l.RemoteSource().Address())
+		if n, err := h.sendAll(e.Link(), q); err == nil && n > 0 {
+			// Still have credit, start a watcher.
+			go h.sendWatch(e.Link(), q)
+		}
+
+	case proton.MMessage:
+		l := e.Link()
+		d := e.Delivery()
+		m, err := d.Message() // Must decode message immediately before link state changes.
+		if err != nil {
+			util.Debugf("%s error decoding message: %s", e.Link(), err)
+			proton.CloseError(l, err)
+		} else {
+			// This will not block, AMQP credit prevents us from overflowing the buffer.
+			h.receivers[l] <- receivedMessage{d, m}
+			util.Debugf("%s received %s", l, util.FormatMessage(m))
+		}
+
+	case proton.MConnectionClosing, proton.MDisconnected:
+		for l, r := range h.receivers {
+			close(r)
+			delete(h.receivers, l)
+		}
+		for l, s := range h.senders {
+			close(s)
+			delete(h.senders, l)
+		}
+	}
+}
+
+// receiver is called by the handler when a receiver link opens.
+//
+// It sets up data structures in the handler and then starts a goroutine
+// to receive messages and put them on a queue.
+func (h *handler) receiver(l proton.Link) error {
+	q := h.queues.Get(l.RemoteTarget().Address())
+	buffer := make(receiver, h.credit)
+	h.receivers[l] = buffer
+	l.Flow(cap(buffer)) // credit==cap(buffer) so we won't overflow the buffer.
+	go h.runReceive(l, buffer, q)
+	return nil
+}
+
+// runReceive moves messages from buffer to queue
+func (h *handler) runReceive(l proton.Link, buffer receiver, q util.Queue) {
+	for rm := range buffer {
+		q <- rm.message
+		rm2 := rm // Save in temp var for injected closure
+		err := l.Connection().Injecter().Inject(func() {
+			rm2.delivery.Accept()
+			l.Flow(1)
+		})
+		if err != nil {
+			util.Debugf("%s receive error: %s", l, err)
+			proton.CloseError(l, err)
+		}
+	}
+}
+
+// sender is called by the handler when a sender link opens.
+// It sets up a sender structures in the handler.
+func (h *handler) sender(l proton.Link) error {
+	h.senders[l] = make(sender)
+	return nil
+}
+
+// send one message in handler context, assumes we have credit.
+func (h *handler) send(l proton.Link, m amqp.Message, q util.Queue) error {
+	delivery, err := l.Send(m)
+	if err != nil {
+		h.closeSender(l, err)
+		return err
+	}
+	delivery.Settle() // Pre-settled, unreliable.
+	util.Debugf("%s sent %s", l, util.FormatMessage(m))
+	return nil
+}
+
+// sendAll sends as many messages as possible without blocking, call in handler context.
+// Returns the number of credits left, >0 means we ran out of messages.
+func (h *handler) sendAll(l proton.Link, q util.Queue) (int, error) {
+	for l.Credit() > 0 {
+		select {
+		case m, ok := <-q:
+			if ok { // Got a message
+				if err := h.send(l, m, q); err != nil {
+					return 0, err
+				}
+			} else { // Queue is closed
+				l.Close()
+				return 0, io.EOF
+			}
+		default: // Queue empty
+			return l.Credit(), nil
+		}
+	}
+	return l.Credit(), nil
+}
+
+// sendWatch watches the queue for more messages and re-runs sendAll.
+// Run in a separate goroutine, so must inject handler functions.
+func (h *handler) sendWatch(l proton.Link, q util.Queue) {
+	select {
+	case m, ok := <-q:
+		l.Connection().Injecter().Inject(func() {
+			if ok {
+				if h.send(l, m, q) != nil {
+					return
+				}
+				if n, err := h.sendAll(l, q); err != nil {
+					return
+				} else if n > 0 {
+					go h.sendWatch(l, q) // Start a new watcher.
+				}
+			}
+		})
+	case <-h.senders[l]: // Closed
+		return
+	}
+}
+
+// closeSender closes a sender link and signals goroutines processing that sender.
+func (h *handler) closeSender(l proton.Link, err error) {
+	util.Debugf("%s sender closed: %s", l, err)
+	proton.CloseError(l, err)
+	close(h.senders[l])
+	delete(h.senders, l)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
deleted file mode 100644
index 86244d7..0000000
--- a/examples/go/receive.go
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
-	"./util"
-	"flag"
-	"fmt"
-	"log"
-	"net"
-	"os"
-	"qpid.apache.org/proton/amqp"
-	"qpid.apache.org/proton/concurrent"
-	"sync"
-)
-
-// Usage and command-line flags
-func usage() {
-	fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
-Receive messages from all the listed URLs concurrently and print them.
-`, os.Args[0])
-	flag.PrintDefaults()
-}
-
-var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
-
-func main() {
-	flag.Usage = usage
-	flag.Parse()
-
-	urls := flag.Args() // Non-flag arguments are URLs to receive from
-	if len(urls) == 0 {
-		log.Println("No URL provided")
-		usage()
-		os.Exit(1)
-	}
-
-	messages := make(chan amqp.Message) // Channel for messages from goroutines to main()
-	stop := make(chan struct{})         // Closing this channel means the program is stopping.
-	var wait sync.WaitGroup             // Used by main() to wait for all goroutines to end.
-	wait.Add(len(urls))                 // Wait for one goroutine per URL.
-
-	container := concurrent.NewContainer("")
-	connections := make(chan concurrent.Connection, len(urls)) // Connections to close on exit
-
-	// Start a goroutine to for each URL to receive messages and send them to the messages channel.
-	// main() receives and prints them.
-	for _, urlStr := range urls {
-		util.Debugf("Connecting to %s\n", urlStr)
-		go func(urlStr string) { // Start the goroutine
-
-			defer wait.Done()                 // Notify main() when this goroutine is done.
-			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			util.ExitIf(err)
-
-			// Open a new connection
-			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			util.ExitIf(err)
-			c, err := container.Connection(conn)
-			util.ExitIf(err)
-			util.ExitIf(c.Open())
-			connections <- c // Save connection so we can Close() when main() ends
-
-			// Create a Receiver using the path of the URL as the source address
-			r, err := c.Receiver(url.Path)
-			util.ExitIf(err)
-
-			// Loop receiving messages and sending them to the main() goroutine
-			for {
-				rm, err := r.Receive()
-				if err == concurrent.Closed {
-					return
-				}
-				util.ExitIf(err)
-				select { // Send m to main() or stop
-				case messages <- rm.Message: // Send to main()
-				case <-stop: // The program is stopping.
-					return
-				}
-			}
-		}(urlStr)
-	}
-
-	// All goroutines are started, we are receiving messages.
-	fmt.Printf("Listening on %d connections\n", len(urls))
-
-	// print each message until the count is exceeded.
-	for i := uint64(0); i < *count; i++ {
-		m := <-messages
-		util.Debugf("%s\n", util.FormatMessage(m))
-	}
-	fmt.Printf("Received %d messages\n", *count)
-
-	// Close all connections, this will interrupt goroutines blocked in Receiver.Receive()
-	for i := 0; i < len(urls); i++ {
-		c := <-connections
-		c.Disconnect(nil) // FIXME aconway 2015-09-25: Close
-	}
-	close(stop) // Signal all goroutines to stop.
-	wait.Wait() // Wait for all goroutines to finish.
-	close(messages)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
deleted file mode 100644
index edac2ae..0000000
--- a/examples/go/send.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
-	"./util"
-	"flag"
-	"fmt"
-	"log"
-	"net"
-	"os"
-	"qpid.apache.org/proton/amqp"
-	"qpid.apache.org/proton/concurrent"
-	"sync"
-)
-
-// Usage and command-line flags
-func usage() {
-	fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
-Send messages to each URL concurrently with body "<url-path>-<n>" where n is the message number.
-`, os.Args[0])
-	flag.PrintDefaults()
-}
-
-var count = flag.Int64("count", 1, "Send this may messages per address.")
-
-type sent struct {
-	name        string
-	sentMessage concurrent.SentMessage
-}
-
-func main() {
-	flag.Usage = usage
-	flag.Parse()
-
-	urls := flag.Args() // Non-flag arguments are URLs to receive from
-	if len(urls) == 0 {
-		log.Println("No URL provided")
-		flag.Usage()
-		os.Exit(1)
-	}
-
-	sentChan := make(chan sent) // Channel to receive all the delivery receipts.
-	var wait sync.WaitGroup     // Used by main() to wait for all goroutines to end.
-	wait.Add(len(urls))         // Wait for one goroutine per URL.
-
-	container := concurrent.NewContainer("")
-	var connections []concurrent.Connection // Store connctions to close on exit
-
-	// Start a goroutine for each URL to send messages.
-	for _, urlStr := range urls {
-		util.Debugf("Connecting to %v\n", urlStr)
-		go func(urlStr string) {
-
-			defer wait.Done()                 // Notify main() that this goroutine is done.
-			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			util.ExitIf(err)
-
-			// Open a new connection
-			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			util.ExitIf(err)
-			c, err := container.Connection(conn)
-			util.ExitIf(err)
-			err = c.Open()
-			util.ExitIf(err)
-			connections = append(connections, c) // Save connection so it will be closed when main() ends
-
-			// Create a Sender using the path of the URL as the AMQP address
-			s, err := c.Sender(url.Path)
-			util.ExitIf(err)
-
-			// Loop sending messages.
-			for i := int64(0); i < *count; i++ {
-				m := amqp.NewMessage()
-				body := fmt.Sprintf("%v-%v", url.Path, i)
-				m.Marshal(body)
-				sentMessage, err := s.Send(m)
-				util.ExitIf(err)
-				sentChan <- sent{body, sentMessage}
-			}
-		}(urlStr)
-	}
-
-	// Wait for all the acknowledgements
-	expect := int(*count) * len(urls)
-	util.Debugf("Started senders, expect %v acknowledgements\n", expect)
-	for i := 0; i < expect; i++ {
-		d := <-sentChan
-		disposition, err := d.sentMessage.Disposition()
-		if err != nil {
-			util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err)
-		} else {
-			util.Debugf("acknowledgement[%v]  %v (%v)\n", i, d.name, disposition)
-		}
-	}
-	fmt.Printf("Received all %v acknowledgements\n", expect)
-
-	wait.Wait()                     // Wait for all goroutines to finish.
-	for _, c := range connections { // Close all connections
-		if c != nil {
-			c.Close(nil)
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
index 075c4d2..d844c0d 100644
--- a/examples/go/util/queue.go
+++ b/examples/go/util/queue.go
@@ -20,87 +20,32 @@ under the License.
 package util
 
 import (
-	"container/list"
-	"qpid.apache.org/proton/amqp"
+	"qpid.apache.org/amqp"
 	"sync"
 )
 
-// Queue is a concurrent-safe queue of amqp.Message.
-type Queue struct {
-	name     string
-	messages list.List // List of amqp.Message
-	// Send to Push to push a message onto back of queue
-	Push chan amqp.Message
-	// Receive from Pop to pop a message from the front of the queue.
-	Pop chan amqp.Message
-	// Send to Putback to put an unsent message back on the front of the queue.
-	Putback chan amqp.Message
-}
+// Use a buffered channel as a very simple queue.
+type Queue chan amqp.Message
 
-func NewQueue(name string) *Queue {
-	q := &Queue{
-		name:    name,
-		Push:    make(chan amqp.Message),
-		Pop:     make(chan amqp.Message),
-		Putback: make(chan amqp.Message),
-	}
-	go q.run()
-	return q
+// Concurrent-safe map of queues.
+type Queues struct {
+	queueSize int
+	m         map[string]Queue
+	lock      sync.Mutex
 }
 
-// Close the queue. Any remaining messages on Pop can still be received.
-func (q *Queue) Close() { close(q.Push); close(q.Putback) }
-
-// Run runs the queue, returns when q.Close() is called.
-func (q *Queue) run() {
-	defer close(q.Pop)
-	for {
-		var pop chan amqp.Message
-		var front amqp.Message
-		if el := q.messages.Front(); el != nil {
-			front = el.Value.(amqp.Message)
-			pop = q.Pop // Only select for pop if there is something to pop.
-		}
-		select {
-		case m, ok := <-q.Push:
-			if !ok {
-				return
-			}
-			Debugf("%s push: %s\n", q.name, FormatMessage(m))
-			q.messages.PushBack(m)
-		case m, ok := <-q.Putback:
-			Debugf("%s put-back: %s\n", q.name, FormatMessage(m))
-			if !ok {
-				return
-			}
-			q.messages.PushFront(m)
-		case pop <- front:
-			Debugf("%s pop: %s\n", q.name, FormatMessage(front))
-			q.messages.Remove(q.messages.Front())
-		}
-	}
+func MakeQueues(queueSize int) Queues {
+	return Queues{queueSize: queueSize, m: make(map[string]Queue)}
 }
 
-// QueueMap is a concurrent-safe map of queues that creates new queues
-// on demand.
-type QueueMap struct {
-	lock sync.Mutex
-	m    map[string]*Queue
-}
-
-func MakeQueueMap() QueueMap { return QueueMap{m: make(map[string]*Queue)} }
-
-func (qm *QueueMap) Get(name string) *Queue {
-	if name == "" {
-		panic("Attempt to get queue with no name")
-	}
-	qm.lock.Lock()
-	defer qm.lock.Unlock()
-	q := qm.m[name]
+// Create a queue if not found.
+func (qs *Queues) Get(name string) Queue {
+	qs.lock.Lock()
+	defer qs.lock.Unlock()
+	q := qs.m[name]
 	if q == nil {
-		q = NewQueue(name)
-		qm.m[name] = q
-		Debugf("queue %s create", name)
+		q = make(Queue, qs.queueSize)
+		qs.m[name] = q
 	}
 	return q
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
index 72c6646..f158386 100644
--- a/examples/go/util/util.go
+++ b/examples/go/util/util.go
@@ -27,7 +27,7 @@ import (
 	"log"
 	"os"
 	"path"
-	"qpid.apache.org/proton/amqp"
+	"qpid.apache.org/amqp"
 )
 
 // Debug flag "-debug" enables debug output with Debugf

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt
index 0631eae..d24bf2e 100644
--- a/proton-c/bindings/go/CMakeLists.txt
+++ b/proton-c/bindings/go/CMakeLists.txt
@@ -45,8 +45,8 @@ if (BUILD_GO)
 
   # Install packages in the source tree, go tools aren't friendly otherwise.
   # All build output goes in git-ignored pkg or bin subdirectories.
-  set(qgo "qpid.apache.org/proton")
-  set(packages ${qgo} ${qgo}/amqp ${qgo}/concurrent ${qgo}/internal)
+  set(q "qpid.apache.org")
+  set(packages ${q}/amqp ${q}/internal ${q}/proton ${q}/electron)
   add_custom_target(go-packages ALL
     COMMAND ${GO_INSTALL} ${packages}
     WORKING_DIRECTORY ${CMAKE_BINARY_DIR}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
deleted file mode 100644
index 14ccf0f..0000000
--- a/proton-c/bindings/go/README.md
+++ /dev/null
@@ -1,150 +0,0 @@
-# Go binding for proton
-
-This is a a [Go](http://golang.org) binding for proton.
-The API is subject to change but is stabilizing.
-
-Feedback is strongly encouraged:
-
-- Email <proton@qpid.apache.org>
-- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
-
-The package documentation is available at: <http://godoc.org/qpid.apache.org/proton>
-
-See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/cpp/README.mdw)
-for working examples and practical instructions on how to get started.
-
-The rest of this page discusses the high-level goals and design issues.
-
-## Goals
-
-The API should
-
-- be idiomatic, unsurprising, and easy to use for Go developers.
-- support client and server development.
-- make simple tasks simple.
-- provide deep access to AMQP protocol when that is required.
-
-There are two types of developer we want to support
-
-1. Go developers using AMQP as a message transport:
-   - Straightforward conversions between Go built-in types and AMQP types.
-   - Easy message exchange via Go channels to support use in goroutines.
-
-2. AMQP-aware developers using Go as an implementation language:
-   - Go types to exactly represent all AMQP types and encoding details.
-   - Full access to detailed AMQP concepts: sessions, links, deliveries etc.
-
-## Status
-
-There are 3 go packages for proton:
-
-- qpid.apache.org/proton/amqp: converts AMQP messages and data types to and from Go data types.
-- qpid.apache.org/proton/concurrent: easy-to-use, concurrent API for clients and servers.
-- qpid.apache.org/proton: full low-level access to the proton engine.
-
-The `amqp` package provides conversions between AMQP and Go data types that are
-used by the other two packages.
-
-The `concurrent` package provides a simple procedural API that can be used with
-goroutines to construct concurrent AMQP clients and servers.
-
-The `proton` package is a concurrency-unsafe, event-driven API. It is a very
-thin wrapper providing almost direct access to the underlying proton C API.
-
-The `concurrent` package will probably be more familiar and convenient to Go
-programmers for most use cases. The `proton` package may be more familiar if
-you have used proton in other languages.
-
-Note the `concurrent` package itself is implemented in terms of the `proton`
-package. It takes care of running concurrency-unsafe `proton` code in dedicated
-goroutines and setting up channels to move data between user and proton
-goroutines safely. It hides all this complexity behind a simple procedural
-interface rather than presenting an event-driven interface.
-
-See the [examples](../../../examples/go/README.md) for a better illustration of the APIs.
-
-### Why two APIs?
-
-Go is a concurrent language and encourages applications to be divided into
-concurrent *goroutines*. It provides traditional locking but it encourages the
-use *channels* to communicate between goroutines without explicit locks:
-
-  "Share memory by communicating, don't communicate by sharing memory"
-
-The idea is that a given value is only operated on by one goroutine at a time,
-but values can easily be passed from one goroutine to another.
-
-Go literature distinguishes between:
-
-- *concurrency*: "keeping track of things that could be done in parallel"
-- *parallelism*: "actually doing things in parallel"
-
-The application expresses concurrency by starting goroutines for potentially
-concurrent tasks. The Go run-times schedule the activity of goroutines onto a
-small number (possibly one) of actual parallel executions.
-
-Even with *no* parallelism, concurrency lets the Go run-times *order* work with
-respect to events like file descriptors being readable/writable, channels having
-data, timers firing etc. Go automatically takes care of switching out goroutines
-that block or sleep so it is normal to write code in terms of blocking calls.
-
-Event-driven programming (such as poll, epoll, select or the `proton` package)
-also channels unpredictably ordered events to actions in one or a small pool of
-execution threads. However this requires a different style of programming:
-"event-driven" or "reactive" programming. Go developers call it "inside-out"
-programming. In an event-driven architecture blocking is a big problem as it
-consumes a scarce thread of execution, so actions that take time to complete
-have to be re-structured in terms of future event delivery.
-
-The promise of Go is that you can express your application in concurrent,
-procedural terms with simple blocking calls and the Go run-times will turn it
-inside-out for you. Write as many goroutines as you want, and let Go interleave
-and schedule them efficiently.
-
-For example: the Go equivalent of listening for connections is a goroutine with
-a simple endless loop that calls a blocking Listen() function and starts a
-goroutine for each new connection. Each connection has its own goroutine that
-deals with just that connection till it closes.
-
-The benefit is that the variables and logic live closer together. Once you're in
-a goroutine, you have everything you need in local variables, and they are
-preserved across blocking calls. There's no need to store details in context
-objects that you have to look up when handling a later event to figure out how
-to continue where you left off.
-
-The `proton` API is important because it is close to the original proton-C
-reactive API and gives you direct access to the underlying library. However it
-is un-Go-like in it's event-driven nature, and it requires care as methods on
-values associated with the same underlying proton engine are not
-concurrent-safe.
-
-The `concurrent` API hides the event-driven details behind a simple blocking API
-that can be safely called from arbitrary goroutines. Under the covers data is
-passed through channels to dedicated goroutines running separate `proton` event
-loops for each connection.
-
-### Design of the concurrent API
-
-Code from the `proton` package runs _only_ in a dedicated goroutine (per
-connection). This makes it safe to use proton C data structures associated with
-that connection.
-
-Code in the `concurrent` package can run in any goroutine, and holds `proton`
-package values with proton object pointers.  To use those values, it "injects" a
-function into the proton goroutine via a special channel. Injected functions
-can use temporary channels to allow the calling code to wait for results. Such
-waiting is only for the local event-loop, not across network calls.
-
-The API exposes blocking calls returning normal error values. The user can write
-simple blocking code or start their own goroutine loops and channels as
-appropriate. Details of our internal channel use and error handling are hidden,
-which simplifies the API and gives us more implementation flexibility.
-
-## New to Go?
-
-If you are new to Go then these are a good place to start:
-
-- [A Tour of Go](http://tour.golang.org)
-- [Effective Go](http://golang.org/doc/effective_go.html)
-
-Then look at the tools and library docs at <http://golang.org> as you need them.
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
new file mode 120000
index 0000000..38521ba
--- /dev/null
+++ b/proton-c/bindings/go/README.md
@@ -0,0 +1 @@
+src/qpid.apache.org/README.md
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/README.md b/proton-c/bindings/go/src/qpid.apache.org/README.md
new file mode 100644
index 0000000..b99047d
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/README.md
@@ -0,0 +1,105 @@
+# Qpid Go Libraries for AMQP
+
+These packages provide [Go](http://golang.org) support for sending and receiving AMQP
+messages in client or server applications.
+
+Package documentation is available at: <http://godoc.org/qpid.apache.org/>
+
+See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+for working examples and practical instructions on how to get started.
+
+Feedback is encouraged at:
+
+- Email <proton@qpid.apache.org>
+- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
+
+## Status
+
+There are 3 go packages for proton:
+
+`qpid.apache.org/electron`:  procedural, concurrent-safe Go library for AMQP messaging.
+A simple procedural API that can easily be used with goroutines and channels to construct
+concurrent AMQP clients and servers.
+
+`qpid.apache.org/proton`: event-driven, concurrent-unsafe Go library for AMQP messaging.
+A simple port into Go of the Proton C library. Its event-driven, single-threaded nature
+may be off-putting for Go programmers, hence the electron API.
+
+`qpid.apache.org/amqp`: converts AMQP messages and data types to and from Go data types.
+Used by both the proton and electron packages to represent AMQP types.
+
+See the
+[examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+for an illustration of the APIs, in particular compare `proton/broker.go` and
+`electron/broker.go` which illustrate the different API approaches to the same
+task (a simple broker.)
+
+
+### Why two APIs?
+
+Go is a concurrent language and encourages applications to be divided into
+concurrent *goroutines*. It provides traditional locking but it encourages the
+use *channels* to communicate between goroutines without explicit locks:
+
+  "Share memory by communicating, don't communicate by sharing memory"
+
+The idea is that a given value is only operated on by one goroutine at a time,
+but values can easily be passed from one goroutine to another.
+
+Go literature distinguishes between:
+
+- *concurrency*: "keeping track of things that could be done in parallel"
+- *parallelism*: "actually doing things in parallel"
+
+The application expresses concurrency by starting goroutines for potentially
+concurrent tasks. The Go run-times schedule the activity of goroutines onto a
+small number (possibly one) of actual parallel executions.
+
+Even with *no* parallelism, concurrency lets the Go run-times *order* work with
+respect to events like file descriptors being readable/writable, channels having
+data, timers firing etc. Go automatically takes care of switching out goroutines
+that block or sleep so it is normal to write code in terms of blocking calls.
+
+Event-driven programming (such as poll, epoll, select or the `proton` package)
+also channels unpredictably ordered events to actions in one or a small pool of
+execution threads. However this requires a different style of programming:
+"event-driven" or "reactive" programming. Go developers call it "inside-out"
+programming. In an event-driven architecture blocking is a big problem as it
+consumes a scarce thread of execution, so actions that take time to complete
+have to be re-structured in terms of future event delivery.
+
+The promise of Go is that you can express your application in concurrent,
+procedural terms with simple blocking calls and the Go run-times will turn it
+inside-out for you. Write as many goroutines as you want, and let Go interleave
+and schedule them efficiently.
+
+For example: the Go equivalent of listening for connections is a goroutine with
+a simple endless loop that calls a blocking Listen() function and starts a
+goroutine for each new connection. Each connection has its own goroutine that
+deals with just that connection till it closes.
+
+The benefit is that the variables and logic live closer together. Once you're in
+a goroutine, you have everything you need in local variables, and they are
+preserved across blocking calls. There's no need to store details in context
+objects that you have to look up when handling a later event to figure out how
+to continue where you left off.
+
+The `proton` API is important because it is close to the original proton-C
+reactive API and gives you direct access to the underlying library. However it
+is un-Go-like in it's event-driven nature, and it requires care as methods on
+values associated with the same underlying proton engine are not
+concurrent-safe.
+
+The `electron` API hides the event-driven details behind a simple blocking API
+that can be safely called from arbitrary goroutines. Under the covers data is
+passed through channels to dedicated goroutines running separate `proton` event
+loops for each connection.
+
+## New to Go?
+
+If you are new to Go then these are a good place to start:
+
+- [A Tour of Go](http://tour.golang.org)
+- [Effective Go](http://golang.org/doc/effective_go.html)
+
+Then look at the tools and library docs at <http://golang.org> as you need them.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
new file mode 100644
index 0000000..323c344
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package amqp encodes and decodes AMQP messages and data types as Go types.
+
+It follows the standard 'encoding' libraries pattern. The mapping between AMQP
+and Go types is described in the documentation of the Marshal and Unmarshal
+functions.
+
+AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
+*/
+package amqp
+
+// #cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
new file mode 100644
index 0000000..868dbf3
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+	"fmt"
+	"reflect"
+)
+
+// Error is an AMQP error condition. It has a name and a description.
+// It implements the Go error interface so can be returned as an error value.
+//
+// You can pass amqp.Error to methods that pass an error to a remote endpoint,
+// this gives you full control over what the remote endpoint will see.
+//
+// You can also pass any Go error to such functions, the remote peer
+// will see the equivalent of MakeError(error)
+//
+type Error struct{ Name, Description string }
+
+// Error implements the Go error interface for AMQP error errors.
+func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
+
+// Errorf makes a Error with name and formatted description as per fmt.Sprintf
+func Errorf(name, format string, arg ...interface{}) Error {
+	return Error{name, fmt.Sprintf(format, arg...)}
+}
+
+// MakeError makes an AMQP error from a go error using the Go error type as the name
+// and the err.Error() string as the description.
+func MakeError(err error) Error {
+	return Error{reflect.TypeOf(err).Name(), err.Error()}
+}
+
+var (
+	InternalError      = "amqp:internal-error"
+	NotFound           = "amqp:not-found"
+	UnauthorizedAccess = "amqp:unauthorized-access"
+	DecodeError        = "amqp:decode-error"
+	ResourceLimit      = "amqp:resource-limit"
+	NotAllowed         = "amqp:not-allowed"
+	InvalidField       = "amqp:invalid-field"
+	NotImplemented     = "amqp:not-implemented"
+	ResourceLocked     = "amqp:resource-locked"
+	PreerrorFailed     = "amqp:preerror-failed"
+	ResourceDeleted    = "amqp:resource-deleted"
+	IllegalState       = "amqp:illegal-state"
+	FrameSizeTooSmall  = "amqp:frame-size-too-small"
+)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/interop
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop
new file mode 120000
index 0000000..ad6fcad
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop
@@ -0,0 +1 @@
+../../../../../../tests/interop
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
new file mode 100644
index 0000000..b36ef64
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
@@ -0,0 +1,381 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Test that conversion of Go type to/from AMQP is compatible with other
+// bindings.
+//
+package amqp
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func checkEqual(want interface{}, got interface{}) error {
+	if !reflect.DeepEqual(want, got) {
+		return fmt.Errorf("%#v != %#v", want, got)
+	}
+	return nil
+}
+
+func getReader(name string) (r io.Reader) {
+	r, err := os.Open("interop/" + name + ".amqp")
+	if err != nil {
+		panic(fmt.Errorf("Can't open %#v: %v", name, err))
+	}
+	return
+}
+
+func remaining(d *Decoder) string {
+	remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
+	return string(remainder)
+}
+
+// checkDecode: want is the expected value, gotPtr is a pointer to a
+// instance of the same type for Decode.
+func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
+
+	if err := d.Decode(gotPtr); err != nil {
+		t.Error("Decode failed", err)
+		return
+	}
+	got := reflect.ValueOf(gotPtr).Elem().Interface()
+	if err := checkEqual(want, got); err != nil {
+		t.Error("Decode bad value:", err)
+		return
+	}
+
+	// Try round trip encoding
+	bytes, err := Marshal(want, nil)
+	if err != nil {
+		t.Error("Marshal failed", err)
+		return
+	}
+	n, err := Unmarshal(bytes, gotPtr)
+	if err != nil {
+		t.Error("Unmarshal failed", err)
+		return
+	}
+	if err := checkEqual(n, len(bytes)); err != nil {
+		t.Error("Bad unmarshal length", err)
+		return
+	}
+	got = reflect.ValueOf(gotPtr).Elem().Interface()
+	if err = checkEqual(want, got); err != nil {
+		t.Error("Bad unmarshal value", err)
+		return
+	}
+}
+
+func TestUnmarshal(t *testing.T) {
+	bytes, err := ioutil.ReadAll(getReader("strings"))
+	if err != nil {
+		t.Error(err)
+	}
+	for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+		var got string
+		n, err := Unmarshal(bytes, &got)
+		if err != nil {
+			t.Error(err)
+		}
+		if want != got {
+			t.Errorf("%#v != %#v", want, got)
+		}
+		bytes = bytes[n:]
+	}
+}
+
+func TestPrimitivesExact(t *testing.T) {
+	d := NewDecoder(getReader("primitives"))
+	// Decoding into exact types
+	var b bool
+	checkDecode(d, true, &b, t)
+	checkDecode(d, false, &b, t)
+	var u8 uint8
+	checkDecode(d, uint8(42), &u8, t)
+	var u16 uint16
+	checkDecode(d, uint16(42), &u16, t)
+	var i16 int16
+	checkDecode(d, int16(-42), &i16, t)
+	var u32 uint32
+	checkDecode(d, uint32(12345), &u32, t)
+	var i32 int32
+	checkDecode(d, int32(-12345), &i32, t)
+	var u64 uint64
+	checkDecode(d, uint64(12345), &u64, t)
+	var i64 int64
+	checkDecode(d, int64(-12345), &i64, t)
+	var f32 float32
+	checkDecode(d, float32(0.125), &f32, t)
+	var f64 float64
+	checkDecode(d, float64(0.125), &f64, t)
+}
+
+func TestPrimitivesCompatible(t *testing.T) {
+	d := NewDecoder(getReader("primitives"))
+	// Decoding into compatible types
+	var b bool
+	var i int
+	var u uint
+	var f float64
+	checkDecode(d, true, &b, t)
+	checkDecode(d, false, &b, t)
+	checkDecode(d, uint(42), &u, t)
+	checkDecode(d, uint(42), &u, t)
+	checkDecode(d, -42, &i, t)
+	checkDecode(d, uint(12345), &u, t)
+	checkDecode(d, -12345, &i, t)
+	checkDecode(d, uint(12345), &u, t)
+	checkDecode(d, -12345, &i, t)
+	checkDecode(d, 0.125, &f, t)
+	checkDecode(d, 0.125, &f, t)
+}
+
+// checkDecodeValue: want is the expected value, decode into a reflect.Value
+func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
+
+	var got, got2 interface{}
+	if err := d.Decode(&got); err != nil {
+		t.Error("Decode failed", err)
+		return
+	}
+	if err := checkEqual(want, got); err != nil {
+		t.Error(err)
+		return
+	}
+	// Try round trip encoding
+	bytes, err := Marshal(got, nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	n, err := Unmarshal(bytes, &got2)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if err := checkEqual(n, len(bytes)); err != nil {
+		t.Error(err)
+		return
+	}
+	if err := checkEqual(want, got2); err != nil {
+		t.Error(err)
+		return
+	}
+}
+
+func TestPrimitivesInterface(t *testing.T) {
+	d := NewDecoder(getReader("primitives"))
+	checkDecodeInterface(d, true, t)
+	checkDecodeInterface(d, false, t)
+	checkDecodeInterface(d, uint8(42), t)
+	checkDecodeInterface(d, uint16(42), t)
+	checkDecodeInterface(d, int16(-42), t)
+	checkDecodeInterface(d, uint32(12345), t)
+	checkDecodeInterface(d, int32(-12345), t)
+	checkDecodeInterface(d, uint64(12345), t)
+	checkDecodeInterface(d, int64(-12345), t)
+	checkDecodeInterface(d, float32(0.125), t)
+	checkDecodeInterface(d, float64(0.125), t)
+}
+
+func TestStrings(t *testing.T) {
+	d := NewDecoder(getReader("strings"))
+	// Test decoding as plain Go strings
+	for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+		var got string
+		checkDecode(d, want, &got, t)
+	}
+	remains := remaining(d)
+	if remains != "" {
+		t.Errorf("leftover: %s", remains)
+	}
+
+	// Test decoding as specific string types
+	d = NewDecoder(getReader("strings"))
+	var bytes []byte
+	var str, sym string
+	checkDecode(d, []byte("abc\000defg"), &bytes, t)
+	checkDecode(d, "abcdefg", &str, t)
+	checkDecode(d, "abcdefg", &sym, t)
+	checkDecode(d, make([]byte, 0), &bytes, t)
+	checkDecode(d, "", &str, t)
+	checkDecode(d, "", &sym, t)
+	remains = remaining(d)
+	if remains != "" {
+		t.Fatalf("leftover: %s", remains)
+	}
+
+	// Test some error handling
+	d = NewDecoder(getReader("strings"))
+	var s string
+	err := d.Decode(s)
+	if err == nil {
+		t.Fatal("Expected error")
+	}
+	if !strings.Contains(err.Error(), "not a pointer") {
+		t.Error(err)
+	}
+	var i int
+	err = d.Decode(&i)
+	if !strings.Contains(err.Error(), "cannot unmarshal") {
+		t.Error(err)
+	}
+	_, err = Unmarshal([]byte{}, nil)
+	if !strings.Contains(err.Error(), "not enough data") {
+		t.Error(err)
+	}
+	_, err = Unmarshal([]byte("foobar"), nil)
+	if !strings.Contains(err.Error(), "invalid-argument") {
+		t.Error(err)
+	}
+}
+
+func TestEncodeDecode(t *testing.T) {
+	type data struct {
+		s  string
+		i  int
+		u8 uint8
+		b  bool
+		f  float32
+		v  interface{}
+	}
+
+	in := data{"foo", 42, 9, true, 1.234, "thing"}
+
+	buf := bytes.Buffer{}
+	e := NewEncoder(&buf)
+	if err := e.Encode(in.s); err != nil {
+		t.Error(err)
+	}
+	if err := e.Encode(in.i); err != nil {
+		t.Error(err)
+	}
+	if err := e.Encode(in.u8); err != nil {
+		t.Error(err)
+	}
+	if err := e.Encode(in.b); err != nil {
+		t.Error(err)
+	}
+	if err := e.Encode(in.f); err != nil {
+		t.Error(err)
+	}
+	if err := e.Encode(in.v); err != nil {
+		t.Error(err)
+	}
+
+	var out data
+	d := NewDecoder(&buf)
+	if err := d.Decode(&out.s); err != nil {
+		t.Error(err)
+	}
+	if err := d.Decode(&out.i); err != nil {
+		t.Error(err)
+	}
+	if err := d.Decode(&out.u8); err != nil {
+		t.Error(err)
+	}
+	if err := d.Decode(&out.b); err != nil {
+		t.Error(err)
+	}
+	if err := d.Decode(&out.f); err != nil {
+		t.Error(err)
+	}
+	if err := d.Decode(&out.v); err != nil {
+		t.Error(err)
+	}
+
+	if err := checkEqual(in, out); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMap(t *testing.T) {
+	d := NewDecoder(getReader("maps"))
+
+	// Generic map
+	var m Map
+	checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
+
+	// Interface as map
+	var i interface{}
+	checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
+
+	d = NewDecoder(getReader("maps"))
+	// Specific typed map
+	var m2 map[string]int
+	checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
+
+	// Nested map
+	m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
+	bytes, err := Marshal(m, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = Unmarshal(bytes, &i)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = checkEqual(m, i); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestList(t *testing.T) {
+	d := NewDecoder(getReader("lists"))
+	var l List
+	checkDecode(d, List{int32(32), "foo", true}, &l, t)
+	checkDecode(d, List{}, &l, t)
+}
+
+// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as
+// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
+func TODO_TestMessage(t *testing.T) {
+	bytes, err := ioutil.ReadAll(getReader("message"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	m, err := DecodeMessage(bytes)
+	if err != nil {
+		t.Fatal(err)
+	} else {
+		if err := checkEqual(m.Body(), "hello"); err != nil {
+			t.Error(err)
+		}
+	}
+
+	m2 := NewMessageWith("hello")
+	bytes2, err := m2.Encode(nil)
+	if err != nil {
+		t.Error(err)
+	} else {
+		if err = checkEqual(bytes, bytes2); err != nil {
+			t.Error(err)
+		}
+	}
+}
+
+// TODO aconway 2015-03-13: finish the full interop test

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
new file mode 100644
index 0000000..666b4f6
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -0,0 +1,250 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"io"
+	"qpid.apache.org/internal"
+	"reflect"
+	"unsafe"
+)
+
+func dataError(prefix string, data *C.pn_data_t) error {
+	err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
+	if err != nil {
+		err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
+	}
+	return err
+}
+
+/*
+Marshal encodes a Go value as AMQP data in buffer.
+If buffer is nil, or is not large enough, a new buffer  is created.
+
+Returns the buffer used for encoding with len() adjusted to the actual size of data.
+
+Go types are encoded as follows
+
+ +-------------------------------------+--------------------------------------------+
+ |Go type                              |AMQP type                                   |
+ +-------------------------------------+--------------------------------------------+
+ |bool                                 |bool                                        |
+ +-------------------------------------+--------------------------------------------+
+ |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)        |
+ +-------------------------------------+--------------------------------------------+
+ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong)  |
+ +-------------------------------------+--------------------------------------------+
+ |float32, float64                     |float, double.                              |
+ +-------------------------------------+--------------------------------------------+
+ |string                               |string                                      |
+ +-------------------------------------+--------------------------------------------+
+ |[]byte, Binary                       |binary                                      |
+ +-------------------------------------+--------------------------------------------+
+ |Symbol                               |symbol                                      |
+ +-------------------------------------+--------------------------------------------+
+ |interface{}                          |the contained type                          |
+ +-------------------------------------+--------------------------------------------+
+ |nil                                  |null                                        |
+ +-------------------------------------+--------------------------------------------+
+ |map[K]T                              |map with K and T converted as above         |
+ +-------------------------------------+--------------------------------------------+
+ |Map                                  |map, may have mixed types for keys, values  |
+ +-------------------------------------+--------------------------------------------+
+ |[]T                                  |list with T converted as above              |
+ +-------------------------------------+--------------------------------------------+
+ |List                                 |list, may have mixed types  values          |
+ +-------------------------------------+--------------------------------------------+
+
+The following Go types cannot be marshaled: uintptr, function, interface, channel
+
+TODO
+
+Go types: array, slice, struct, complex64/128.
+
+AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
+
+Described types.
+
+*/
+func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
+	defer doRecover(&err)
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	marshal(v, data)
+	encode := func(buf []byte) ([]byte, error) {
+		n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
+		switch {
+		case n == int(C.PN_OVERFLOW):
+			return buf, overflow
+		case n < 0:
+			return buf, dataError("marshal error", data)
+		default:
+			return buf[:n], nil
+		}
+	}
+	return encodeGrow(buffer, encode)
+}
+
+const minEncode = 256
+
+// overflow is returned when an encoding function can't fit data in the buffer.
+var overflow = internal.Errorf("buffer too small")
+
+// encodeFn encodes into buffer[0:len(buffer)].
+// Returns buffer with length adjusted for data encoded.
+// If buffer too small, returns overflow as error.
+type encodeFn func(buffer []byte) ([]byte, error)
+
+// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
+// Returns the final buffer.
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
+	if buffer == nil || len(buffer) == 0 {
+		buffer = make([]byte, minEncode)
+	}
+	var err error
+	for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
+		buffer = make([]byte, 2*len(buffer))
+	}
+	return buffer, err
+}
+
+func marshal(v interface{}, data *C.pn_data_t) {
+	switch v := v.(type) {
+	case nil:
+		C.pn_data_put_null(data)
+	case bool:
+		C.pn_data_put_bool(data, C.bool(v))
+	case int8:
+		C.pn_data_put_byte(data, C.int8_t(v))
+	case int16:
+		C.pn_data_put_short(data, C.int16_t(v))
+	case int32:
+		C.pn_data_put_int(data, C.int32_t(v))
+	case int64:
+		C.pn_data_put_long(data, C.int64_t(v))
+	case int:
+		if unsafe.Sizeof(0) == 8 {
+			C.pn_data_put_long(data, C.int64_t(v))
+		} else {
+			C.pn_data_put_int(data, C.int32_t(v))
+		}
+	case uint8:
+		C.pn_data_put_ubyte(data, C.uint8_t(v))
+	case uint16:
+		C.pn_data_put_ushort(data, C.uint16_t(v))
+	case uint32:
+		C.pn_data_put_uint(data, C.uint32_t(v))
+	case uint64:
+		C.pn_data_put_ulong(data, C.uint64_t(v))
+	case uint:
+		if unsafe.Sizeof(0) == 8 {
+			C.pn_data_put_ulong(data, C.uint64_t(v))
+		} else {
+			C.pn_data_put_uint(data, C.uint32_t(v))
+		}
+	case float32:
+		C.pn_data_put_float(data, C.float(v))
+	case float64:
+		C.pn_data_put_double(data, C.double(v))
+	case string:
+		C.pn_data_put_string(data, pnBytes([]byte(v)))
+	case []byte:
+		C.pn_data_put_binary(data, pnBytes(v))
+	case Binary:
+		C.pn_data_put_binary(data, pnBytes([]byte(v)))
+	case Symbol:
+		C.pn_data_put_symbol(data, pnBytes([]byte(v)))
+	case Map: // Special map type
+		C.pn_data_put_map(data)
+		C.pn_data_enter(data)
+		for key, val := range v {
+			marshal(key, data)
+			marshal(val, data)
+		}
+		C.pn_data_exit(data)
+	default:
+		switch reflect.TypeOf(v).Kind() {
+		case reflect.Map:
+			putMap(data, v)
+		case reflect.Slice:
+			putList(data, v)
+		default:
+			panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+		}
+	}
+	err := dataError("marshal", data)
+	if err != nil {
+		panic(err)
+	}
+	return
+}
+
+func clearMarshal(v interface{}, data *C.pn_data_t) {
+	C.pn_data_clear(data)
+	marshal(v, data)
+}
+
+func putMap(data *C.pn_data_t, v interface{}) {
+	mapValue := reflect.ValueOf(v)
+	C.pn_data_put_map(data)
+	C.pn_data_enter(data)
+	for _, key := range mapValue.MapKeys() {
+		marshal(key.Interface(), data)
+		marshal(mapValue.MapIndex(key).Interface(), data)
+	}
+	C.pn_data_exit(data)
+}
+
+func putList(data *C.pn_data_t, v interface{}) {
+	listValue := reflect.ValueOf(v)
+	C.pn_data_put_list(data)
+	C.pn_data_enter(data)
+	for i := 0; i < listValue.Len(); i++ {
+		marshal(listValue.Index(i).Interface(), data)
+	}
+	C.pn_data_exit(data)
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+	writer io.Writer
+	buffer []byte
+}
+
+// New encoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+	return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+	e.buffer, err = Marshal(v, e.buffer)
+	if err == nil {
+		e.writer.Write(e.buffer)
+	}
+	return err
+}
+
+func replace(data *C.pn_data_t, v interface{}) {
+	C.pn_data_clear(data)
+	marshal(v, data)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message