qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [18/50] [abbrv] qpid-proton git commit: PROTON-827: go binding: enable use of 'go get', reorganize packages names and layout.
Date Mon, 28 Sep 2015 18:09:44 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/amqp/unmarshal.go b/proton-c/bindings/go/amqp/unmarshal.go
new file mode 100644
index 0000000..89ab64a
--- /dev/null
+++ b/proton-c/bindings/go/amqp/unmarshal.go
@@ -0,0 +1,552 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"qpid.apache.org/proton/go/internal"
+	"reflect"
+	"unsafe"
+)
+
+const minDecode = 1024
+
+// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
+type BadUnmarshal struct {
+	// The name of the AMQP type.
+	AMQPType string
+	// The Go type.
+	GoType reflect.Type
+}
+
+func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal {
+	return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)}
+}
+
+func (e BadUnmarshal) Error() string {
+	if e.GoType.Kind() != reflect.Ptr {
+		return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
+	} else {
+		return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
+	}
+}
+
+//
+// Decoding from a pn_data_t
+//
+// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
+// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
+//
+
+// Decoder decodes AMQP values from an io.Reader.
+//
+type Decoder struct {
+	reader io.Reader
+	buffer bytes.Buffer
+}
+
+// NewDecoder returns a new decoder that reads from r.
+//
+// The decoder has its own buffer and may read more data than required for the
+// AMQP values requested.  Use Buffered to see if there is data left in the
+// buffer.
+//
+func NewDecoder(r io.Reader) *Decoder {
+	return &Decoder{r, bytes.Buffer{}}
+}
+
+// Buffered returns a reader of the data remaining in the Decoder's buffer. The
+// reader is valid until the next call to Decode.
+//
+func (d *Decoder) Buffered() io.Reader {
+	return bytes.NewReader(d.buffer.Bytes())
+}
+
+// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
+//
+// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
+//
+func (d *Decoder) Decode(v interface{}) (err error) {
+	defer internal.DoRecover(&err)
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	var n int
+	for n == 0 && err == nil {
+		n = unmarshal(data, d.buffer.Bytes(), v)
+		if n == 0 { // n == 0 means not enough data, read more
+			err = d.more()
+			if err != nil {
+				return
+			}
+		}
+	}
+	d.buffer.Next(n)
+	return
+}
+
+/*
+Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
+Types are converted as follows:
+
+ +---------------------------+----------------------------------------------------------------------+
+ |To Go types                |From AMQP types                                                       |
+ +===========================+======================================================================+
+ |bool                       |bool                                                                  |
+ +---------------------------+----------------------------------------------------------------------+
+ |int, int8, int16,          |Equivalent or smaller signed integer type: char, byte, short, int,    |
+ |int32, int64               |long.                                                                 |
+ +---------------------------+----------------------------------------------------------------------+
+ |uint, uint8, uint16,       |Equivalent or smaller unsigned integer type: char, ubyte, ushort,     |
+ |uint32, uint64 types       |uint, ulong                                                           |
+ +---------------------------+----------------------------------------------------------------------+
+ |float32, float64           |Equivalent or smaller float or double.                                |
+ +---------------------------+----------------------------------------------------------------------+
+ |string, []byte             |string, symbol or binary.                                             |
+ +---------------------------+----------------------------------------------------------------------+
+ |Symbol                     |symbol                                                                |
+ +---------------------------+----------------------------------------------------------------------+
+ |map[K]T                    |map, provided all keys and values can unmarshal to types K, T         |
+ +---------------------------+----------------------------------------------------------------------+
+ |Map                        |map, any AMQP map                                                     |
+ +---------------------------+----------------------------------------------------------------------+
+ |interface{}                |Any AMQP value can be unmarshaled to an interface{} as follows:       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |AMQP Type               |Go Type in interface{}                       |
+ |                           +========================+=============================================+
+ |                           |bool                    |bool                                         |
+ |                           +------------------------+---------------------------------------------+
+ |                           |char                    |uint8                                        |
+ |                           +------------------------+---------------------------------------------+
+ |                           |byte,short,int,long     |int8,int16,int32,int64                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64                   |
+ |                           +------------------------+---------------------------------------------+
+ |                           |float, double           |float32, float64                             |
+ |                           +------------------------+---------------------------------------------+
+ |                           |string                  |string                                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |symbol                  |Symbol                                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |binary                  |Binary                                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |null                    |nil                                          |
+ |                           +------------------------+---------------------------------------------+
+ |                           |map                     |Map                                          |
+ |                           +------------------------+---------------------------------------------+
+ |                           |list                    |List                                         |
+ +---------------------------+------------------------+---------------------------------------------+
+
+The following Go types cannot be unmarshaled: complex64/128, uintptr, function, interface, channel.
+
+TODO types
+
+AMQP: timestamp, decimal32/64/128, uuid, described, array.
+
+Go: array, struct.
+
+Maps: currently we cannot unmarshal AMQP maps with unhashable key types, need an alternate
+representation for those.
+*/
+func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
+	defer internal.DoRecover(&err)
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	n = unmarshal(data, bytes, v)
+	if n == 0 {
+		err = internal.Errorf("not enough data")
+	}
+	return
+}
+
+// more reads more data when we can't parse a complete AMQP type
+func (d *Decoder) more() error {
+	var readSize int64 = minDecode
+	if int64(d.buffer.Len()) > readSize { // Grow by doubling
+		readSize = int64(d.buffer.Len())
+	}
+	var n int64
+	n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
+	if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
+		err = io.EOF
+	}
+	return err
+}
+
+// unmarshal decodes from bytes and converts into the value pointed to by v.
+// Used by Unmarshal and Decode
+//
+// Returns the number of bytes decoded or 0 if not enough data.
+//
+func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) {
+	n = decode(data, bytes)
+	if n == 0 {
+		return 0
+	}
+	get(data, v)
+	return
+}
+
+// get value from data into value pointed at by v.
+func get(data *C.pn_data_t, v interface{}) {
+	pnType := C.pn_data_type(data)
+
+	switch v := v.(type) {
+	case *bool:
+		switch pnType {
+		case C.PN_BOOL:
+			*v = bool(C.pn_data_get_bool(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	case *int8:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int8(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int8(C.pn_data_get_byte(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	case *uint8:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint8(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint8(C.pn_data_get_ubyte(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	case *int16:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int16(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int16(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int16(C.pn_data_get_short(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	case *uint16:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint16(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint16(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint16(C.pn_data_get_ushort(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	case *int32:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int32(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int32(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int32(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int32(C.pn_data_get_int(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	case *uint32:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint32(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint32(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint32(C.pn_data_get_ushort(data))
+		case C.PN_UINT:
+			*v = uint32(C.pn_data_get_uint(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *int64:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int64(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int64(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int64(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int64(C.pn_data_get_int(data))
+		case C.PN_LONG:
+			*v = int64(C.pn_data_get_long(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *uint64:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint64(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint64(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint64(C.pn_data_get_ushort(data))
+		case C.PN_ULONG:
+			*v = uint64(C.pn_data_get_ulong(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *int:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int(C.pn_data_get_int(data))
+		case C.PN_LONG:
+			if unsafe.Sizeof(0) == 8 {
+				*v = int(C.pn_data_get_long(data))
+			} else {
+				panic(newBadUnmarshal(pnType, v))
+			}
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *uint:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint(C.pn_data_get_ushort(data))
+		case C.PN_UINT:
+			*v = uint(C.pn_data_get_uint(data))
+		case C.PN_ULONG:
+			if unsafe.Sizeof(0) == 8 {
+				*v = uint(C.pn_data_get_ulong(data))
+			} else {
+				panic(newBadUnmarshal(pnType, v))
+			}
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *float32:
+		switch pnType {
+		case C.PN_FLOAT:
+			*v = float32(C.pn_data_get_float(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *float64:
+		switch pnType {
+		case C.PN_FLOAT:
+			*v = float64(C.pn_data_get_float(data))
+		case C.PN_DOUBLE:
+			*v = float64(C.pn_data_get_double(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *string:
+		switch pnType {
+		case C.PN_STRING:
+			*v = goString(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = goString(C.pn_data_get_symbol(data))
+		case C.PN_BINARY:
+			*v = goString(C.pn_data_get_binary(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *[]byte:
+		switch pnType {
+		case C.PN_STRING:
+			*v = goBytes(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = goBytes(C.pn_data_get_symbol(data))
+		case C.PN_BINARY:
+			*v = goBytes(C.pn_data_get_binary(data))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *Binary:
+		switch pnType {
+		case C.PN_BINARY:
+			*v = Binary(goBytes(C.pn_data_get_binary(data)))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *Symbol:
+		switch pnType {
+		case C.PN_SYMBOL:
+			*v = Symbol(goBytes(C.pn_data_get_symbol(data)))
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+
+	case *interface{}:
+		getInterface(data, v)
+
+	default:
+		if reflect.TypeOf(v).Kind() != reflect.Ptr {
+			panic(newBadUnmarshal(pnType, v))
+		}
+		switch reflect.TypeOf(v).Elem().Kind() {
+		case reflect.Map:
+			getMap(data, v)
+		case reflect.Slice:
+			getList(data, v)
+		default:
+			panic(newBadUnmarshal(pnType, v))
+		}
+	}
+	err := dataError("unmarshaling", data)
+	if err != nil {
+		panic(err)
+	}
+	return
+}
+
+// Getting into an interface is driven completely by the AMQP type, since the interface{}
+// target is type-neutral.
+func getInterface(data *C.pn_data_t, v *interface{}) {
+	pnType := C.pn_data_type(data)
+	switch pnType {
+	case C.PN_NULL:
+		*v = nil
+	case C.PN_BOOL:
+		*v = bool(C.pn_data_get_bool(data))
+	case C.PN_UBYTE:
+		*v = uint8(C.pn_data_get_ubyte(data))
+	case C.PN_BYTE:
+		*v = int8(C.pn_data_get_byte(data))
+	case C.PN_USHORT:
+		*v = uint16(C.pn_data_get_ushort(data))
+	case C.PN_SHORT:
+		*v = int16(C.pn_data_get_short(data))
+	case C.PN_UINT:
+		*v = uint32(C.pn_data_get_uint(data))
+	case C.PN_INT:
+		*v = int32(C.pn_data_get_int(data))
+	case C.PN_CHAR:
+		*v = uint8(C.pn_data_get_char(data))
+	case C.PN_ULONG:
+		*v = uint64(C.pn_data_get_ulong(data))
+	case C.PN_LONG:
+		*v = int64(C.pn_data_get_long(data))
+	case C.PN_FLOAT:
+		*v = float32(C.pn_data_get_float(data))
+	case C.PN_DOUBLE:
+		*v = float64(C.pn_data_get_double(data))
+	case C.PN_BINARY:
+		*v = Binary(goBytes(C.pn_data_get_binary(data)))
+	case C.PN_STRING:
+		*v = goString(C.pn_data_get_string(data))
+	case C.PN_SYMBOL:
+		*v = Symbol(goString(C.pn_data_get_symbol(data)))
+	case C.PN_MAP:
+		m := make(Map)
+		get(data, &m)
+		*v = m // FIXME aconway 2015-03-13: avoid the copy?
+	case C.PN_LIST:
+		l := make(List, 0)
+		get(data, &l)
+		*v = l // FIXME aconway 2015-03-13: avoid the copy?
+	default:
+		panic(newBadUnmarshal(pnType, v))
+	}
+}
+
+// get into map pointed at by v
+func getMap(data *C.pn_data_t, v interface{}) {
+	pnType := C.pn_data_type(data)
+	if pnType != C.PN_MAP {
+		panic(newBadUnmarshal(pnType, v))
+	}
+	mapValue := reflect.ValueOf(v).Elem()
+	mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
+	count := int(C.pn_data_get_map(data))
+	if bool(C.pn_data_enter(data)) {
+		for i := 0; i < count/2; i++ {
+			if bool(C.pn_data_next(data)) {
+				key := reflect.New(mapValue.Type().Key())
+				get(data, key.Interface())
+				if bool(C.pn_data_next(data)) {
+					val := reflect.New(mapValue.Type().Elem())
+					get(data, val.Interface())
+					mapValue.SetMapIndex(key.Elem(), val.Elem())
+				}
+			}
+		}
+		C.pn_data_exit(data)
+	}
+}
+
+func getList(data *C.pn_data_t, v interface{}) {
+	pnType := C.pn_data_type(data)
+	if pnType != C.PN_LIST {
+		panic(newBadUnmarshal(pnType, v))
+	}
+	count := int(C.pn_data_get_list(data))
+	listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
+	if bool(C.pn_data_enter(data)) {
+		for i := 0; i < count; i++ {
+			if bool(C.pn_data_next(data)) {
+				val := reflect.New(listValue.Type().Elem())
+				get(data, val.Interface())
+				listValue.Index(i).Set(val.Elem())
+			}
+		}
+		C.pn_data_exit(data)
+	}
+	// FIXME aconway 2015-04-09: avoid the copy?
+	reflect.ValueOf(v).Elem().Set(listValue)
+}
+
+// decode from bytes.
+// Return bytes decoded or 0 if we could not decode a complete object.
+//
+func decode(data *C.pn_data_t, bytes []byte) int {
+	if len(bytes) == 0 {
+		return 0
+	}
+	n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
+	if n == int(C.PN_UNDERFLOW) {
+		C.pn_error_clear(C.pn_data_error(data))
+		return 0
+	} else if n <= 0 {
+		panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n)))
+	}
+	return n
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/url.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/amqp/url.go b/proton-c/bindings/go/amqp/url.go
new file mode 100644
index 0000000..58711c7
--- /dev/null
+++ b/proton-c/bindings/go/amqp/url.go
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+/*
+#include <stdlib.h>
+#include <string.h>
+#include <proton/url.h>
+
+// Helper function for setting URL fields.
+typedef void (*setter_fn)(pn_url_t* url, const char* value);
+inline void	set(pn_url_t *url, setter_fn s, const char* value) {
+  s(url, value);
+}
+*/
+import "C"
+
+import (
+	"net"
+	"net/url"
+	"qpid.apache.org/proton/go/internal"
+	"unsafe"
+)
+
+const (
+	amqp  string = "amqp"
+	amqps        = "amqps"
+)
+
+// ParseURL parses an AMQP URL string and returns a net/url.URL.
+//
+// It is more forgiving than net/url.Parse and allows most of the parts of the
+// URL to be missing, assuming AMQP defaults.
+//
+func ParseURL(s string) (u *url.URL, err error) {
+	cstr := C.CString(s)
+	defer C.free(unsafe.Pointer(cstr))
+	pnUrl := C.pn_url_parse(cstr)
+	if pnUrl == nil {
+		return nil, internal.Errorf("bad URL %#v", s)
+	}
+	defer C.pn_url_free(pnUrl)
+
+	scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
+	username := C.GoString(C.pn_url_get_username(pnUrl))
+	password := C.GoString(C.pn_url_get_password(pnUrl))
+	host := C.GoString(C.pn_url_get_host(pnUrl))
+	port := C.GoString(C.pn_url_get_port(pnUrl))
+	path := C.GoString(C.pn_url_get_path(pnUrl))
+
+	if err != nil {
+		return nil, internal.Errorf("bad URL %#v: %s", s, err)
+	}
+	if scheme == "" {
+		scheme = amqp
+	}
+	if port == "" {
+		if scheme == amqps {
+			port = amqps
+		} else {
+			port = amqp
+		}
+	}
+	var user *url.Userinfo
+	if password != "" {
+		user = url.UserPassword(username, password)
+	} else if username != "" {
+		user = url.User(username)
+	}
+
+	u = &url.URL{
+		Scheme: scheme,
+		User:   user,
+		Host:   net.JoinHostPort(host, port),
+		Path:   path,
+	}
+
+	return u, nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/url_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/amqp/url_test.go b/proton-c/bindings/go/amqp/url_test.go
new file mode 100644
index 0000000..f80f1c4
--- /dev/null
+++ b/proton-c/bindings/go/amqp/url_test.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+	"fmt"
+)
+
+func ExampleParseURL() {
+	for _, s := range []string{
+		"amqp://username:password@host:1234/path",
+		"host:1234",
+		"host",
+		":1234",
+		"host/path",
+		"amqps://host",
+		"",
+	} {
+		u, err := ParseURL(s)
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			fmt.Println(u)
+		}
+	}
+	// Output:
+	// amqp://username:password@host:1234/path
+	// amqp://host:1234
+	// amqp://host:amqp
+	// amqp://:1234
+	// amqp://host:amqp/path
+	// amqps://host:amqps
+	// proton: bad URL ""
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/event/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/event/doc.go b/proton-c/bindings/go/event/doc.go
new file mode 100644
index 0000000..7a9ec12
--- /dev/null
+++ b/proton-c/bindings/go/event/doc.go
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package event provides a low-level API to the proton AMQP engine.
+
+For most tasks, consider instead package qpid.apache.org/proton/go/messaging.
+It provides a higher-level, concurrent API that is easier to use.
+
+The API is event based. There are two alternative styles of handler. EventHandler
+provides the core proton events. MessagingHandler provides a slightly simplified
+view of the event stream and automates some common tasks.
+
+See type Pump documentation for more details of the interaction between proton
+events and goroutines.
+*/
+package event
+
+// #cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/event/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/event/handlers.go b/proton-c/bindings/go/event/handlers.go
new file mode 100644
index 0000000..d76fac9
--- /dev/null
+++ b/proton-c/bindings/go/event/handlers.go
@@ -0,0 +1,411 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package event
+
+// #include <proton/reactor.h>
+// #include <proton/handlers.h>
+import "C"
+
+import (
+	"qpid.apache.org/proton/go/internal"
+)
+
+// EventHandler handles core proton events.
+type EventHandler interface {
+	// HandleEvent is called with an event.
+	// Typically HandleEvent() is implemented as a switch on e.Type()
+	HandleEvent(e Event) error
+}
+
+// cHandler wraps a C pn_handler_t
+type cHandler struct {
+	pn *C.pn_handler_t
+}
+
+func (h cHandler) HandleEvent(e Event) error {
+	C.pn_handler_dispatch(h.pn, e.pn, C.pn_event_type(e.pn))
+	return nil // FIXME aconway 2015-03-31: error handling
+}
+
+// MessagingHandler provides an alternative interface to EventHandler.
+// It is easier to use for most applications that send and receive messages.
+//
+// Implement this interface and then wrap your value with NewMessagingDelegator().
+// MessagingDelegator implements EventHandler and can be registered with a Pump.
+//
+type MessagingHandler interface {
+	HandleMessagingEvent(MessagingEventType, Event) error
+}
+
+// MessagingEventType provides a set of events that are easier to work with than the
+// core events defined by EventType
+//
+// There are 3 types of "endpoint": Connection, Session and Link.
+// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error.
+// The meaning of these events is as follows:
+//
+// Opening: The remote end opened, the local end will open automatically.
+//
+// Opened: Both ends are open, regardless of which end opened first.
+//
+// Closing: The remote end closed without error, the local end will close automatically.
+//
+// Error: The remote end closed with an error, the local end will close automatically.
+//
+// Closed: Both ends are closed, regardless of which end closed first or if there was an error.
+//
+type MessagingEventType int
+
+const (
+	// The event loop starts.
+	MStart MessagingEventType = iota
+	// The peer closes the connection with an error condition.
+	MConnectionError
+	// The peer closes the session with an error condition.
+	MSessionError
+	// The peer closes the link with an error condition.
+	MLinkError
+	// The peer initiates the opening of the connection.
+	MConnectionOpening
+	// The peer initiates the opening of the session.
+	MSessionOpening
+	// The peer initiates the opening of the link.
+	MLinkOpening
+	// The connection is opened.
+	MConnectionOpened
+	// The session is opened.
+	MSessionOpened
+	// The link is opened.
+	MLinkOpened
+	// The peer initiates the closing of the connection.
+	MConnectionClosing
+	// The peer initiates the closing of the session.
+	MSessionClosing
+	// The peer initiates the closing of the link.
+	MLinkClosing
+	// Both ends of the connection are closed.
+	MConnectionClosed
+	// Both ends of the session are closed.
+	MSessionClosed
+	// Both ends of the link are closed.
+	MLinkClosed
+	// The connection is disconnected.
+	MConnectionDisconnected
+	// The session's connection was disconnected
+	MSessionDisconnected
+	// The link's connection was disconnected
+	MLinkDisconnected
+	// The sender link has credit and messages can
+	// therefore be transferred.
+	MSendable
+	// The remote peer accepts an outgoing message.
+	MAccepted
+	// The remote peer rejects an outgoing message.
+	MRejected
+	// The peer releases an outgoing message. Note that this may be in response to
+	// either the RELEASE or MODIFIED state as defined by the AMQP specification.
+	MReleased
+	// The peer has settled the outgoing message. This is the point at which it
+	// should never be retransmitted.
+	MSettled
+	// A message is received. Call DecodeMessage() to decode as an amqp.Message.
+	// To manage the outcome of this message (e.g. to accept or reject the message)
+	// use Event.Delivery().
+	MMessage
+	// The event loop terminates, there are no more events to process.
+	MFinal
+)
+
+func (t MessagingEventType) String() string {
+	switch t {
+	case MStart:
+		return "Start"
+	case MConnectionError:
+		return "ConnectionError"
+	case MSessionError:
+		return "SessionError"
+	case MLinkError:
+		return "LinkError"
+	case MConnectionOpening:
+		return "ConnectionOpening"
+	case MSessionOpening:
+		return "SessionOpening"
+	case MLinkOpening:
+		return "LinkOpening"
+	case MConnectionOpened:
+		return "ConnectionOpened"
+	case MSessionOpened:
+		return "SessionOpened"
+	case MLinkOpened:
+		return "LinkOpened"
+	case MConnectionClosing:
+		return "ConnectionClosing"
+	case MSessionClosing:
+		return "SessionClosing"
+	case MLinkClosing:
+		return "LinkClosing"
+	case MConnectionClosed:
+		return "ConnectionClosed"
+	case MSessionClosed:
+		return "SessionClosed"
+	case MLinkClosed:
+		return "LinkClosed"
+	case MConnectionDisconnected:
+		return "ConnectionDisconnected"
+	case MSessionDisconnected:
+		return "SessionDisconnected" // No "M" prefix, consistent with the other names.
+	case MLinkDisconnected:
+		return "LinkDisconnected" // No "M" prefix, consistent with the other names.
+	case MSendable:
+		return "Sendable"
+	case MAccepted:
+		return "Accepted"
+	case MRejected:
+		return "Rejected"
+	case MReleased:
+		return "Released"
+	case MSettled:
+		return "Settled"
+	case MMessage:
+		return "Message"
+	default:
+		return "Unknown"
+	}
+}
+
+// ResourceHandler provides a simple way to track the creation and deletion of
+// various proton objects.
+// endpointDelegator captures common patterns for endpoints opening/closing
+type endpointDelegator struct {
+	remoteOpen, remoteClose, localOpen, localClose EventType
+	opening, opened, closing, closed, error        MessagingEventType
+	endpoint                                       func(Event) Endpoint
+	delegate                                       MessagingHandler
+}
+
+// HandleEvent handles an open/close event for an endpoint in a generic way.
+func (d endpointDelegator) HandleEvent(e Event) (err error) {
+	endpoint := d.endpoint(e)
+	state := endpoint.State()
+
+	switch e.Type() {
+
+	case d.localOpen:
+		if state.Is(SRemoteActive) {
+			err = d.delegate.HandleMessagingEvent(d.opened, e)
+		}
+
+	case d.remoteOpen:
+		switch {
+		case state.Is(SLocalActive):
+			err = d.delegate.HandleMessagingEvent(d.opened, e)
+		case state.Is(SLocalUninit):
+			err = d.delegate.HandleMessagingEvent(d.opening, e)
+			if err == nil {
+				endpoint.Open()
+			}
+		}
+
+	case d.remoteClose:
+		var err1 error
+		if endpoint.RemoteCondition().IsSet() { // Closed with error
+			err1 = d.delegate.HandleMessagingEvent(d.error, e)
+			if err1 == nil { // Don't overwrite an application error.
+				err1 = endpoint.RemoteCondition().Error()
+			}
+		} else {
+			err1 = d.delegate.HandleMessagingEvent(d.closing, e)
+		}
+		if state.Is(SLocalClosed) {
+			err = d.delegate.HandleMessagingEvent(d.closed, e)
+		} else if state.Is(SLocalActive) {
+			endpoint.Close()
+		}
+		if err1 != nil { // Keep the first error.
+			err = err1
+		}
+
+	case d.localClose:
+		if state.Is(SRemoteClosed) {
+			err = d.delegate.HandleMessagingEvent(d.closed, e)
+		}
+
+	default:
+		// We shouldn't be called with any other event type.
+		panic(internal.Errorf("internal error, not an open/close event: %s", e))
+	}
+
+	return err
+}
+
+// MessagingDelegator implements an EventHandler and delegates to a MessagingHandler.
+// You can modify the exported fields before you pass the MessagingDelegator to
+// a Pump.
+type MessagingDelegator struct {
+	delegate                   MessagingHandler
+	connection, session, link  endpointDelegator
+	handshaker, flowcontroller EventHandler
+
+	// AutoSettle (default true) automatically pre-settle outgoing messages.
+	AutoSettle bool
+	// AutoAccept (default true) automatically accept and settle incoming messages
+	// if they are not settled by the delegate.
+	AutoAccept bool
+	// Prefetch (default 10) initial credit to issue for incoming links.
+	Prefetch int
+	// PeerCloseError (default false) if true a close by the peer will be treated as an error.
+	PeerCloseError bool
+}
+
+func NewMessagingDelegator(h MessagingHandler) EventHandler {
+	return &MessagingDelegator{
+		delegate: h,
+		connection: endpointDelegator{
+			EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose,
+			MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed,
+			MConnectionError,
+			func(e Event) Endpoint { return e.Connection() },
+			h,
+		},
+		session: endpointDelegator{
+			ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose,
+			MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed,
+			MSessionError,
+			func(e Event) Endpoint { return e.Session() },
+			h,
+		},
+		link: endpointDelegator{
+			ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose,
+			MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed,
+			MLinkError,
+			func(e Event) Endpoint { return e.Link() },
+			h,
+		},
+		flowcontroller: nil,
+		AutoSettle:     true,
+		AutoAccept:     true,
+		Prefetch:       10,
+		PeerCloseError: false,
+	}
+}
+
+func handleIf(h EventHandler, e Event) error {
+	if h != nil {
+		return h.HandleEvent(e)
+	}
+	return nil
+}
+
+// Handle a proton event by passing the corresponding MessagingEvent(s) to
+// the MessagingHandler.
+func (d *MessagingDelegator) HandleEvent(e Event) error {
+	handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling.
+
+	switch e.Type() {
+
+	case EConnectionInit:
+		d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))}
+		d.delegate.HandleMessagingEvent(MStart, e)
+
+	case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose:
+		return d.connection.HandleEvent(e)
+
+	case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose:
+		return d.session.HandleEvent(e)
+
+	case ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose:
+		return d.link.HandleEvent(e)
+
+	case ELinkFlow:
+		if e.Link().IsSender() && e.Link().Credit() > 0 {
+			return d.delegate.HandleMessagingEvent(MSendable, e)
+		}
+
+	case EDelivery:
+		if e.Delivery().Link().IsReceiver() {
+			d.incoming(e)
+		} else {
+			d.outgoing(e)
+		}
+
+	case ETransportTailClosed:
+		c := e.Connection()
+		for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) {
+			e2 := e
+			e2.link = l
+			e2.session = l.Session()
+			d.delegate.HandleMessagingEvent(MLinkDisconnected, e2)
+		}
+		for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) {
+			e2 := e
+			e2.session = s
+			d.delegate.HandleMessagingEvent(MSessionDisconnected, e2)
+		}
+		d.delegate.HandleMessagingEvent(MConnectionDisconnected, e)
+		d.delegate.HandleMessagingEvent(MFinal, e)
+	}
+	return nil
+}
+
+func (d *MessagingDelegator) incoming(e Event) (err error) {
+	delivery := e.Delivery()
+	if delivery.Readable() && !delivery.Partial() {
+		if e.Link().State().Is(SLocalClosed) {
+			e.Link().Advance()
+			if d.AutoAccept {
+				delivery.Release(false)
+			}
+		} else {
+			err = d.delegate.HandleMessagingEvent(MMessage, e)
+			e.Link().Advance()
+			if d.AutoAccept && !delivery.Settled() {
+				if err == nil {
+					delivery.Accept()
+				} else {
+					delivery.Reject()
+				}
+			}
+		}
+	} else if delivery.Updated() && delivery.Settled() {
+		err = d.delegate.HandleMessagingEvent(MSettled, e)
+	}
+	return
+}
+
+func (d *MessagingDelegator) outgoing(e Event) (err error) {
+	delivery := e.Delivery()
+	if delivery.Updated() {
+		switch delivery.Remote().Type() {
+		case Accepted:
+			err = d.delegate.HandleMessagingEvent(MAccepted, e)
+		case Rejected:
+			err = d.delegate.HandleMessagingEvent(MRejected, e)
+		case Released, Modified:
+			err = d.delegate.HandleMessagingEvent(MReleased, e)
+		}
+		if err == nil && delivery.Settled() {
+			err = d.delegate.HandleMessagingEvent(MSettled, e)
+		}
+		if err == nil && d.AutoSettle {
+			delivery.Settle()
+		}
+	}
+	return
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/event/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/event/message.go b/proton-c/bindings/go/event/message.go
new file mode 100644
index 0000000..d900744
--- /dev/null
+++ b/proton-c/bindings/go/event/message.go
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package event
+
+// #include <proton/types.h>
+// #include <proton/message.h>
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"qpid.apache.org/proton/go/amqp"
+	"qpid.apache.org/proton/go/internal"
+)
+
+// DecodeMessage decodes the message contained in a delivery event.
+func DecodeMessage(e Event) (m amqp.Message, err error) {
+	defer internal.DoRecover(&err)
+	delivery := e.Delivery()
+	if !delivery.Readable() || delivery.Partial() {
+		return nil, internal.Errorf("attempting to get incomplete message")
+	}
+	data := make([]byte, delivery.Pending())
+	result := delivery.Link().Recv(data)
+	if result != len(data) {
+		return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result))
+	}
+	return amqp.DecodeMessage(data)
+}
+
+// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link.
+var tags amqp.UidCounter
+
+// Send sends an amqp.Message over a Link.
+// Returns a Delivery that can be used to determine the outcome of the message.
+func (link Link) Send(m amqp.Message) (Delivery, error) {
+	if !link.IsSender() {
+		return Delivery{}, internal.Errorf("attempt to send message on receiving link")
+	}
+	// FIXME aconway 2015-04-08: buffering, error handling
+	bytes, err := m.Encode(nil) // Encode before creating the delivery so a failure leaves no unsent delivery on the link.
+	if err != nil {
+		return Delivery{}, internal.Errorf("cannot send mesage %s", err)
+	}
+	delivery := link.Delivery(tags.Next())
+	result := link.SendBytes(bytes)
+	link.Advance()
+	if result != len(bytes) {
+		if result < 0 {
+			return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result))
+		} else {
+			return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes))
+		}
+	}
+	if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names
+		delivery.Settle()
+	}
+	return delivery, nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/event/pump.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/event/pump.go b/proton-c/bindings/go/event/pump.go
new file mode 100644
index 0000000..db022ff
--- /dev/null
+++ b/proton-c/bindings/go/event/pump.go
@@ -0,0 +1,357 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package event
+
+// #include <proton/connection.h>
+// #include <proton/transport.h>
+// #include <proton/event.h>
+// #include <proton/reactor.h>
+// #include <proton/handlers.h>
+// #include <proton/transport.h>
+// #include <proton/session.h>
+// #include <memory.h>
+// #include <stdlib.h>
+//
+// PN_HANDLE(REMOTE_ADDR)
+import "C"
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"qpid.apache.org/proton/go/internal"
+	"sync"
+	"unsafe"
+)
+
+// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel.
+type bufferChan struct {
+	buffers    chan []byte
+	buf1, buf2 []byte
+}
+
+func newBufferChan(size int) *bufferChan {
+	return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)}
+}
+
+func (b *bufferChan) buffer() []byte {
+	b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers.
+	return b.buf1[:cap(b.buf1)]
+}
+
+// FIXME aconway 2015-05-04: direct sending to Inject may block user goroutines if
+// the pump stops. Make this a function that selects on running.
+
+// FIXME aconway 2015-05-05: for consistency should Pump be called Driver?
+
+/*
+Pump reads from a net.Conn, decodes AMQP events and calls the appropriate
+Handler functions. Actions taken by Handler functions (such as sending messages)
+are encoded and written to the net.Conn.
+
+The proton protocol engine is single threaded (per connection). The Pump runs
+proton in the goroutine that calls Pump.Run() and creates goroutines to feed
+data to/from a net.Conn. You can create multiple Pumps to handle multiple
+connections concurrently.
+
+Methods in this package can only be called in the goroutine that executes the
+corresponding Pump.Run(). You implement the EventHandler or MessagingHandler
+interfaces and provide those values to NewPump(). Their HandleEvent method will be
+called in the Pump goroutine, in typical event-driven style.
+
+Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
+other goroutines, store them, or use them as map indexes. Effectively they are
+just C pointers.  Other goroutines cannot call their methods directly but they
+can create function closures that call their methods and send those closures
+to the Pump.Inject channel. They will execute safely in the pump
+goroutine. Injected functions, or your handlers, can set up channels to get
+results back to other goroutines.
+
+You are responsible for ensuring you don't use an event value after the C object
+has been deleted. The handler methods will tell you when a value is no longer
+valid. For example after a MessagingHandler handles a LinkClosed event, that link
+is no longer valid. If you do Link.Close() yourself (in a handler or injected
+function) the link remains valid until the corresponding LinkClosed event is
+received by the handler.
+
+Pump.Close() will take care of cleaning up any remaining values and types when
+you are done with the Pump. All values associated with a pump become invalid
+when you call Pump.Close()
+
+The qpid.apache.org/proton/go/messaging package will do all this for you, so unless
+you are doing something fairly low-level it is probably a better choice.
+
+*/
+type Pump struct {
+	// Error is set on exit from Run() if there was an error.
+	Error error
+	// Channel to inject functions to be executed in the Pump's proton event loop.
+	Inject chan func()
+
+	conn       net.Conn
+	transport  *C.pn_transport_t
+	connection *C.pn_connection_t
+	collector  *C.pn_collector_t
+	read       *bufferChan    // Read buffers channel.
+	write      *bufferChan    // Write buffers channel.
+	handlers   []EventHandler // Handlers for proton events.
+	running    chan struct{}  // This channel will be closed when the goroutines are done.
+}
+
+const bufferSize = 4096
+
+var pumps map[*C.pn_connection_t]*Pump
+
+func init() {
+	pumps = make(map[*C.pn_connection_t]*Pump)
+}
+
+// NewPump initializes a pump with a connection and handlers. To start it running:
+//    p, err := NewPump(...)
+//    go p.Run()
+// The goroutine will exit when the pump is closed or disconnected.
+// You can check for errors on Pump.Error.
+//
+func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) {
+	// Save the connection ID for Connection.String()
+	p := &Pump{
+		Inject:     make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack
+		conn:       conn,
+		transport:  C.pn_transport(),
+		connection: C.pn_connection(),
+		collector:  C.pn_collector(),
+		handlers:   handlers,
+		read:       newBufferChan(bufferSize),
+		write:      newBufferChan(bufferSize),
+		running:    make(chan struct{}),
+	}
+	if p.transport == nil || p.connection == nil || p.collector == nil {
+		return nil, internal.Errorf("failed to allocate pump")
+	}
+	pnErr := int(C.pn_transport_bind(p.transport, p.connection))
+	if pnErr != 0 {
+		return nil, internal.Errorf("cannot setup pump: %s", internal.PnErrorCode(pnErr))
+	}
+	C.pn_connection_collect(p.connection, p.collector)
+	C.pn_connection_open(p.connection)
+	pumps[p.connection] = p
+	return p, nil
+}
+
+func (p *Pump) String() string {
+	return fmt.Sprintf("(%s-%s)", p.conn.LocalAddr(), p.conn.RemoteAddr())
+}
+
+func (p *Pump) Id() string {
+	return fmt.Sprintf("%p", p) // Format the Pump pointer itself; &p is only the address of the receiver variable.
+}
+
+// setError sets error only if not already set
+func (p *Pump) setError(e error) {
+	if p.Error == nil {
+		p.Error = e
+	}
+}
+
+// Server puts the Pump in server mode, meaning it will auto-detect security settings on
+// the incoming connection such as use of SASL and SSL.
+// Must be called before Run()
+//
+func (p *Pump) Server() {
+	C.pn_transport_set_server(p.transport)
+}
+
+func (p *Pump) free() {
+	if p.connection != nil {
+		C.pn_connection_free(p.connection)
+	}
+	if p.transport != nil {
+		C.pn_transport_free(p.transport)
+	}
+	if p.collector != nil {
+		C.pn_collector_free(p.collector)
+	}
+	for _, h := range p.handlers {
+		switch h := h.(type) {
+		case cHandler:
+			C.pn_handler_free(h.pn)
+		}
+	}
+}
+
+// Close closes the AMQP connection, the net.Conn, and stops associated goroutines.
+// It will cause Run() to return. Run() may return earlier if the network disconnects
+// but you must still call Close() to clean everything up.
+//
+// Methods on values associated with the pump (Connections, Sessions, Links) will panic
+// if called after Close()
+//
+func (p *Pump) Close() error {
+	// If the pump is still running, inject a close. Either way wait for it to finish.
+	select {
+	case p.Inject <- func() { C.pn_connection_close(p.connection) }:
+		<-p.running // Wait to finish
+	case <-p.running: // Wait for goroutines to finish
+	}
+	delete(pumps, p.connection)
+	p.free()
+	return p.Error
+}
+
+// Run the pump. Normally called in a goroutine as: go pump.Run()
+// An error during Run is stored on p.Error.
+//
+func (p *Pump) Run() {
+	// Signal errors from the read/write goroutines. Don't block if we don't
+	// read all the errors, we only care about the first.
+	error := make(chan error, 2)
+	// FIXME aconway 2015-05-04: 	stop := make(chan struct{}) // Closed to signal that read/write should stop.
+
+	wait := sync.WaitGroup{}
+	wait.Add(2)
+
+	go func() { // Read goroutine
+		defer wait.Done()
+		for {
+			rbuf := p.read.buffer()
+			n, err := p.conn.Read(rbuf)
+			if n > 0 {
+				p.read.buffers <- rbuf[:n]
+			} else if err != nil {
+				close(p.read.buffers)
+				error <- err
+				return
+			}
+		}
+	}()
+
+	go func() { // Write goroutine
+		defer wait.Done()
+		for {
+			wbuf, ok := <-p.write.buffers
+			if !ok {
+				return
+			}
+			_, err := p.conn.Write(wbuf)
+			if err != nil {
+				error <- err
+				return
+			}
+		}
+	}()
+
+	wbuf := p.write.buffer()[:0]
+loop:
+	for {
+		if len(wbuf) == 0 {
+			p.pop(&wbuf)
+		}
+		// Don't set wchan unless there is something to write.
+		var wchan chan []byte
+		if len(wbuf) > 0 {
+			wchan = p.write.buffers
+		}
+
+		select {
+		case buf := <-p.read.buffers: // Read a buffer
+			p.push(buf)
+		case wchan <- wbuf: // Write a buffer
+			wbuf = p.write.buffer()[:0]
+		case f := <-p.Inject: // Function injected from another goroutine
+			f()
+		case err := <-error: // Read or write error
+			p.setError(err)
+			C.pn_transport_close_tail(p.transport)
+			C.pn_transport_close_head(p.transport)
+		}
+		if err := p.process(); err != nil {
+			p.setError(err)
+			break loop
+		}
+	}
+	close(p.write.buffers)
+	p.conn.Close()
+	wait.Wait()
+	close(p.running) // Signal goroutines have exited and Error is set.
+}
+
+func minInt(a, b int) int {
+	if a < b {
+		return a
+	} else {
+		return b
+	}
+}
+
+func (p *Pump) pop(buf *[]byte) {
+	pending := int(C.pn_transport_pending(p.transport))
+	switch {
+	case pending == int(C.PN_EOS):
+		*buf = (*buf)[:0] // Hand back an empty buffer; the former [:] reslice left the length unchanged.
+		return
+	case pending < 0:
+		panic(internal.Errorf("%s", internal.PnErrorCode(pending)))
+	}
+	size := minInt(pending, cap(*buf)) // Never copy more than the caller's buffer can hold.
+	*buf = (*buf)[:size]
+	if size == 0 {
+		return
+	}
+	C.memcpy(unsafe.Pointer(&(*buf)[0]), unsafe.Pointer(C.pn_transport_head(p.transport)), C.size_t(size))
+	C.pn_transport_pop(p.transport, C.size_t(size))
+}
+
+func (p *Pump) push(buf []byte) {
+	buf2 := buf
+	for len(buf2) > 0 {
+		n := int(C.pn_transport_push(p.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2))))
+		if n <= 0 {
+			panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n)))
+		}
+		buf2 = buf2[n:]
+	}
+}
+
+func (p *Pump) handle(e Event) error {
+	for _, h := range p.handlers {
+		if err := h.HandleEvent(e); err != nil {
+			return err
+		}
+	}
+	if e.Type() == ETransportClosed {
+		return io.EOF
+	}
+	return nil
+}
+
+func (p *Pump) process() error {
+	// FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump
+	for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) {
+		e := makeEvent(ce)
+		if err := p.handle(e); err != nil {
+			return err
+		}
+		C.pn_collector_pop(p.collector)
+	}
+	return nil
+}
+
+// Connection gets the Pump's connection value.
+func (p *Pump) Connection() Connection { return Connection{p.connection} }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/event/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/event/wrappers.go b/proton-c/bindings/go/event/wrappers.go
new file mode 100644
index 0000000..7043b9c
--- /dev/null
+++ b/proton-c/bindings/go/event/wrappers.go
@@ -0,0 +1,253 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package event
+
+//#include <proton/codec.h>
+//#include <proton/connection.h>
+//#include <proton/session.h>
+//#include <proton/session.h>
+//#include <proton/delivery.h>
+//#include <proton/link.h>
+//#include <proton/event.h>
+//#include <proton/transport.h>
+//#include <proton/link.h>
+//#include <stdlib.h>
+import "C"
+
+import (
+	"fmt"
+	"qpid.apache.org/proton/go/internal"
+	"unsafe"
+)
+
+// FIXME aconway 2015-05-05: Documentation for generated types.
+
+// Event is an AMQP protocol event.
+type Event struct {
+	pn         *C.pn_event_t
+	eventType  EventType
+	connection Connection
+	session    Session
+	link       Link
+	delivery   Delivery
+}
+
+func makeEvent(pn *C.pn_event_t) Event {
+	return Event{
+		pn:         pn,
+		eventType:  EventType(C.pn_event_type(pn)),
+		connection: Connection{C.pn_event_connection(pn)},
+		session:    Session{C.pn_event_session(pn)},
+		link:       Link{C.pn_event_link(pn)},
+		delivery:   Delivery{C.pn_event_delivery(pn)},
+	}
+}
+func (e Event) IsNil() bool            { return e.eventType == EventType(0) }
+func (e Event) Type() EventType        { return e.eventType }
+func (e Event) Connection() Connection { return e.connection }
+func (e Event) Session() Session       { return e.session }
+func (e Event) Link() Link             { return e.link }
+func (e Event) Delivery() Delivery     { return e.delivery }
+func (e Event) String() string         { return e.Type().String() }
+
+// Data holds a pointer to decoded AMQP data.
+// Use amqp.marshal/unmarshal to access it as Go data types.
+//
+type Data struct{ pn *C.pn_data_t }
+
+func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} }
+
+func (d Data) Free()                   { C.pn_data_free(d.pn) }
+func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) }
+func (d Data) Clear()                  { C.pn_data_clear(d.pn) }
+func (d Data) Rewind()                 { C.pn_data_rewind(d.pn) }
+func (d Data) Error() error {
+	return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn)))
+}
+
+// State holds the state flags for an AMQP endpoint.
+type State byte
+
+const (
+	SLocalUninit  State = C.PN_LOCAL_UNINIT
+	SLocalActive        = C.PN_LOCAL_ACTIVE
+	SLocalClosed        = C.PN_LOCAL_CLOSED
+	SRemoteUninit       = C.PN_REMOTE_UNINIT
+	SRemoteActive       = C.PN_REMOTE_ACTIVE
+	SRemoteClosed       = C.PN_REMOTE_CLOSED
+)
+
+// Is returns true if bits & state is non-zero.
+func (s State) Is(bits State) bool { return s&bits != 0 }
+
+// Local returns a State containing just the local flags.
+func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) }
+
+// Remote returns a State containing just the remote flags.
+func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) }
+
+// Endpoint is the common interface for Connection, Link and Session.
+type Endpoint interface {
+	// State is the open/closed state.
+	State() State
+	// Open an endpoint.
+	Open()
+	// Close an endpoint.
+	Close()
+	// Condition holds a local error condition.
+	Condition() Condition
+	// RemoteCondition holds a remote error condition.
+	RemoteCondition() Condition
+}
+
+const (
+	Received uint64 = C.PN_RECEIVED
+	Accepted        = C.PN_ACCEPTED
+	Rejected        = C.PN_REJECTED
+	Released        = C.PN_RELEASED
+	Modified        = C.PN_MODIFIED
+)
+
+// SettleAs is equivalent to d.Update(disposition); d.Settle()
+// It is a no-op if e does not have a delivery.
+func (d Delivery) SettleAs(disposition uint64) {
+	d.Update(disposition)
+	d.Settle()
+}
+
+// Accept accepts and settles a delivery.
+func (d Delivery) Accept() { d.SettleAs(Accepted) }
+
+// Reject rejects and settles a delivery
+func (d Delivery) Reject() { d.SettleAs(Rejected) }
+
+// Release releases and settles a delivery
+// If delivered is true the delivery count for the message will be increased.
+func (d Delivery) Release(delivered bool) {
+	if delivered {
+		d.SettleAs(Modified)
+	} else {
+		d.SettleAs(Released)
+	}
+}
+
+// FIXME aconway 2015-05-05: don't expose DeliveryTag as a C pointer, just as a String?
+
+type DeliveryTag struct{ pn C.pn_delivery_tag_t }
+
+func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) }
+
+func (l Link) Recv(buf []byte) int {
+	if len(buf) == 0 {
+		return 0
+	}
+	return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
+}
+
+func (l Link) SendBytes(bytes []byte) int {
+	return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes)))
+}
+
+func pnTag(tag string) C.pn_delivery_tag_t {
+	bytes := []byte(tag)
+	return C.pn_dtag(cPtr(bytes), cLen(bytes))
+}
+
+func (l Link) Delivery(tag string) Delivery {
+	return Delivery{C.pn_delivery(l.pn, pnTag(tag))}
+}
+
+func cPtr(b []byte) *C.char {
+	if len(b) == 0 {
+		return nil
+	}
+	return (*C.char)(unsafe.Pointer(&b[0]))
+}
+
+func cLen(b []byte) C.size_t {
+	return C.size_t(len(b))
+}
+
+func (s Session) Sender(name string) Link {
+	cname := C.CString(name)
+	defer C.free(unsafe.Pointer(cname))
+	return Link{C.pn_sender(s.pn, cname)}
+}
+
+func (s Session) Receiver(name string) Link {
+	cname := C.CString(name)
+	defer C.free(unsafe.Pointer(cname))
+	return Link{C.pn_receiver(s.pn, cname)}
+}
+
+func joinId(a, b interface{}) string {
+	return fmt.Sprintf("%s/%s", a, b)
+}
+
+// Pump associated with this connection.
+func (c Connection) Pump() *Pump { return pumps[c.pn] }
+
+// Unique (per process) string identifier for a connection, useful for debugging.
+func (c Connection) String() string { return pumps[c.pn].String() }
+
+// Head functions don't follow the normal naming conventions so missed by the generator.
+
+func (c Connection) LinkHead(s State) Link {
+	return Link{C.pn_link_head(c.pn, C.pn_state_t(s))}
+}
+
+func (c Connection) SessionHead(s State) Session {
+	return Session{C.pn_session_head(c.pn, C.pn_state_t(s))}
+}
+
+// Unique (per process) string identifier for a session, including connection identifier.
+func (s Session) String() string {
+	return joinId(s.Connection(), fmt.Sprintf("%p", s.pn))
+}
+
+// Unique (per process) string identifier for a link, including session identifier.
+func (l Link) String() string {
+	return joinId(l.Session(), l.Name())
+}
+
+// Error returns an error interface corresponding to Condition.
+func (c Condition) Error() error {
+	if c.IsNil() {
+		return nil
+	} else {
+		return fmt.Errorf("%s: %s", c.Name(), c.Description())
+	}
+}
+
+// SetIfUnset sets name and description on a condition if it is not already set.
+func (c Condition) SetIfUnset(name, description string) {
+	if !c.IsSet() {
+		c.SetName(name)
+		c.SetDescription(description)
+	}
+}
+
+func (c Connection) Session() (Session, error) {
+	s := Session{C.pn_session(c.pn)}
+	if s.IsNil() {
+		return s, Connection(c).Error()
+	}
+	return s, nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/event/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/event/wrappers_gen.go b/proton-c/bindings/go/event/wrappers_gen.go
new file mode 100644
index 0000000..73f0d3b
--- /dev/null
+++ b/proton-c/bindings/go/event/wrappers_gen.go
@@ -0,0 +1,732 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// NOTE: This file was generated by genwrap.go, do not edit it by hand.
+//
+
+package event
+
+import (
+	"qpid.apache.org/proton/go/internal"
+	"time"
+	"unsafe"
+)
+
+// #include <proton/types.h>
+// #include <proton/event.h>
+// #include <stdlib.h>
+// #include <proton/session.h>
+// #include <proton/link.h>
+// #include <proton/delivery.h>
+// #include <proton/disposition.h>
+// #include <proton/condition.h>
+// #include <proton/terminus.h>
+// #include <proton/connection.h>
+import "C"
+
+type EventType int
+
+const (
+	EConnectionInit         EventType = C.PN_CONNECTION_INIT
+	EConnectionBound        EventType = C.PN_CONNECTION_BOUND
+	EConnectionUnbound      EventType = C.PN_CONNECTION_UNBOUND
+	EConnectionLocalOpen    EventType = C.PN_CONNECTION_LOCAL_OPEN
+	EConnectionRemoteOpen   EventType = C.PN_CONNECTION_REMOTE_OPEN
+	EConnectionLocalClose   EventType = C.PN_CONNECTION_LOCAL_CLOSE
+	EConnectionRemoteClose  EventType = C.PN_CONNECTION_REMOTE_CLOSE
+	EConnectionFinal        EventType = C.PN_CONNECTION_FINAL
+	ESessionInit            EventType = C.PN_SESSION_INIT
+	ESessionLocalOpen       EventType = C.PN_SESSION_LOCAL_OPEN
+	ESessionRemoteOpen      EventType = C.PN_SESSION_REMOTE_OPEN
+	ESessionLocalClose      EventType = C.PN_SESSION_LOCAL_CLOSE
+	ESessionRemoteClose     EventType = C.PN_SESSION_REMOTE_CLOSE
+	ESessionFinal           EventType = C.PN_SESSION_FINAL
+	ELinkInit               EventType = C.PN_LINK_INIT
+	ELinkLocalOpen          EventType = C.PN_LINK_LOCAL_OPEN
+	ELinkRemoteOpen         EventType = C.PN_LINK_REMOTE_OPEN
+	ELinkLocalClose         EventType = C.PN_LINK_LOCAL_CLOSE
+	ELinkRemoteClose        EventType = C.PN_LINK_REMOTE_CLOSE
+	ELinkLocalDetach        EventType = C.PN_LINK_LOCAL_DETACH
+	ELinkRemoteDetach       EventType = C.PN_LINK_REMOTE_DETACH
+	ELinkFlow               EventType = C.PN_LINK_FLOW
+	ELinkFinal              EventType = C.PN_LINK_FINAL
+	EDelivery               EventType = C.PN_DELIVERY
+	ETransport              EventType = C.PN_TRANSPORT
+	ETransportAuthenticated EventType = C.PN_TRANSPORT_AUTHENTICATED
+	ETransportError         EventType = C.PN_TRANSPORT_ERROR
+	ETransportHeadClosed    EventType = C.PN_TRANSPORT_HEAD_CLOSED
+	ETransportTailClosed    EventType = C.PN_TRANSPORT_TAIL_CLOSED
+	ETransportClosed        EventType = C.PN_TRANSPORT_CLOSED
+)
+
+func (e EventType) String() string {
+	switch e {
+
+	case C.PN_CONNECTION_INIT:
+		return "ConnectionInit"
+	case C.PN_CONNECTION_BOUND:
+		return "ConnectionBound"
+	case C.PN_CONNECTION_UNBOUND:
+		return "ConnectionUnbound"
+	case C.PN_CONNECTION_LOCAL_OPEN:
+		return "ConnectionLocalOpen"
+	case C.PN_CONNECTION_REMOTE_OPEN:
+		return "ConnectionRemoteOpen"
+	case C.PN_CONNECTION_LOCAL_CLOSE:
+		return "ConnectionLocalClose"
+	case C.PN_CONNECTION_REMOTE_CLOSE:
+		return "ConnectionRemoteClose"
+	case C.PN_CONNECTION_FINAL:
+		return "ConnectionFinal"
+	case C.PN_SESSION_INIT:
+		return "SessionInit"
+	case C.PN_SESSION_LOCAL_OPEN:
+		return "SessionLocalOpen"
+	case C.PN_SESSION_REMOTE_OPEN:
+		return "SessionRemoteOpen"
+	case C.PN_SESSION_LOCAL_CLOSE:
+		return "SessionLocalClose"
+	case C.PN_SESSION_REMOTE_CLOSE:
+		return "SessionRemoteClose"
+	case C.PN_SESSION_FINAL:
+		return "SessionFinal"
+	case C.PN_LINK_INIT:
+		return "LinkInit"
+	case C.PN_LINK_LOCAL_OPEN:
+		return "LinkLocalOpen"
+	case C.PN_LINK_REMOTE_OPEN:
+		return "LinkRemoteOpen"
+	case C.PN_LINK_LOCAL_CLOSE:
+		return "LinkLocalClose"
+	case C.PN_LINK_REMOTE_CLOSE:
+		return "LinkRemoteClose"
+	case C.PN_LINK_LOCAL_DETACH:
+		return "LinkLocalDetach"
+	case C.PN_LINK_REMOTE_DETACH:
+		return "LinkRemoteDetach"
+	case C.PN_LINK_FLOW:
+		return "LinkFlow"
+	case C.PN_LINK_FINAL:
+		return "LinkFinal"
+	case C.PN_DELIVERY:
+		return "Delivery"
+	case C.PN_TRANSPORT:
+		return "Transport"
+	case C.PN_TRANSPORT_AUTHENTICATED:
+		return "TransportAuthenticated"
+	case C.PN_TRANSPORT_ERROR:
+		return "TransportError"
+	case C.PN_TRANSPORT_HEAD_CLOSED:
+		return "TransportHeadClosed"
+	case C.PN_TRANSPORT_TAIL_CLOSED:
+		return "TransportTailClosed"
+	case C.PN_TRANSPORT_CLOSED:
+		return "TransportClosed"
+	}
+	return "Unknown"
+}
+
+// Wrappers for declarations in session.h
+
+type Session struct{ pn *C.pn_session_t }
+
+func (s Session) IsNil() bool { return s.pn == nil }
+func (s Session) Free() {
+	C.pn_session_free(s.pn)
+}
+func (s Session) State() State {
+	return State(C.pn_session_state(s.pn))
+}
+func (s Session) Error() error {
+	return internal.PnError(unsafe.Pointer(C.pn_session_error(s.pn)))
+}
+func (s Session) Condition() Condition {
+	return Condition{C.pn_session_condition(s.pn)}
+}
+func (s Session) RemoteCondition() Condition {
+	return Condition{C.pn_session_remote_condition(s.pn)}
+}
+func (s Session) Connection() Connection {
+	return Connection{C.pn_session_connection(s.pn)}
+}
+func (s Session) Open() {
+	C.pn_session_open(s.pn)
+}
+func (s Session) Close() {
+	C.pn_session_close(s.pn)
+}
+func (s Session) IncomingCapacity() uint {
+	return uint(C.pn_session_get_incoming_capacity(s.pn))
+}
+func (s Session) SetIncomingCapacity(capacity uint) {
+	C.pn_session_set_incoming_capacity(s.pn, C.size_t(capacity))
+}
+func (s Session) OutgoingBytes() uint {
+	return uint(C.pn_session_outgoing_bytes(s.pn))
+}
+func (s Session) IncomingBytes() uint {
+	return uint(C.pn_session_incoming_bytes(s.pn))
+}
+func (s Session) Next(state State) Session {
+	return Session{C.pn_session_next(s.pn, C.pn_state_t(state))}
+}
+
+// Wrappers for declarations in link.h
+
+type SndSettleMode C.pn_snd_settle_mode_t
+
+const (
+	PnSndUnsettled SndSettleMode = C.PN_SND_UNSETTLED
+	PnSndSettled   SndSettleMode = C.PN_SND_SETTLED
+	PnSndMixed     SndSettleMode = C.PN_SND_MIXED
+)
+
+func (e SndSettleMode) String() string {
+	switch e {
+
+	case C.PN_SND_UNSETTLED:
+		return "SndUnsettled"
+	case C.PN_SND_SETTLED:
+		return "SndSettled"
+	case C.PN_SND_MIXED:
+		return "SndMixed"
+	}
+	return "unknown"
+}
+
+type RcvSettleMode C.pn_rcv_settle_mode_t
+
+const (
+	PnRcvFirst  RcvSettleMode = C.PN_RCV_FIRST
+	PnRcvSecond RcvSettleMode = C.PN_RCV_SECOND
+)
+
+func (e RcvSettleMode) String() string {
+	switch e {
+
+	case C.PN_RCV_FIRST:
+		return "RcvFirst"
+	case C.PN_RCV_SECOND:
+		return "RcvSecond"
+	}
+	return "unknown"
+}
+
+type Link struct{ pn *C.pn_link_t }
+
+func (l Link) IsNil() bool { return l.pn == nil }
+func (l Link) Free() {
+	C.pn_link_free(l.pn)
+}
+func (l Link) Name() string {
+	return C.GoString(C.pn_link_name(l.pn))
+}
+func (l Link) IsSender() bool {
+	return bool(C.pn_link_is_sender(l.pn))
+}
+func (l Link) IsReceiver() bool {
+	return bool(C.pn_link_is_receiver(l.pn))
+}
+func (l Link) State() State {
+	return State(C.pn_link_state(l.pn))
+}
+func (l Link) Error() error {
+	return internal.PnError(unsafe.Pointer(C.pn_link_error(l.pn)))
+}
+func (l Link) Condition() Condition {
+	return Condition{C.pn_link_condition(l.pn)}
+}
+func (l Link) RemoteCondition() Condition {
+	return Condition{C.pn_link_remote_condition(l.pn)}
+}
+func (l Link) Session() Session {
+	return Session{C.pn_link_session(l.pn)}
+}
+func (l Link) Next(state State) Link {
+	return Link{C.pn_link_next(l.pn, C.pn_state_t(state))}
+}
+func (l Link) Open() {
+	C.pn_link_open(l.pn)
+}
+func (l Link) Close() {
+	C.pn_link_close(l.pn)
+}
+func (l Link) Detach() {
+	C.pn_link_detach(l.pn)
+}
+func (l Link) Source() Terminus {
+	return Terminus{C.pn_link_source(l.pn)}
+}
+func (l Link) Target() Terminus {
+	return Terminus{C.pn_link_target(l.pn)}
+}
+func (l Link) RemoteSource() Terminus {
+	return Terminus{C.pn_link_remote_source(l.pn)}
+}
+func (l Link) RemoteTarget() Terminus {
+	return Terminus{C.pn_link_remote_target(l.pn)}
+}
+func (l Link) Current() Delivery {
+	return Delivery{C.pn_link_current(l.pn)}
+}
+func (l Link) Advance() bool {
+	return bool(C.pn_link_advance(l.pn))
+}
+func (l Link) Credit() int {
+	return int(C.pn_link_credit(l.pn))
+}
+func (l Link) Queued() int {
+	return int(C.pn_link_queued(l.pn))
+}
+func (l Link) RemoteCredit() int {
+	return int(C.pn_link_remote_credit(l.pn))
+}
+func (l Link) IsDrain() bool {
+	return bool(C.pn_link_get_drain(l.pn))
+}
+func (l Link) Drained() int {
+	return int(C.pn_link_drained(l.pn))
+}
+func (l Link) Available() int {
+	return int(C.pn_link_available(l.pn))
+}
+func (l Link) SndSettleMode() SndSettleMode {
+	return SndSettleMode(C.pn_link_snd_settle_mode(l.pn))
+}
+func (l Link) RcvSettleMode() RcvSettleMode {
+	return RcvSettleMode(C.pn_link_rcv_settle_mode(l.pn))
+}
+func (l Link) SetSndSettleMode(mode SndSettleMode) {
+	C.pn_link_set_snd_settle_mode(l.pn, C.pn_snd_settle_mode_t(mode))
+}
+func (l Link) SetRcvSettleMode(mode RcvSettleMode) {
+	C.pn_link_set_rcv_settle_mode(l.pn, C.pn_rcv_settle_mode_t(mode))
+}
+func (l Link) RemoteSndSettleMode() SndSettleMode {
+	return SndSettleMode(C.pn_link_remote_snd_settle_mode(l.pn))
+}
+func (l Link) RemoteRcvSettleMode() RcvSettleMode {
+	return RcvSettleMode(C.pn_link_remote_rcv_settle_mode(l.pn))
+}
+func (l Link) Unsettled() int {
+	return int(C.pn_link_unsettled(l.pn))
+}
+func (l Link) Offered(credit int) {
+	C.pn_link_offered(l.pn, C.int(credit))
+}
+func (l Link) Flow(credit int) {
+	C.pn_link_flow(l.pn, C.int(credit))
+}
+func (l Link) Drain(credit int) {
+	C.pn_link_drain(l.pn, C.int(credit))
+}
+func (l Link) SetDrain(drain bool) {
+	C.pn_link_set_drain(l.pn, C.bool(drain))
+}
+func (l Link) Draining() bool {
+	return bool(C.pn_link_draining(l.pn))
+}
+
+// Wrappers for declarations in delivery.h
+
+type Delivery struct{ pn *C.pn_delivery_t }
+
+func (d Delivery) IsNil() bool { return d.pn == nil }
+func (d Delivery) Tag() DeliveryTag {
+	return DeliveryTag{C.pn_delivery_tag(d.pn)}
+}
+func (d Delivery) Link() Link {
+	return Link{C.pn_delivery_link(d.pn)}
+}
+func (d Delivery) Local() Disposition {
+	return Disposition{C.pn_delivery_local(d.pn)}
+}
+func (d Delivery) LocalState() uint64 {
+	return uint64(C.pn_delivery_local_state(d.pn))
+}
+func (d Delivery) Remote() Disposition {
+	return Disposition{C.pn_delivery_remote(d.pn)}
+}
+func (d Delivery) RemoteState() uint64 {
+	return uint64(C.pn_delivery_remote_state(d.pn))
+}
+func (d Delivery) Settled() bool {
+	return bool(C.pn_delivery_settled(d.pn))
+}
+func (d Delivery) Pending() uint {
+	return uint(C.pn_delivery_pending(d.pn))
+}
+func (d Delivery) Partial() bool {
+	return bool(C.pn_delivery_partial(d.pn))
+}
+func (d Delivery) Writable() bool {
+	return bool(C.pn_delivery_writable(d.pn))
+}
+func (d Delivery) Readable() bool {
+	return bool(C.pn_delivery_readable(d.pn))
+}
+func (d Delivery) Updated() bool {
+	return bool(C.pn_delivery_updated(d.pn))
+}
+func (d Delivery) Update(state uint64) {
+	C.pn_delivery_update(d.pn, C.uint64_t(state))
+}
+func (d Delivery) Clear() {
+	C.pn_delivery_clear(d.pn)
+}
+func (d Delivery) Current() bool {
+	return bool(C.pn_delivery_current(d.pn))
+}
+func (d Delivery) Settle() {
+	C.pn_delivery_settle(d.pn)
+}
+func (d Delivery) Dump() {
+	C.pn_delivery_dump(d.pn)
+}
+func (d Delivery) Buffered() bool {
+	return bool(C.pn_delivery_buffered(d.pn))
+}
+
+// Wrappers for declarations in disposition.h
+
+type Disposition struct{ pn *C.pn_disposition_t }
+
+func (d Disposition) IsNil() bool { return d.pn == nil }
+func (d Disposition) Type() uint64 {
+	return uint64(C.pn_disposition_type(d.pn))
+}
+func (d Disposition) Condition() Condition {
+	return Condition{C.pn_disposition_condition(d.pn)}
+}
+func (d Disposition) Data() Data {
+	return Data{C.pn_disposition_data(d.pn)}
+}
+func (d Disposition) SectionNumber() uint16 {
+	return uint16(C.pn_disposition_get_section_number(d.pn))
+}
+func (d Disposition) SetSectionNumber(section_number uint16) {
+	C.pn_disposition_set_section_number(d.pn, C.uint32_t(section_number))
+}
+func (d Disposition) SectionOffset() uint64 {
+	return uint64(C.pn_disposition_get_section_offset(d.pn))
+}
+func (d Disposition) SetSectionOffset(section_offset uint64) {
+	C.pn_disposition_set_section_offset(d.pn, C.uint64_t(section_offset))
+}
+func (d Disposition) IsFailed() bool {
+	return bool(C.pn_disposition_is_failed(d.pn))
+}
+func (d Disposition) SetFailed(failed bool) {
+	C.pn_disposition_set_failed(d.pn, C.bool(failed))
+}
+func (d Disposition) IsUndeliverable() bool {
+	return bool(C.pn_disposition_is_undeliverable(d.pn))
+}
+func (d Disposition) SetUndeliverable(undeliverable bool) {
+	C.pn_disposition_set_undeliverable(d.pn, C.bool(undeliverable))
+}
+func (d Disposition) Annotations() Data {
+	return Data{C.pn_disposition_annotations(d.pn)}
+}
+
+// Wrappers for declarations in condition.h
+
+type Condition struct{ pn *C.pn_condition_t }
+
+func (c Condition) IsNil() bool { return c.pn == nil }
+func (c Condition) IsSet() bool {
+	return bool(C.pn_condition_is_set(c.pn))
+}
+func (c Condition) Clear() {
+	C.pn_condition_clear(c.pn)
+}
+func (c Condition) Name() string {
+	return C.GoString(C.pn_condition_get_name(c.pn))
+}
+func (c Condition) SetName(name string) int {
+	nameC := C.CString(name)
+	defer C.free(unsafe.Pointer(nameC))
+
+	return int(C.pn_condition_set_name(c.pn, nameC))
+}
+func (c Condition) Description() string {
+	return C.GoString(C.pn_condition_get_description(c.pn))
+}
+func (c Condition) SetDescription(description string) int {
+	descriptionC := C.CString(description)
+	defer C.free(unsafe.Pointer(descriptionC))
+
+	return int(C.pn_condition_set_description(c.pn, descriptionC))
+}
+func (c Condition) Info() Data {
+	return Data{C.pn_condition_info(c.pn)}
+}
+func (c Condition) IsRedirect() bool {
+	return bool(C.pn_condition_is_redirect(c.pn))
+}
+func (c Condition) RedirectHost() string {
+	return C.GoString(C.pn_condition_redirect_host(c.pn))
+}
+func (c Condition) RedirectPort() int {
+	return int(C.pn_condition_redirect_port(c.pn))
+}
+
+// Wrappers for declarations in terminus.h
+
+type TerminusType C.pn_terminus_type_t
+
+const (
+	PnUnspecified TerminusType = C.PN_UNSPECIFIED
+	PnSource      TerminusType = C.PN_SOURCE
+	PnTarget      TerminusType = C.PN_TARGET
+	PnCoordinator TerminusType = C.PN_COORDINATOR
+)
+
+func (e TerminusType) String() string {
+	switch e {
+
+	case C.PN_UNSPECIFIED:
+		return "Unspecified"
+	case C.PN_SOURCE:
+		return "Source"
+	case C.PN_TARGET:
+		return "Target"
+	case C.PN_COORDINATOR:
+		return "Coordinator"
+	}
+	return "unknown"
+}
+
+type Durability C.pn_durability_t
+
+const (
+	PnNondurable    Durability = C.PN_NONDURABLE
+	PnConfiguration Durability = C.PN_CONFIGURATION
+	PnDeliveries    Durability = C.PN_DELIVERIES
+)
+
+func (e Durability) String() string {
+	switch e {
+
+	case C.PN_NONDURABLE:
+		return "Nondurable"
+	case C.PN_CONFIGURATION:
+		return "Configuration"
+	case C.PN_DELIVERIES:
+		return "Deliveries"
+	}
+	return "unknown"
+}
+
+type ExpiryPolicy C.pn_expiry_policy_t
+
+const (
+	PnExpireWithLink       ExpiryPolicy = C.PN_EXPIRE_WITH_LINK
+	PnExpireWithSession    ExpiryPolicy = C.PN_EXPIRE_WITH_SESSION
+	PnExpireWithConnection ExpiryPolicy = C.PN_EXPIRE_WITH_CONNECTION
+	PnExpireNever          ExpiryPolicy = C.PN_EXPIRE_NEVER
+)
+
+func (e ExpiryPolicy) String() string {
+	switch e {
+
+	case C.PN_EXPIRE_WITH_LINK:
+		return "ExpireWithLink"
+	case C.PN_EXPIRE_WITH_SESSION:
+		return "ExpireWithSession"
+	case C.PN_EXPIRE_WITH_CONNECTION:
+		return "ExpireWithConnection"
+	case C.PN_EXPIRE_NEVER:
+		return "ExpireNever"
+	}
+	return "unknown"
+}
+
+type DistributionMode C.pn_distribution_mode_t
+
+const (
+	PnDistModeUnspecified DistributionMode = C.PN_DIST_MODE_UNSPECIFIED
+	PnDistModeCopy        DistributionMode = C.PN_DIST_MODE_COPY
+	PnDistModeMove        DistributionMode = C.PN_DIST_MODE_MOVE
+)
+
+func (e DistributionMode) String() string {
+	switch e {
+
+	case C.PN_DIST_MODE_UNSPECIFIED:
+		return "DistModeUnspecified"
+	case C.PN_DIST_MODE_COPY:
+		return "DistModeCopy"
+	case C.PN_DIST_MODE_MOVE:
+		return "DistModeMove"
+	}
+	return "unknown"
+}
+
+type Terminus struct{ pn *C.pn_terminus_t }
+
+func (t Terminus) IsNil() bool { return t.pn == nil }
+func (t Terminus) Type() TerminusType {
+	return TerminusType(C.pn_terminus_get_type(t.pn))
+}
+func (t Terminus) SetType(type_ TerminusType) int {
+	return int(C.pn_terminus_set_type(t.pn, C.pn_terminus_type_t(type_)))
+}
+func (t Terminus) Address() string {
+	return C.GoString(C.pn_terminus_get_address(t.pn))
+}
+func (t Terminus) SetAddress(address string) int {
+	addressC := C.CString(address)
+	defer C.free(unsafe.Pointer(addressC))
+
+	return int(C.pn_terminus_set_address(t.pn, addressC))
+}
+func (t Terminus) SetDistributionMode(mode DistributionMode) int {
+	return int(C.pn_terminus_set_distribution_mode(t.pn, C.pn_distribution_mode_t(mode)))
+}
+func (t Terminus) Durability() Durability {
+	return Durability(C.pn_terminus_get_durability(t.pn))
+}
+func (t Terminus) SetDurability(durability Durability) int {
+	return int(C.pn_terminus_set_durability(t.pn, C.pn_durability_t(durability)))
+}
+func (t Terminus) ExpiryPolicy() ExpiryPolicy {
+	return ExpiryPolicy(C.pn_terminus_get_expiry_policy(t.pn))
+}
+func (t Terminus) SetExpiryPolicy(policy ExpiryPolicy) int {
+	return int(C.pn_terminus_set_expiry_policy(t.pn, C.pn_expiry_policy_t(policy)))
+}
+func (t Terminus) Timeout() time.Duration {
+	return (time.Duration(C.pn_terminus_get_timeout(t.pn)) * time.Second)
+}
+// SetTimeout sets the terminus timeout and returns the C call's result code.
+//
+// The C API takes a pn_seconds_t, so the duration is converted to whole
+// seconds (any fraction of a second is truncated) instead of being passed
+// through as a raw nanosecond count. This mirrors Timeout, which scales the C
+// value by time.Second.
+// NOTE(review): this file is generated by genwrap.go; the generator presumably
+// needs the same change or the fix will be lost on regeneration.
+func (t Terminus) SetTimeout(timeout time.Duration) int {
+	seconds := C.pn_seconds_t(timeout / time.Second)
+	return int(C.pn_terminus_set_timeout(t.pn, seconds))
+}
+func (t Terminus) IsDynamic() bool {
+	return bool(C.pn_terminus_is_dynamic(t.pn))
+}
+func (t Terminus) SetDynamic(dynamic bool) int {
+	return int(C.pn_terminus_set_dynamic(t.pn, C.bool(dynamic)))
+}
+func (t Terminus) Properties() Data {
+	return Data{C.pn_terminus_properties(t.pn)}
+}
+func (t Terminus) Capabilities() Data {
+	return Data{C.pn_terminus_capabilities(t.pn)}
+}
+func (t Terminus) Outcomes() Data {
+	return Data{C.pn_terminus_outcomes(t.pn)}
+}
+func (t Terminus) Filter() Data {
+	return Data{C.pn_terminus_filter(t.pn)}
+}
+func (t Terminus) Copy(src Terminus) int {
+	return int(C.pn_terminus_copy(t.pn, src.pn))
+}
+
+// Wrappers for declarations in connection.h
+
+type Connection struct{ pn *C.pn_connection_t }
+
+func (c Connection) IsNil() bool { return c.pn == nil }
+func (c Connection) Free() {
+	C.pn_connection_free(c.pn)
+}
+func (c Connection) Release() {
+	C.pn_connection_release(c.pn)
+}
+func (c Connection) Error() error {
+	return internal.PnError(unsafe.Pointer(C.pn_connection_error(c.pn)))
+}
+func (c Connection) State() State {
+	return State(C.pn_connection_state(c.pn))
+}
+func (c Connection) Open() {
+	C.pn_connection_open(c.pn)
+}
+func (c Connection) Close() {
+	C.pn_connection_close(c.pn)
+}
+func (c Connection) Reset() {
+	C.pn_connection_reset(c.pn)
+}
+func (c Connection) Condition() Condition {
+	return Condition{C.pn_connection_condition(c.pn)}
+}
+func (c Connection) RemoteCondition() Condition {
+	return Condition{C.pn_connection_remote_condition(c.pn)}
+}
+func (c Connection) Container() string {
+	return C.GoString(C.pn_connection_get_container(c.pn))
+}
+func (c Connection) SetContainer(container string) {
+	containerC := C.CString(container)
+	defer C.free(unsafe.Pointer(containerC))
+
+	C.pn_connection_set_container(c.pn, containerC)
+}
+func (c Connection) SetUser(user string) {
+	userC := C.CString(user)
+	defer C.free(unsafe.Pointer(userC))
+
+	C.pn_connection_set_user(c.pn, userC)
+}
+func (c Connection) SetPassword(password string) {
+	passwordC := C.CString(password)
+	defer C.free(unsafe.Pointer(passwordC))
+
+	C.pn_connection_set_password(c.pn, passwordC)
+}
+func (c Connection) User() string {
+	return C.GoString(C.pn_connection_get_user(c.pn))
+}
+func (c Connection) Hostname() string {
+	return C.GoString(C.pn_connection_get_hostname(c.pn))
+}
+func (c Connection) SetHostname(hostname string) {
+	hostnameC := C.CString(hostname)
+	defer C.free(unsafe.Pointer(hostnameC))
+
+	C.pn_connection_set_hostname(c.pn, hostnameC)
+}
+func (c Connection) RemoteContainer() string {
+	return C.GoString(C.pn_connection_remote_container(c.pn))
+}
+func (c Connection) RemoteHostname() string {
+	return C.GoString(C.pn_connection_remote_hostname(c.pn))
+}
+func (c Connection) OfferedCapabilities() Data {
+	return Data{C.pn_connection_offered_capabilities(c.pn)}
+}
+func (c Connection) DesiredCapabilities() Data {
+	return Data{C.pn_connection_desired_capabilities(c.pn)}
+}
+func (c Connection) Properties() Data {
+	return Data{C.pn_connection_properties(c.pn)}
+}
+func (c Connection) RemoteOfferedCapabilities() Data {
+	return Data{C.pn_connection_remote_offered_capabilities(c.pn)}
+}
+func (c Connection) RemoteDesiredCapabilities() Data {
+	return Data{C.pn_connection_remote_desired_capabilities(c.pn)}
+}
+func (c Connection) RemoteProperties() Data {
+	return Data{C.pn_connection_remote_properties(c.pn)}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/internal/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/internal/error.go b/proton-c/bindings/go/internal/error.go
new file mode 100644
index 0000000..01ba890
--- /dev/null
+++ b/proton-c/bindings/go/internal/error.go
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Internal implementation details - ignore.
+package internal
+
+// #cgo LDFLAGS: -lqpid-proton
+// #include <proton/error.h>
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"unsafe"
+)
+
+// Error is the error type for all proton errors.
+type Error string
+
+// Error returns the message prefixed with "proton: ".
+func (e Error) Error() string {
+	const prefix = "proton: "
+	return prefix + string(e)
+}
+
+// Errorf formats a message the way fmt.Sprintf does and returns it as an Error.
+func Errorf(format string, a ...interface{}) Error {
+	msg := fmt.Sprintf(format, a...)
+	return Error(msg)
+}
+
+// PnErrorCode is a proton C error code.
+type PnErrorCode int
+
+// String returns a short name for the known proton error codes and
+// "unknown-error(N)" for any other value N.
+func (e PnErrorCode) String() string {
+	switch e {
+	case C.PN_EOS:
+		return "end-of-data"
+	case C.PN_ERR:
+		return "error"
+	case C.PN_OVERFLOW:
+		return "overflow"
+	case C.PN_UNDERFLOW:
+		return "underflow"
+	case C.PN_STATE_ERR:
+		return "bad-state"
+	case C.PN_ARG_ERR:
+		return "invalid-argument"
+	case C.PN_TIMEOUT:
+		return "timeout"
+	case C.PN_INTR:
+		return "interrupted"
+	case C.PN_INPROGRESS:
+		return "in-progress"
+	}
+	// No known code matched.
+	return fmt.Sprintf("unknown-error(%d)", e)
+}
+
+// PnError converts a pointer to a C pn_error_t into a Go error.
+//
+// It returns nil when p is nil or the error code is 0; otherwise it returns an
+// Error whose message is "<code name>: <error text>".
+func PnError(p unsafe.Pointer) error {
+	e := (*C.pn_error_t)(p)
+	if e == nil {
+		return nil
+	}
+	code := C.pn_error_code(e)
+	if code == 0 {
+		return nil
+	}
+	text := C.GoString(C.pn_error_text(e))
+	return Errorf("%s: %s", PnErrorCode(code), text)
+}
+
+// DoRecover recovers from internal panics and is meant to be deferred
+// directly, since it calls recover itself.
+//
+// A recovered value that is an error (but not a runtime.Error) is stored in
+// *err. A runtime.Error or a non-error value is re-panicked. When there is no
+// panic in progress, *err is left unchanged.
+func DoRecover(err *error) {
+	switch v := recover().(type) {
+	case nil:
+		// Not panicking: nothing to do.
+	case runtime.Error:
+		// Runtime errors are not ours to swallow.
+		panic(v)
+	case error:
+		*err = v
+	default:
+		panic(v)
+	}
+}
+
+// panicIf panics with Errorf(format, args...) when condition is true and does
+// nothing otherwise.
+//
+// The format parameter is deliberately not called "fmt", so that it does not
+// shadow the imported fmt package inside the function.
+func panicIf(condition bool, format string, args ...interface{}) {
+	if !condition {
+		return
+	}
+	panic(Errorf(format, args...))
+}
+
+// FirstError is a goroutine-safe error holder that keeps the first error that is set.
+type FirstError struct {
+	err  atomic.Value
+	once sync.Once
+}
+
+// Set stores err if no error has been stored yet; later calls are ignored.
+//
+// A nil err is ignored as well: atomic.Value.Store panics on a nil value, and
+// a nil must not use up the one-time slot reserved for the first real error.
+func (e *FirstError) Set(err error) {
+	if err == nil {
+		return
+	}
+	e.once.Do(func() { e.err.Store(err) })
+}
+
+// Get returns the stored error, or nil if none has been set.
+func (e *FirstError) Get() error {
+	if v := e.err.Load(); v != nil {
+		return v.(error)
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/messaging/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/messaging/doc.go b/proton-c/bindings/go/messaging/doc.go
new file mode 100644
index 0000000..c815f4e
--- /dev/null
+++ b/proton-c/bindings/go/messaging/doc.go
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package messaging provides a procedural, concurrent Go API for exchanging AMQP messages.
+*/
+package messaging
+
+// #cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// Just for package comment

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/messaging/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/messaging/handler.go b/proton-c/bindings/go/messaging/handler.go
new file mode 100644
index 0000000..4a97b9d
--- /dev/null
+++ b/proton-c/bindings/go/messaging/handler.go
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package messaging
+
+import (
+	"qpid.apache.org/proton/go/amqp"
+	"qpid.apache.org/proton/go/event"
+)
+
+// FIXME aconway 2015-04-28: cleanup - exposing delivery vs. disposition.
+
+// acksMap maps a delivery to the channel on which its disposition is reported.
+type acksMap map[event.Delivery]chan Disposition
+
+// receiverMap maps a link to the channel that is fed the messages arriving on it.
+type receiverMap map[event.Link]chan amqp.Message
+
+// handler holds the per-connection tables used by HandleMessagingEvent.
+type handler struct {
+	connection *Connection
+	acks       acksMap
+	receivers  receiverMap
+}
+
+// newHandler returns a handler for c with empty ack and receiver tables.
+func newHandler(c *Connection) *handler {
+	return &handler{
+		connection: c,
+		acks:       make(acksMap),
+		receivers:  make(receiverMap),
+	}
+}
+
+// HandleMessagingEvent dispatches a messaging event to the channels registered
+// in the handler's ack and receiver tables. It always returns nil.
+//
+// MConnectionClosed closes and removes every pending ack channel. MSettled
+// sends the remote disposition type on the delivery's ack channel, then closes
+// and removes it. MMessage sends the decoded message on the link's receiver
+// channel. Other event types are ignored.
+func (h *handler) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error {
+	switch t {
+	// FIXME aconway 2015-04-29: handle errors.
+	case event.MConnectionClosed:
+		for d, ack := range h.acks {
+			// FIXME aconway 2015-04-29: communicate error info
+			close(ack)
+			// Forget the channel once it is closed: a repeated close event or a
+			// late MSettled for the same delivery would otherwise close or send
+			// on an already-closed channel and panic.
+			delete(h.acks, d)
+		}
+
+	case event.MSettled:
+		d := e.Delivery()
+		if ack := h.acks[d]; ack != nil {
+			ack <- Disposition(d.Remote().Type())
+			close(ack)
+			delete(h.acks, d)
+		}
+
+	case event.MMessage:
+		if r := h.receivers[e.Link()]; r != nil {
+			// NOTE(review): the decode error is discarded, so m is forwarded
+			// even when decoding failed.
+			m, _ := event.DecodeMessage(e)
+			// FIXME aconway 2015-04-29: hack, direct send, possible blocking.
+			r <- m
+		} else {
+			// FIXME aconway 2015-04-29: Message with no receiver - log? panic? deadletter? drop?
+		}
+	}
+	return nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message