qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [4/5] qpid-proton git commit: PROTON-1011: Go example of event driven broker. Package renaming and some new features.
Date Thu, 08 Oct 2015 04:30:36 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
new file mode 100644
index 0000000..5ba4f4f
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
@@ -0,0 +1,347 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/types.h>
+// #include <proton/message.h>
+// #include <proton/codec.h>
+// #include <stdlib.h>
+//
+// /* Helper for setting message string fields */
+// typedef int (*set_fn)(pn_message_t*, const char*);
+// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
+//     int result = set(m, s);
+//     free(s);
+//     return result;
+// }
+//
+import "C"
+
+import (
+	"qpid.apache.org/internal"
+	"runtime"
+	"time"
+	"unsafe"
+)
+
+// Message is the interface to an AMQP message.
+type Message interface {
+	// Durable indicates that any parties taking responsibility
+	// for the message must durably store the content.
+	Durable() bool
+	SetDurable(bool)
+
+	// Priority impacts ordering guarantees. Within a
+	// given ordered context, higher priority messages may jump ahead of
+	// lower priority messages.
+	Priority() uint8
+	SetPriority(uint8)
+
+	// TTL or Time To Live, a message it may be dropped after this duration
+	TTL() time.Duration
+	SetTTL(time.Duration)
+
+	// FirstAcquirer indicates
+	// that the recipient of the message is the first recipient to acquire
+	// the message, i.e. there have been no failed delivery attempts to
+	// other acquirers. Note that this does not mean the message has not
+	// been delivered to, but not acquired, by other recipients.
+	FirstAcquirer() bool
+	SetFirstAcquirer(bool)
+
+	// DeliveryCount tracks how many attempts have been made to
+	// delivery a message.
+	DeliveryCount() uint32
+	SetDeliveryCount(uint32)
+
+	// MessageId provides a unique identifier for a message.
+	// it can be an a string, an unsigned long, a uuid or a
+	// binary value.
+	MessageId() interface{}
+	SetMessageId(interface{})
+
+	UserId() string
+	SetUserId(string)
+
+	Address() string
+	SetAddress(string)
+
+	Subject() string
+	SetSubject(string)
+
+	ReplyTo() string
+	SetReplyTo(string)
+
+	// CorrelationId is set on correlated request and response messages. It can be
+	// an a string, an unsigned long, a uuid or a binary value.
+	CorrelationId() interface{}
+	SetCorrelationId(interface{})
+
+	ContentType() string
+	SetContentType(string)
+
+	ContentEncoding() string
+	SetContentEncoding(string)
+
+	// ExpiryTime indicates an absoulte time when the message may be dropped.
+	// A Zero time (i.e. t.isZero() == true) indicates a message never expires.
+	ExpiryTime() time.Time
+	SetExpiryTime(time.Time)
+
+	CreationTime() time.Time
+	SetCreationTime(time.Time)
+
+	GroupId() string
+	SetGroupId(string)
+
+	GroupSequence() int32
+	SetGroupSequence(int32)
+
+	ReplyToGroupId() string
+	SetReplyToGroupId(string)
+
+	// Instructions - AMQP delivery instructions.
+	Instructions() map[string]interface{}
+	SetInstructions(v map[string]interface{})
+
+	// Annotations - AMQP annotations.
+	Annotations() map[string]interface{}
+	SetAnnotations(v map[string]interface{})
+
+	// Properties - Application properties.
+	Properties() map[string]interface{}
+	SetProperties(v map[string]interface{})
+
+	// Inferred indicates how the message content
+	// is encoded into AMQP sections. If inferred is true then binary and
+	// list values in the body of the message will be encoded as AMQP DATA
+	// and AMQP SEQUENCE sections, respectively. If inferred is false,
+	// then all values in the body of the message will be encoded as AMQP
+	// VALUE sections regardless of their type.
+	Inferred() bool
+	SetInferred(bool)
+
+	// Marshal a Go value into the message body. See amqp.Marshal() for details.
+	Marshal(interface{})
+
+	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+	Unmarshal(interface{})
+
+	// Body value resulting from the default unmarshalling of message body as interface{}
+	Body() interface{}
+
+	// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
+	// the message is encoded into it, otherwise a new buffer is created.
+	// Returns the buffer containing the message.
+	Encode(buffer []byte) ([]byte, error)
+
+	// Decode data into this message. Overwrites an existing message content.
+	Decode(buffer []byte) error
+
+	// Clear the message contents.
+	Clear()
+
+	// Copy the contents of another message to this one.
+	Copy(m Message) error
+}
+
+type message struct{ pn *C.pn_message_t }
+
+func freeMessage(m *message) {
+	C.pn_message_free(m.pn)
+	m.pn = nil
+}
+
+// NewMessage creates a new message instance.
+func NewMessage() Message {
+	m := &message{C.pn_message()}
+	runtime.SetFinalizer(m, freeMessage)
+	return m
+}
+
+// NewMessageWith creates a message with value as the body. Equivalent to
+//     m := NewMessage(); m.Marshal(body)
+func NewMessageWith(value interface{}) Message {
+	m := NewMessage()
+	m.Marshal(value)
+	return m
+}
+
+func (m *message) Clear() { C.pn_message_clear(m.pn) }
+
+func (m *message) Copy(x Message) error {
+	if data, err := x.Encode(nil); err == nil {
+		return m.Decode(data)
+	} else {
+		return err
+	}
+}
+
+// ==== message get functions
+
+func rewindGet(data *C.pn_data_t) (v interface{}) {
+	C.pn_data_rewind(data)
+	C.pn_data_next(data)
+	unmarshal(&v, data)
+	return v
+}
+
+func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
+	C.pn_data_rewind(data)
+	C.pn_data_next(data)
+	unmarshal(&v, data)
+	return v
+}
+
+func (m *message) Inferred() bool  { return bool(C.pn_message_is_inferred(m.pn)) }
+func (m *message) Durable() bool   { return bool(C.pn_message_is_durable(m.pn)) }
+func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
+func (m *message) TTL() time.Duration {
+	return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+}
+func (m *message) FirstAcquirer() bool        { return bool(C.pn_message_is_first_acquirer(m.pn)) }
+func (m *message) DeliveryCount() uint32      { return uint32(C.pn_message_get_delivery_count(m.pn)) }
+func (m *message) MessageId() interface{}     { return rewindGet(C.pn_message_id(m.pn)) }
+func (m *message) UserId() string             { return goString(C.pn_message_get_user_id(m.pn)) }
+func (m *message) Address() string            { return C.GoString(C.pn_message_get_address(m.pn)) }
+func (m *message) Subject() string            { return C.GoString(C.pn_message_get_subject(m.pn)) }
+func (m *message) ReplyTo() string            { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
+func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
+func (m *message) ContentType() string        { return C.GoString(C.pn_message_get_content_type(m.pn)) }
+func (m *message) ContentEncoding() string    { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
+
+func (m *message) ExpiryTime() time.Time {
+	return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
+}
+func (m *message) CreationTime() time.Time {
+	return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
+}
+func (m *message) GroupId() string        { return C.GoString(C.pn_message_get_group_id(m.pn)) }
+func (m *message) GroupSequence() int32   { return int32(C.pn_message_get_group_sequence(m.pn)) }
+func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+
+func (m *message) Instructions() map[string]interface{} {
+	return rewindMap(C.pn_message_instructions(m.pn))
+}
+func (m *message) Annotations() map[string]interface{} {
+	return rewindMap(C.pn_message_annotations(m.pn))
+}
+func (m *message) Properties() map[string]interface{} {
+	return rewindMap(C.pn_message_properties(m.pn))
+}
+
+// ==== message set methods
+
+func setData(v interface{}, data *C.pn_data_t) {
+	C.pn_data_clear(data)
+	marshal(v, data)
+}
+
+func dataString(data *C.pn_data_t) string {
+	str := C.pn_string(C.CString(""))
+	defer C.pn_free(unsafe.Pointer(str))
+	C.pn_inspect(unsafe.Pointer(data), str)
+	return C.GoString(C.pn_string_get(str))
+}
+
+func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) }
+func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, C.bool(b)) }
+func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
+func (m *message) SetTTL(d time.Duration) {
+	C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
+}
+func (m *message) SetFirstAcquirer(b bool)     { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
+func (m *message) SetDeliveryCount(c uint32)   { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
+func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
+func (m *message) SetUserId(s string)          { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
+func (m *message) SetAddress(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
+}
+func (m *message) SetSubject(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
+}
+func (m *message) SetReplyTo(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
+}
+func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
+func (m *message) SetContentType(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
+}
+func (m *message) SetContentEncoding(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
+}
+func (m *message) SetExpiryTime(t time.Time)   { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
+func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
+func (m *message) SetGroupId(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
+}
+func (m *message) SetGroupSequence(s int32) {
+	C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
+}
+func (m *message) SetReplyToGroupId(s string) {
+	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+}
+
+func (m *message) SetInstructions(v map[string]interface{}) {
+	setData(v, C.pn_message_instructions(m.pn))
+}
+func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
+func (m *message) SetProperties(v map[string]interface{})  { setData(v, C.pn_message_properties(m.pn)) }
+
+// Marshal/Unmarshal body
+func (m *message) Marshal(v interface{})   { clearMarshal(v, C.pn_message_body(m.pn)) }
+func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
+func (m *message) Body() (v interface{})   { m.Unmarshal(&v); return }
+
+func (m *message) Decode(data []byte) error {
+	m.Clear()
+	if len(data) == 0 {
+		return internal.Errorf("empty buffer for decode")
+	}
+	if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
+		return internal.Errorf("decoding message: %s",
+			internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
+	}
+	return nil
+}
+
+func DecodeMessage(data []byte) (m Message, err error) {
+	m = NewMessage()
+	err = m.Decode(data)
+	return
+}
+
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+	encode := func(buf []byte) ([]byte, error) {
+		len := cLen(buf)
+		result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+		switch {
+		case result == C.PN_OVERFLOW:
+			return buf, overflow
+		case result < 0:
+			return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result))
+		default:
+			return buf[:len], nil
+		}
+	}
+	return encodeGrow(buffer, encode)
+}
+
+// TODO aconway 2015-09-14: Multi-section messages.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go
new file mode 100644
index 0000000..7a6e5a8
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+	"testing"
+	"time"
+)
+
+func roundTrip(m Message) error {
+	buffer, err := m.Encode(nil)
+	if err != nil {
+		return err
+	}
+	m2, err := DecodeMessage(buffer)
+	if err != nil {
+		return err
+	}
+	return checkEqual(m, m2)
+}
+
+func TestDefaultMessage(t *testing.T) {
+	m := NewMessage()
+	// Check defaults
+	for _, data := range [][]interface{}{
+		{m.Inferred(), false},
+		{m.Durable(), false},
+		{m.Priority(), uint8(4)},
+		{m.TTL(), time.Duration(0)},
+		{m.UserId(), ""},
+		{m.Address(), ""},
+		{m.Subject(), ""},
+		{m.ReplyTo(), ""},
+		{m.ContentType(), ""},
+		{m.ContentEncoding(), ""},
+		{m.GroupId(), ""},
+		{m.GroupSequence(), int32(0)},
+		{m.ReplyToGroupId(), ""},
+		{m.MessageId(), nil},
+		{m.CorrelationId(), nil},
+		{m.Instructions(), map[string]interface{}{}},
+		{m.Annotations(), map[string]interface{}{}},
+		{m.Properties(), map[string]interface{}{}},
+		{m.Body(), nil},
+	} {
+		if err := checkEqual(data[0], data[1]); err != nil {
+			t.Error(err)
+		}
+	}
+	if err := roundTrip(m); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+	m := NewMessage()
+	m.SetInferred(false)
+	m.SetDurable(true)
+	m.SetPriority(42)
+	m.SetTTL(0)
+	m.SetUserId("user")
+	m.SetAddress("address")
+	m.SetSubject("subject")
+	m.SetReplyTo("replyto")
+	m.SetContentType("content")
+	m.SetContentEncoding("encoding")
+	m.SetGroupId("group")
+	m.SetGroupSequence(42)
+	m.SetReplyToGroupId("replytogroup")
+	m.SetMessageId("id")
+	m.SetCorrelationId("correlation")
+	m.SetInstructions(map[string]interface{}{"instructions": "foo"})
+	m.SetAnnotations(map[string]interface{}{"annotations": "foo"})
+	m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"})
+	m.Marshal("hello")
+
+	for _, data := range [][]interface{}{
+		{m.Inferred(), false},
+		{m.Durable(), true},
+		{m.Priority(), uint8(42)},
+		{m.TTL(), time.Duration(0)},
+		{m.UserId(), "user"},
+		{m.Address(), "address"},
+		{m.Subject(), "subject"},
+		{m.ReplyTo(), "replyto"},
+		{m.ContentType(), "content"},
+		{m.ContentEncoding(), "encoding"},
+		{m.GroupId(), "group"},
+		{m.GroupSequence(), int32(42)},
+		{m.ReplyToGroupId(), "replytogroup"},
+		{m.MessageId(), "id"},
+		{m.CorrelationId(), "correlation"},
+		{m.Instructions(), map[string]interface{}{"instructions": "foo"}},
+		{m.Annotations(), map[string]interface{}{"annotations": "foo"}},
+		{m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}},
+		{m.Body(), "hello"},
+	} {
+		if err := checkEqual(data[0], data[1]); err != nil {
+			t.Error(err)
+		}
+	}
+	if err := roundTrip(m); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMessageBodyTypes(t *testing.T) {
+	var s string
+	var body interface{}
+	var i int64
+
+	m := NewMessageWith(int64(42))
+	m.Unmarshal(&body)
+	m.Unmarshal(&i)
+	if err := checkEqual(body.(int64), int64(42)); err != nil {
+		t.Error(err)
+	}
+	if err := checkEqual(i, int64(42)); err != nil {
+		t.Error(err)
+	}
+
+	m = NewMessageWith("hello")
+	m.Unmarshal(&s)
+	m.Unmarshal(&body)
+	if err := checkEqual(s, "hello"); err != nil {
+		t.Error(err)
+	}
+	if err := checkEqual(body.(string), "hello"); err != nil {
+		t.Error(err)
+	}
+	if err := roundTrip(m); err != nil {
+		t.Error(err)
+	}
+
+	m = NewMessageWith(Binary("bin"))
+	m.Unmarshal(&s)
+	m.Unmarshal(&body)
+	if err := checkEqual(body.(Binary), Binary("bin")); err != nil {
+		t.Error(err)
+	}
+	if err := checkEqual(s, "bin"); err != nil {
+		t.Error(err)
+	}
+	if err := roundTrip(m); err != nil {
+		t.Error(err)
+	}
+
+	// TODO aconway 2015-09-08: array etc.
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go
new file mode 100644
index 0000000..131c974
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"time"
+	"unsafe"
+)
+
+type Type C.pn_type_t
+
+func (t Type) String() string {
+	switch C.pn_type_t(t) {
+	case C.PN_NULL:
+		return "null"
+	case C.PN_BOOL:
+		return "bool"
+	case C.PN_UBYTE:
+		return "ubyte"
+	case C.PN_BYTE:
+		return "byte"
+	case C.PN_USHORT:
+		return "ushort"
+	case C.PN_SHORT:
+		return "short"
+	case C.PN_CHAR:
+		return "char"
+	case C.PN_UINT:
+		return "uint"
+	case C.PN_INT:
+		return "int"
+	case C.PN_ULONG:
+		return "ulong"
+	case C.PN_LONG:
+		return "long"
+	case C.PN_TIMESTAMP:
+		return "timestamp"
+	case C.PN_FLOAT:
+		return "float"
+	case C.PN_DOUBLE:
+		return "double"
+	case C.PN_DECIMAL32:
+		return "decimal32"
+	case C.PN_DECIMAL64:
+		return "decimal64"
+	case C.PN_DECIMAL128:
+		return "decimal128"
+	case C.PN_UUID:
+		return "uuid"
+	case C.PN_BINARY:
+		return "binary"
+	case C.PN_STRING:
+		return "string"
+	case C.PN_SYMBOL:
+		return "symbol"
+	case C.PN_DESCRIBED:
+		return "described"
+	case C.PN_ARRAY:
+		return "array"
+	case C.PN_LIST:
+		return "list"
+	case C.PN_MAP:
+		return "map"
+	case C.PN_INVALID:
+		return "no-data"
+	default:
+		return fmt.Sprintf("unknown-type(%d)", t)
+	}
+}
+
+// Go types
+var (
+	bytesType = reflect.TypeOf([]byte{})
+	valueType = reflect.TypeOf(reflect.Value{})
+)
+
+// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
+
+// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
+type Map map[interface{}]interface{}
+
+// List is a generic list that can hold mixed values and can represent any AMQP list.
+//
+type List []interface{}
+
+// Symbol is a string that is encoded as an AMQP symbol
+type Symbol string
+
+func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
+
+// Binary is a string that is encoded as an AMQP binary.
+// It is a string rather than a byte[] because byte[] is not hashable and can't be used as
+// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte
+type Binary string
+
+func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
+
+// GoString for Map prints values with their types, useful for debugging.
+func (m Map) GoString() string {
+	out := &bytes.Buffer{}
+	fmt.Fprintf(out, "%T{", m)
+	i := len(m)
+	for k, v := range m {
+		fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
+		i--
+		if i > 0 {
+			fmt.Fprint(out, ", ")
+		}
+	}
+	fmt.Fprint(out, "}")
+	return out.String()
+}
+
+// GoString for List prints values with their types, useful for debugging.
+func (l List) GoString() string {
+	out := &bytes.Buffer{}
+	fmt.Fprintf(out, "%T{", l)
+	for i := 0; i < len(l); i++ {
+		fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
+		if i == len(l)-1 {
+			fmt.Fprint(out, ", ")
+		}
+	}
+	fmt.Fprint(out, "}")
+	return out.String()
+}
+
+// pnTime converts Go time.Time to Proton millisecond Unix time.
+func pnTime(t time.Time) C.pn_timestamp_t {
+	secs := t.Unix()
+	// Note: sub-second accuracy is not guaraunteed if the Unix time in
+	// nanoseconds cannot be represented by an int64 (sometime around year 2260)
+	msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
+	return C.pn_timestamp_t(secs*1000 + msecs)
+}
+
+// goTime converts a pn_timestamp_t to a Go time.Time.
+func goTime(t C.pn_timestamp_t) time.Time {
+	secs := int64(t) / 1000
+	nsecs := (int64(t) % 1000) * int64(time.Millisecond)
+	return time.Unix(secs, nsecs)
+}
+
+func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
+	if cBytes.start != nil {
+		bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
+	}
+	return
+}
+
+func goString(cBytes C.pn_bytes_t) (str string) {
+	if cBytes.start != nil {
+		str = C.GoStringN(cBytes.start, C.int(cBytes.size))
+	}
+	return
+}
+
+func pnBytes(b []byte) C.pn_bytes_t {
+	if len(b) == 0 {
+		return C.pn_bytes_t{0, nil}
+	} else {
+		return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
+	}
+}
+
+func cPtr(b []byte) *C.char {
+	if len(b) == 0 {
+		return nil
+	}
+	return (*C.char)(unsafe.Pointer(&b[0]))
+}
+
+func cLen(b []byte) C.size_t {
+	return C.size_t(len(b))
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
new file mode 100644
index 0000000..61c6d3f
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -0,0 +1,556 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+oor more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"qpid.apache.org/internal"
+	"reflect"
+	"unsafe"
+)
+
+const minDecode = 1024
+
+// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
+type UnmarshalError struct {
+	// The name of the AMQP type.
+	AMQPType string
+	// The Go type.
+	GoType reflect.Type
+}
+
+func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
+	return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)}
+}
+
+func (e UnmarshalError) Error() string {
+	if e.GoType.Kind() != reflect.Ptr {
+		return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
+	} else {
+		return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
+	}
+}
+
+func doRecover(err *error) {
+	r := recover()
+	switch r := r.(type) {
+	case nil:
+	case *UnmarshalError, internal.Error:
+		*err = r.(error)
+	default:
+		panic(r)
+	}
+}
+
+//
+// Decoding from a pn_data_t
+//
+// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
+// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
+//
+
+// Decoder decodes AMQP values from an io.Reader.
+//
+type Decoder struct {
+	reader io.Reader
+	buffer bytes.Buffer
+}
+
+// NewDecoder returns a new decoder that reads from r.
+//
+// The decoder has it's own buffer and may read more data than required for the
+// AMQP values requested.  Use Buffered to see if there is data left in the
+// buffer.
+//
+func NewDecoder(r io.Reader) *Decoder {
+	return &Decoder{r, bytes.Buffer{}}
+}
+
+// Buffered returns a reader of the data remaining in the Decoder's buffer. The
+// reader is valid until the next call to Decode.
+//
+func (d *Decoder) Buffered() io.Reader {
+	return bytes.NewReader(d.buffer.Bytes())
+}
+
+// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
+//
+// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
+//
+func (d *Decoder) Decode(v interface{}) (err error) {
+	defer doRecover(&err)
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	var n int
+	for n == 0 && err == nil {
+		n = decode(data, d.buffer.Bytes())
+		if n == 0 { // n == 0 means not enough data, read more
+			err = d.more()
+		} else {
+			unmarshal(v, data)
+		}
+	}
+	d.buffer.Next(n)
+	return
+}
+
+/*
+Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
+Types are converted as follows:
+
+ +---------------------------+----------------------------------------------------------------------+
+ |To Go types                |From AMQP types                                                       |
+ +===========================+======================================================================+
+ |bool                       |bool                                                                  |
+ +---------------------------+----------------------------------------------------------------------+
+ |int, int8, int16,          |Equivalent or smaller signed integer type: byte, short, int, long.    |
+ |int32, int64               |                                                                      |
+ +---------------------------+----------------------------------------------------------------------+
+ |uint, uint8, uint16,       |Equivalent or smaller unsigned integer type: ubyte, ushort, uint,     |
+ |uint32, uint64 types       |ulong                                                                 |
+ +---------------------------+----------------------------------------------------------------------+
+ |float32, float64           |Equivalent or smaller float or double.                                |
+ +---------------------------+----------------------------------------------------------------------+
+ |string, []byte             |string, symbol or binary.                                             |
+ +---------------------------+----------------------------------------------------------------------+
+ |Symbol                     |symbol                                                                |
+ +---------------------------+----------------------------------------------------------------------+
+ |map[K]T                    |map, provided all keys and values can unmarshal to types K, T         |
+ +---------------------------+----------------------------------------------------------------------+
+ |Map                        |map, any AMQP map                                                     |
+ +---------------------------+----------------------------------------------------------------------+
+ |interface{}                |Any AMQP value can be unmarshaled to an interface{} as follows:       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |AMQP Type               |Go Type in interface{}                       |
+ |                           +========================+=============================================+
+ |                           |bool                    |bool                                         |
+ |                           +------------------------+---------------------------------------------+
+ |                           |byte,short,int,long     |int8,int16,int32,int64                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64                   |
+ |                           +------------------------+---------------------------------------------+
+ |                           |float, double           |float32, float64                             |
+ |                           +------------------------+---------------------------------------------+
+ |                           |string                  |string                                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |symbol                  |Symbol                                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |binary                  |Binary                                       |
+ |                           +------------------------+---------------------------------------------+
+ |                           |nulll                   |nil                                          |
+ |                           +------------------------+---------------------------------------------+
+ |                           |map                     |Map                                          |
+ |                           +------------------------+---------------------------------------------+
+ |                           |list                    |List                                         |
+ +---------------------------+------------------------+---------------------------------------------+
+
+The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
+
+TODO
+
+Go types: array, struct.
+
+AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
+
+AMQP maps with mixed/unhashable key types need an alternate representation.
+
+Described types.
+*/
+func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
+	defer doRecover(&err)
+
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	n = decode(data, bytes)
+	if n == 0 {
+		err = internal.Errorf("not enough data")
+	} else {
+		unmarshal(v, data)
+	}
+	return
+}
+
+// more reads more data when we can't parse a complete AMQP type
+func (d *Decoder) more() error {
+	var readSize int64 = minDecode
+	if int64(d.buffer.Len()) > readSize { // Grow by doubling
+		readSize = int64(d.buffer.Len())
+	}
+	var n int64
+	n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
+	if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
+		err = io.EOF
+	}
+	return err
+}
+
+// Unmarshal from data into value pointed at by v.
+func unmarshal(v interface{}, data *C.pn_data_t) {
+	pnType := C.pn_data_type(data)
+	switch v := v.(type) {
+	case *bool:
+		switch pnType {
+		case C.PN_BOOL:
+			*v = bool(C.pn_data_get_bool(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	case *int8:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int8(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int8(C.pn_data_get_byte(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	case *uint8:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint8(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint8(C.pn_data_get_ubyte(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	case *int16:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int16(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int16(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int16(C.pn_data_get_short(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	case *uint16:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint16(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint16(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint16(C.pn_data_get_ushort(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	case *int32:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int32(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int32(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int32(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int32(C.pn_data_get_int(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	case *uint32:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint32(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint32(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint32(C.pn_data_get_ushort(data))
+		case C.PN_UINT:
+			*v = uint32(C.pn_data_get_uint(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *int64:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int64(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int64(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int64(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int64(C.pn_data_get_int(data))
+		case C.PN_LONG:
+			*v = int64(C.pn_data_get_long(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *uint64:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint64(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint64(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint64(C.pn_data_get_ushort(data))
+		case C.PN_ULONG:
+			*v = uint64(C.pn_data_get_ulong(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *int:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int(C.pn_data_get_int(data))
+		case C.PN_LONG:
+			if unsafe.Sizeof(0) == 8 {
+				*v = int(C.pn_data_get_long(data))
+			} else {
+				panic(newUnmarshalError(pnType, v))
+			}
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *uint:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint(C.pn_data_get_ushort(data))
+		case C.PN_UINT:
+			*v = uint(C.pn_data_get_uint(data))
+		case C.PN_ULONG:
+			if unsafe.Sizeof(0) == 8 {
+				*v = uint(C.pn_data_get_ulong(data))
+			} else {
+				panic(newUnmarshalError(pnType, v))
+			}
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *float32:
+		switch pnType {
+		case C.PN_FLOAT:
+			*v = float32(C.pn_data_get_float(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *float64:
+		switch pnType {
+		case C.PN_FLOAT:
+			*v = float64(C.pn_data_get_float(data))
+		case C.PN_DOUBLE:
+			*v = float64(C.pn_data_get_double(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *string:
+		switch pnType {
+		case C.PN_STRING:
+			*v = goString(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = goString(C.pn_data_get_symbol(data))
+		case C.PN_BINARY:
+			*v = goString(C.pn_data_get_binary(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *[]byte:
+		switch pnType {
+		case C.PN_STRING:
+			*v = goBytes(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = goBytes(C.pn_data_get_symbol(data))
+		case C.PN_BINARY:
+			*v = goBytes(C.pn_data_get_binary(data))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *Binary:
+		switch pnType {
+		case C.PN_BINARY:
+			*v = Binary(goBytes(C.pn_data_get_binary(data)))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *Symbol:
+		switch pnType {
+		case C.PN_SYMBOL:
+			*v = Symbol(goBytes(C.pn_data_get_symbol(data)))
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+
+	case *interface{}:
+		getInterface(data, v)
+
+	default:
+		if reflect.TypeOf(v).Kind() != reflect.Ptr {
+			panic(newUnmarshalError(pnType, v))
+		}
+		switch reflect.TypeOf(v).Elem().Kind() {
+		case reflect.Map:
+			getMap(data, v)
+		case reflect.Slice:
+			getList(data, v)
+		default:
+			panic(newUnmarshalError(pnType, v))
+		}
+	}
+	err := dataError("unmarshaling", data)
+	if err != nil {
+		panic(err)
+	}
+	return
+}
+
+func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
+	C.pn_data_rewind(data)
+	C.pn_data_next(data)
+	unmarshal(v, data)
+}
+
+// Getting into an interface is driven completely by the AMQP type, since the interface{}
+// target is type-neutral.
+func getInterface(data *C.pn_data_t, v *interface{}) {
+	pnType := C.pn_data_type(data)
+	switch pnType {
+	case C.PN_NULL, C.PN_INVALID: // No data.
+		*v = nil
+	case C.PN_BOOL:
+		*v = bool(C.pn_data_get_bool(data))
+	case C.PN_UBYTE:
+		*v = uint8(C.pn_data_get_ubyte(data))
+	case C.PN_BYTE:
+		*v = int8(C.pn_data_get_byte(data))
+	case C.PN_USHORT:
+		*v = uint16(C.pn_data_get_ushort(data))
+	case C.PN_SHORT:
+		*v = int16(C.pn_data_get_short(data))
+	case C.PN_UINT:
+		*v = uint32(C.pn_data_get_uint(data))
+	case C.PN_INT:
+		*v = int32(C.pn_data_get_int(data))
+	case C.PN_CHAR:
+		*v = uint8(C.pn_data_get_char(data))
+	case C.PN_ULONG:
+		*v = uint64(C.pn_data_get_ulong(data))
+	case C.PN_LONG:
+		*v = int64(C.pn_data_get_long(data))
+	case C.PN_FLOAT:
+		*v = float32(C.pn_data_get_float(data))
+	case C.PN_DOUBLE:
+		*v = float64(C.pn_data_get_double(data))
+	case C.PN_BINARY:
+		*v = Binary(goBytes(C.pn_data_get_binary(data)))
+	case C.PN_STRING:
+		*v = goString(C.pn_data_get_string(data))
+	case C.PN_SYMBOL:
+		*v = Symbol(goString(C.pn_data_get_symbol(data)))
+	case C.PN_MAP:
+		m := make(Map)
+		unmarshal(&m, data)
+		*v = m
+	case C.PN_LIST:
+		l := make(List, 0)
+		unmarshal(&l, data)
+		*v = l
+	default:
+		panic(newUnmarshalError(pnType, v))
+	}
+}
+
+// get into map pointed at by v
+func getMap(data *C.pn_data_t, v interface{}) {
+	mapValue := reflect.ValueOf(v).Elem()
+	mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
+	switch pnType := C.pn_data_type(data); pnType {
+	case C.PN_MAP:
+		count := int(C.pn_data_get_map(data))
+		if bool(C.pn_data_enter(data)) {
+			defer C.pn_data_exit(data)
+			for i := 0; i < count/2; i++ {
+				if bool(C.pn_data_next(data)) {
+					key := reflect.New(mapValue.Type().Key())
+					unmarshal(key.Interface(), data)
+					if bool(C.pn_data_next(data)) {
+						val := reflect.New(mapValue.Type().Elem())
+						unmarshal(val.Interface(), data)
+						mapValue.SetMapIndex(key.Elem(), val.Elem())
+					}
+				}
+			}
+		}
+	case C.PN_INVALID: // Leave the map empty
+	default:
+		panic(newUnmarshalError(pnType, v))
+	}
+}
+
+func getList(data *C.pn_data_t, v interface{}) {
+	pnType := C.pn_data_type(data)
+	if pnType != C.PN_LIST {
+		panic(newUnmarshalError(pnType, v))
+	}
+	count := int(C.pn_data_get_list(data))
+	listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
+	if bool(C.pn_data_enter(data)) {
+		for i := 0; i < count; i++ {
+			if bool(C.pn_data_next(data)) {
+				val := reflect.New(listValue.Type().Elem())
+				unmarshal(val.Interface(), data)
+				listValue.Index(i).Set(val.Elem())
+			}
+		}
+		C.pn_data_exit(data)
+	}
+	reflect.ValueOf(v).Elem().Set(listValue)
+}
+
+// decode from bytes.
+// Return bytes decoded or 0 if we could not decode a complete object.
+//
+func decode(data *C.pn_data_t, bytes []byte) int {
+	if len(bytes) == 0 {
+		return 0
+	}
+	n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
+	if n == int(C.PN_UNDERFLOW) {
+		C.pn_error_clear(C.pn_data_error(data))
+		return 0
+	} else if n <= 0 {
+		panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n)))
+	}
+	return n
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go
new file mode 100644
index 0000000..0d0c662
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+/*
+#include <stdlib.h>
+#include <string.h>
+#include <proton/url.h>
+
+// Helper function for setting URL fields.
+typedef void (*setter_fn)(pn_url_t* url, const char* value);
+inline void	set(pn_url_t *url, setter_fn s, const char* value) {
+  s(url, value);
+}
+*/
+import "C"
+
+import (
+	"net"
+	"net/url"
+	"qpid.apache.org/internal"
+	"unsafe"
+)
+
+const (
+	amqp  string = "amqp"
+	amqps        = "amqps"
+)
+
+// ParseUrl parses an AMQP URL string and returns a net/url.Url.
+//
+// It is more forgiving than net/url.Parse and allows most of the parts of the
+// URL to be missing, assuming AMQP defaults.
+//
+func ParseURL(s string) (u *url.URL, err error) {
+	cstr := C.CString(s)
+	defer C.free(unsafe.Pointer(cstr))
+	pnUrl := C.pn_url_parse(cstr)
+	if pnUrl == nil {
+		return nil, internal.Errorf("bad URL %#v", s)
+	}
+	defer C.pn_url_free(pnUrl)
+
+	scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
+	username := C.GoString(C.pn_url_get_username(pnUrl))
+	password := C.GoString(C.pn_url_get_password(pnUrl))
+	host := C.GoString(C.pn_url_get_host(pnUrl))
+	port := C.GoString(C.pn_url_get_port(pnUrl))
+	path := C.GoString(C.pn_url_get_path(pnUrl))
+
+	if err != nil {
+		return nil, internal.Errorf("bad URL %#v: %s", s, err)
+	}
+	if scheme == "" {
+		scheme = amqp
+	}
+	if port == "" {
+		if scheme == amqps {
+			port = amqps
+		} else {
+			port = amqp
+		}
+	}
+	var user *url.Userinfo
+	if password != "" {
+		user = url.UserPassword(username, password)
+	} else if username != "" {
+		user = url.User(username)
+	}
+
+	u = &url.URL{
+		Scheme: scheme,
+		User:   user,
+		Host:   net.JoinHostPort(host, port),
+		Path:   path,
+	}
+
+	return u, nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go
new file mode 100644
index 0000000..f80f1c4
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+	"fmt"
+)
+
+func ExampleParseURL() {
+	for _, s := range []string{
+		"amqp://username:password@host:1234/path",
+		"host:1234",
+		"host",
+		":1234",
+		"host/path",
+		"amqps://host",
+		"",
+	} {
+		u, err := ParseURL(s)
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			fmt.Println(u)
+		}
+	}
+	// Output:
+	// amqp://username:password@host:1234/path
+	// amqp://host:1234
+	// amqp://host:amqp
+	// amqp://:1234
+	// amqp://host:amqp/path
+	// amqps://host:amqps
+	// proton: bad URL ""
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
new file mode 100644
index 0000000..7c8024b
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -0,0 +1,192 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+	"net"
+	"qpid.apache.org/internal"
+	"qpid.apache.org/proton"
+	"qpid.apache.org/amqp"
+	"sync"
+)
+
+// Connection is an AMQP connection, created by a Container.
+type Connection interface {
+	Endpoint
+
+	// Server puts the connection in server mode, must be called before Open().
+	//
+	// A server connection will do protocol negotiation to accept a incoming AMQP
+	// connection. Normally you would call this for a connection created by
+	// net.Listener.Accept()
+	//
+	Server()
+
+	// Listen arranges for endpoints opened by the remote peer to be passed to accept().
+	// Listen() must be called before Connection.Open().
+	//
+	// accept() is passed a Session, Sender or Receiver.  It can examine endpoint
+	// properties and set some properties (e.g. Receiver.SetCapacity()) Returning nil
+	// will accept the endpoint, returning an error will reject it.
+	//
+	// accept() must not block or use the endpoint other than to examine or set
+	// properties.  It can start a goroutine to process the Endpoint, or pass the
+	// Endpoint to another goroutine via a channel, and that goroutine can use
+	// the endpoint as normal.
+	//
+	// The default Listen function is RejectEndpoint which rejects all endpoints.
+	// You can call Listen(AcceptEndpoint) to accept all endpoints
+	Listen(accept func(Endpoint) error)
+
+	// Open the connection, ready for use.
+	Open() error
+
+	// Sender opens a new sender on the DefaultSession.
+	//
+	// v can be a string, which is used as the Target address, or a SenderSettings
+	// struct containing more details settings.
+	Sender(setting ...LinkSetting) (Sender, error)
+
+	// Receiver opens a new Receiver on the DefaultSession().
+	//
+	// v can be a string, which is used as the
+	// Source address, or a ReceiverSettings struct containing more details
+	// settings.
+	Receiver(setting ...LinkSetting) (Receiver, error)
+
+	// DefaultSession() returns a default session for the connection. It is opened
+	// on the first call to DefaultSession and returned on subsequent calls.
+	DefaultSession() (Session, error)
+
+	// Session opens a new session.
+	Session() (Session, error)
+
+	// Container for the connection.
+	Container() Container
+
+	// Disconnect the connection abruptly with an error.
+	Disconnect(error)
+}
+
+// AcceptEndpoint pass to Connection.Listen to accept all endpoints
+func AcceptEndpoint(Endpoint) error { return nil }
+
+// RejectEndpoint pass to Connection.Listen to reject all endpoints
+func RejectEndpoint(Endpoint) error {
+	return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
+}
+
+type connection struct {
+	endpoint
+	listenOnce, defaultSessionOnce, closeOnce sync.Once
+
+	// Set before Open()
+	container *container
+	conn      net.Conn
+	accept    func(Endpoint) error
+
+	// Set by Open()
+	handler     *handler
+	engine      *proton.Engine
+	err         internal.ErrorHolder
+	eConnection proton.Connection
+
+	defaultSession Session
+}
+
+func newConnection(conn net.Conn, cont *container) (*connection, error) {
+	c := &connection{container: cont, conn: conn, accept: RejectEndpoint}
+	c.handler = newHandler(c)
+	var err error
+	c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
+	if err != nil {
+		return nil, err
+	}
+	c.str = c.engine.String()
+	c.eConnection = c.engine.Connection()
+	return c, nil
+}
+
+func (c *connection) Server() { c.engine.Server() }
+
+func (c *connection) Listen(accept func(Endpoint) error) { c.accept = accept }
+
+func (c *connection) Open() error {
+	go c.engine.Run()
+	return nil
+}
+
+func (c *connection) Close(err error) { c.engine.Close(err) }
+
+func (c *connection) Disconnect(err error) { c.engine.Disconnect(err) }
+
+// FIXME aconway 2015-10-07:
+func (c *connection) closed(err error) {
+	// Call from another goroutine to initiate close without deadlock.
+	go c.Close(err)
+}
+
+func (c *connection) Session() (Session, error) {
+	var s Session
+	err := c.engine.InjectWait(func() error {
+		eSession, err := c.engine.Connection().Session()
+		if err == nil {
+			eSession.Open()
+			if err == nil {
+				s = newSession(c, eSession)
+			}
+		}
+		return err
+	})
+	return s, err
+}
+
+func (c *connection) Container() Container { return c.container }
+
+func (c *connection) DefaultSession() (s Session, err error) {
+	c.defaultSessionOnce.Do(func() {
+		c.defaultSession, err = c.Session()
+	})
+	if err == nil {
+		err = c.Error()
+	}
+	return c.defaultSession, err
+}
+
+func (c *connection) Sender(setting ...LinkSetting) (Sender, error) {
+	if s, err := c.DefaultSession(); err == nil {
+		return s.Sender(setting...)
+	} else {
+		return nil, err
+	}
+}
+
+func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
+	if s, err := c.DefaultSession(); err == nil {
+		return s.Receiver(setting...)
+	} else {
+		return nil, err
+	}
+}
+
+func (c *connection) Connection() Connection { return c }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
new file mode 100644
index 0000000..06a9a14
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"net"
+	"qpid.apache.org/internal"
+)
+
+// Container is an AMQP container, it represents a single AMQP "application".It
+// provides functions to create new Connections to remote containers.
+//
+// Create with NewContainer()
+//
+type Container interface {
+	// Id is a unique identifier for the container in your distributed application.
+	Id() string
+
+	// Create a new AMQP Connection over the supplied net.Conn connection.
+	//
+	// You must call Connection.Open() on the returned Connection, after
+	// setting any Connection properties you need to set. Note the net.Conn
+	// can be an outgoing connection (e.g. made with net.Dial) or an incoming
+	// connection (e.g. made with net.Listener.Accept())
+	Connection(conn net.Conn) (Connection, error)
+}
+
+type container struct {
+	id        string
+	linkNames internal.IdCounter
+}
+
+// NewContainer creates a new container. The id must be unique in your
+// distributed application, all connections created by the container
+// will have this container-id.
+//
+// If id == "" a random UUID will be generated for the id.
+func NewContainer(id string) Container {
+	if id == "" {
+		id = internal.UUID4().String()
+	}
+	cont := &container{id: id}
+	return cont
+}
+
+func (cont *container) Id() string { return cont.id }
+
+func (cont *container) nextLinkName() string {
+	return cont.id + "@" + cont.linkNames.Next()
+}
+
+func (cont *container) Connection(conn net.Conn) (Connection, error) {
+	return newConnection(conn, cont)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
new file mode 100644
index 0000000..eaa6e7a
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
+You can write clients and servers using this library.
+
+Start by creating a Container with NewContainer. A Container represents a client
+or server application that can contain many incoming or outgoing connections.
+
+Create connections with the standard Go 'net' package using net.Dial or
+net.Listen. Create an AMQP connection over a net.Conn with
+Container.Connection() and open it with Connection.Open().
+
+AMQP sends messages over "links". Each link has a Sender end and a Receiver
+end. Connection.Sender() and Connection.Receiver() allow you to create links to
+Send() and Receive() messages.
+
+You can create an AMQP server connection by calling Connection.Server() and
+Connection.Listen() before calling Connection.Open(). A server connection can
+negotiate protocol security details and can accept incoming links opened from
+the remote end of the connection
+*/
+package electron
+
+//#cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// Just for package comment
+
+/* DEVELOPER NOTES
+
+There is a single proton.Engine per connection, each driving it's own event-loop goroutine,
+and each with a 'handler'. Most state for a connection is maintained on the handler, and
+only accessed in the event-loop goroutine, so no locks are required.
+
+The handler sets up channels as needed to get or send data from user goroutines
+using electron types like Sender or Receiver. We also use Engine.Inject to inject
+actions into the event loop from user goroutines.
+
+*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
new file mode 100644
index 0000000..745fd04
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"io"
+	"qpid.apache.org/internal"
+	"qpid.apache.org/proton"
+)
+
+// Closed is an alias for io.EOF. It is returned as an error when an endpoint
+// was closed cleanly.
+var Closed = io.EOF
+
+// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
+//
+// Endpoints can be created locally or by the remote peer. You must Open() an
+// endpoint before you can use it. Some endpoints have additional Set*() methods
+// that must be called before Open() to take effect, see Connection, Session,
+// Link, Sender and Receiver for details.
+//
+type Endpoint interface {
+	// Close an endpoint and signal an error to the remote end if error != nil.
+	Close(error)
+
+	// String is a human readable identifier, useful for debugging and logging.
+	String() string
+
+	// Error returns nil if the endpoint is open, otherwise returns an error.
+	// Error() == Closed means the endpoint was closed without error.
+	Error() error
+
+	// Connection containing the endpoint
+	Connection() Connection
+}
+
+type endpoint struct {
+	err internal.ErrorHolder
+	str string // Must be set by the value that embeds endpoint.
+}
+
+func (e *endpoint) String() string { return e.str }
+func (e *endpoint) Error() error   { return e.err.Get() }
+
+// Call in proton goroutine to close an endpoint locally
+// handler will complete the close when remote end closes.
+func localClose(ep proton.Endpoint, err error) {
+	if ep.State().LocalActive() {
+		proton.CloseError(ep, err)
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
new file mode 100644
index 0000000..f6065a1
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -0,0 +1,176 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// FIXME aconway 2015-10-07: move to amqp or split into sub packages?
+// proton.core
+// proton.msg
+
+package electron
+
+import (
+	"qpid.apache.org/proton"
+	"qpid.apache.org/amqp"
+)
+
+// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
+
+type handler struct {
+	delegator    *proton.MessagingDelegator
+	connection   *connection
+	links        map[proton.Link]Link
+	sentMessages map[proton.Delivery]*sentMessage
+	sessions     map[proton.Session]*session
+}
+
+func newHandler(c *connection) *handler {
+	h := &handler{
+		connection:   c,
+		links:        make(map[proton.Link]Link),
+		sentMessages: make(map[proton.Delivery]*sentMessage),
+		sessions:     make(map[proton.Session]*session),
+	}
+	h.delegator = proton.NewMessagingDelegator(h)
+	// Disable auto features of MessagingDelegator, we do these ourselves.
+	h.delegator.Prefetch = 0
+	h.delegator.AutoAccept = false
+	h.delegator.AutoSettle = false
+	h.delegator.AutoOpen = false
+	return h
+}
+
+func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
+	switch t {
+
+	case proton.MMessage:
+		if r, ok := h.links[e.Link()].(*receiver); ok {
+			r.message(e.Delivery())
+		} else {
+			proton.CloseError(
+				h.connection.eConnection,
+				amqp.Errorf(amqp.InternalError, "no receiver for link %s", e.Link()))
+		}
+
+	case proton.MSettled:
+		if sm := h.sentMessages[e.Delivery()]; sm != nil {
+			sm.settled(nil)
+		}
+
+	case proton.MSendable:
+		h.trySend(e.Link())
+
+	case proton.MSessionOpening:
+		if e.Session().State().LocalUninit() { // Remotely opened
+			s := newSession(h.connection, e.Session())
+			if err := h.connection.accept(s); err != nil {
+				proton.CloseError(e.Session(), (err))
+			} else {
+				h.sessions[e.Session()] = s
+				if s.capacity > 0 {
+					e.Session().SetIncomingCapacity(s.capacity)
+				}
+				e.Session().Open()
+			}
+		}
+
+	case proton.MSessionClosed:
+		err := proton.EndpointError(e.Session())
+		for l, _ := range h.links {
+			if l.Session() == e.Session() {
+				h.linkClosed(l, err)
+			}
+		}
+		delete(h.sessions, e.Session())
+
+	case proton.MLinkOpening:
+		l := e.Link()
+		if l.State().LocalUninit() { // Remotely opened
+			ss := h.sessions[l.Session()]
+			if ss == nil {
+				proton.CloseError(
+					l, amqp.Errorf(amqp.InternalError, ("no session for link")))
+				break
+			}
+			var link Link
+			if l.IsReceiver() {
+				r := &receiver{link: incomingLink(ss, l)}
+				link = r
+				r.inAccept = true
+				defer func() { r.inAccept = false }()
+			} else {
+				link = &sender{link: incomingLink(ss, l)}
+			}
+			if err := h.connection.accept(link); err != nil {
+				proton.CloseError(l, err)
+				break
+			}
+			link.open()
+		}
+
+	case proton.MLinkOpened:
+		l := e.Link()
+		if l.IsSender() {
+			h.trySend(l)
+		}
+
+	case proton.MLinkClosing:
+		e.Link().Close()
+
+	case proton.MLinkClosed:
+		h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
+
+	case proton.MDisconnected:
+		err := h.connection.Error()
+		for l, _ := range h.links {
+			h.linkClosed(l, err)
+		}
+		for _, s := range h.sessions {
+			s.closed(err)
+		}
+		for _, sm := range h.sentMessages {
+			sm.settled(err)
+		}
+	}
+}
+
+func (h *handler) linkClosed(l proton.Link, err error) {
+	if link := h.links[l]; link != nil {
+		link.closed(err)
+		delete(h.links, l)
+	}
+}
+
+func (h *handler) addLink(rl proton.Link, ll Link) {
+	h.links[rl] = ll
+}
+
+func (h *handler) trySend(l proton.Link) {
+	if l.Credit() <= 0 {
+		return
+	}
+	if s, ok := h.links[l].(*sender); ok {
+		for ch := s.popBlocked(); l.Credit() > 0 && ch != nil; ch = s.popBlocked() {
+			if snd, ok := <-ch; ok {
+				s.doSend(snd)
+			}
+		}
+	} else {
+		h.connection.closed(
+			amqp.Errorf(amqp.InternalError, "cannot find sender for link %s", l))
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
new file mode 100644
index 0000000..abc8431
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -0,0 +1,242 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"qpid.apache.org/internal"
+	"qpid.apache.org/proton"
+)
+
+// Link is the common interface for AMQP links. Sender and Receiver provide
+// more methods for the sending or receiving end of a link respectively.
+type Link interface {
+	Endpoint
+
+	// Source address that messages are coming from.
+	Source() string
+
+	// Target address that messages are going to.
+	Target() string
+
+	// Name is a unique name for the link among links between the same
+	// containers in the same direction. By default generated automatically.
+	LinkName() string
+
+	// IsSender is true if this is the sending end of the link.
+	IsSender() bool
+
+	// IsReceiver is true if this is the receiving end of the link.
+	IsReceiver() bool
+
+	// SndSettle defines when the sending end of the link settles message delivery.
+	SndSettle() SndSettleMode
+
+	// RcvSettle defines when the sending end of the link settles message delivery.
+	RcvSettle() RcvSettleMode
+
+	// Session containing the Link
+	Session() Session
+
+	// Called in event loop on closed event.
+	closed(err error)
+	// Called to open a link (local or accepted incoming link)
+	open()
+}
+
+// LinkSetting is a function that sets a link property. Passed when creating
+// a Sender or Receiver, do not use at any other time.
+type LinkSetting func(Link)
+
+// Source sets address that messages are coming from.
+func Source(s string) LinkSetting { return func(l Link) { l.(*link).source = s } }
+
+// Target sets address that messages are going to.
+func Target(s string) LinkSetting { return func(l Link) { l.(*link).target = s } }
+
+// LinkName sets the link name.
+func LinkName(s string) LinkSetting { return func(l Link) { l.(*link).target = s } }
+
+// SndSettle sets the send settle mode
+func SndSettle(m SndSettleMode) LinkSetting { return func(l Link) { l.(*link).sndSettle = m } }
+
+// RcvSettle sets the send settle mode
+func RcvSettle(m RcvSettleMode) LinkSetting { return func(l Link) { l.(*link).rcvSettle = m } }
+
+// SndSettleMode defines when the sending end of the link settles message delivery.
+type SndSettleMode proton.SndSettleMode
+
+// Capacity sets the link capacity
+func Capacity(n int) LinkSetting { return func(l Link) { l.(*link).capacity = n } }
+
+// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
+func Prefetch(p bool) LinkSetting { return func(l Link) { l.(*link).prefetch = p } }
+
+// AtMostOnce sets "fire and forget" mode, messages are sent but no
+// acknowledgment is received, messages can be lost if there is a network
+// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
+func AtMostOnce() LinkSetting {
+	return func(l Link) {
+		SndSettle(SndSettled)(l)
+		RcvSettle(RcvFirst)(l)
+	}
+}
+
+// AtLeastOnce requests acknowledgment for every message, acknowledgment
+// indicates the message was definitely received. In the event of a
+// failure, unacknowledged messages can be re-sent but there is a chance
+// that the message will be received twice in this case.
+// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
+func AtLeastOnce() LinkSetting {
+	return func(l Link) {
+		SndSettle(SndUnsettled)(l)
+		RcvSettle(RcvFirst)(l)
+	}
+}
+
+const (
+	// Messages are sent unsettled
+	SndUnsettled = SndSettleMode(proton.SndUnsettled)
+	// Messages are sent already settled
+	SndSettled = SndSettleMode(proton.SndSettled)
+	// Sender can send either unsettled or settled messages.
+	SendMixed = SndSettleMode(proton.SndMixed)
+)
+
+// RcvSettleMode defines when the receiving end of the link settles message delivery.
+type RcvSettleMode proton.RcvSettleMode
+
+const (
+	// Receiver settles first.
+	RcvFirst = RcvSettleMode(proton.RcvFirst)
+	// Receiver waits for sender to settle before settling.
+	RcvSecond = RcvSettleMode(proton.RcvSecond)
+)
+
+type link struct {
+	endpoint
+
+	// Link settings.
+	source    string
+	target    string
+	linkName  string
+	isSender  bool
+	sndSettle SndSettleMode
+	rcvSettle RcvSettleMode
+	capacity  int
+	prefetch  bool
+
+	session *session
+	eLink   proton.Link
+	done    chan struct{} // Closed when link is closed
+
+	inAccept bool
+}
+
+func (l *link) Source() string           { return l.source }
+func (l *link) Target() string           { return l.target }
+func (l *link) LinkName() string         { return l.linkName }
+func (l *link) IsSender() bool           { return l.isSender }
+func (l *link) IsReceiver() bool         { return !l.isSender }
+func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
+func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
+func (l *link) Session() Session         { return l.session }
+func (l *link) Connection() Connection   { return l.session.Connection() }
+
+func (l *link) engine() *proton.Engine { return l.session.connection.engine }
+func (l *link) handler() *handler      { return l.session.connection.handler }
+
+// Set up link fields and open the proton.Link
+func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error) {
+	l := &link{
+		session:  sn,
+		isSender: isSender,
+		capacity: 1,
+		prefetch: false,
+		done:     make(chan struct{}),
+	}
+	for _, set := range setting {
+		set(l)
+	}
+	if l.linkName == "" {
+		l.linkName = l.session.connection.container.nextLinkName()
+	}
+	if l.IsSender() {
+		l.eLink = l.session.eSession.Sender(l.linkName)
+	} else {
+		l.eLink = l.session.eSession.Receiver(l.linkName)
+	}
+	if l.eLink.IsNil() {
+		l.err.Set(internal.Errorf("cannot create link %s", l))
+		return nil, l.err.Get()
+	}
+	l.eLink.Source().SetAddress(l.source)
+	l.eLink.Target().SetAddress(l.target)
+	l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
+	l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
+	l.str = l.eLink.String()
+	l.eLink.Open()
+	return l, nil
+}
+
+// Set up a link from an incoming proton.Link.
+func incomingLink(sn *session, eLink proton.Link) link {
+	l := link{
+		session:   sn,
+		isSender:  eLink.IsSender(),
+		eLink:     eLink,
+		source:    eLink.RemoteSource().Address(),
+		target:    eLink.RemoteTarget().Address(),
+		linkName:  eLink.Name(),
+		sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
+		rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
+		capacity:  1,
+		prefetch:  false,
+		done:      make(chan struct{}),
+	}
+	l.str = eLink.String()
+	return l
+}
+
+// Called in proton goroutine on closed or disconnected
+func (l *link) closed(err error) {
+	l.err.Set(err)
+	l.err.Set(Closed) // If no error set, mark as closed.
+	close(l.done)
+}
+
+// Not part of Link interface but use by Sender and Receiver.
+func (l *link) Credit() (credit int, err error) {
+	err = l.engine().InjectWait(func() error {
+		credit = l.eLink.Credit()
+		return nil
+	})
+	return
+}
+
+// Not part of Link interface but use by Sender and Receiver.
+func (l *link) Capacity() int { return l.capacity }
+
+func (l *link) Close(err error) {
+	l.engine().Inject(func() { localClose(l.eLink, err) })
+}
+
+func (l *link) open() {
+	l.eLink.Open()
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
new file mode 100644
index 0000000..474bad7
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -0,0 +1,412 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"fmt"
+	"net"
+	"path"
+	"qpid.apache.org/amqp"
+	"runtime"
+	"testing"
+	"time"
+)
+
+func fatalIf(t *testing.T, err error) {
+	if err != nil { // FIXME aconway 2015-10-07:
+		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
+		if ok {
+			_, file = path.Split(file)
+		}
+		t.Fatalf("(from %s:%d) %v", file, line, err)
+	}
+}
+
+// Start a server, return listening addr and channel for incoming Connection.
+func newServer(t *testing.T, cont Container, accept func(Endpoint) error) (net.Addr, <-chan Connection) {
+	listener, err := net.Listen("tcp", "")
+	fatalIf(t, err)
+	addr := listener.Addr()
+	ch := make(chan Connection)
+	go func() {
+		conn, err := listener.Accept()
+		c, err := cont.Connection(conn)
+		fatalIf(t, err)
+		c.Server()
+		c.Listen(accept)
+		fatalIf(t, c.Open())
+		ch <- c
+	}()
+	return addr, ch
+}
+
+// Return open an client connection and session, return the session.
+func newClient(t *testing.T, cont Container, addr net.Addr) Session {
+	conn, err := net.Dial(addr.Network(), addr.String())
+	fatalIf(t, err)
+	c, err := cont.Connection(conn)
+	fatalIf(t, err)
+	c.Open()
+	sn, err := c.Session()
+	fatalIf(t, err)
+	return sn
+}
+
+// Return client and server ends of the same connection.
+func newClientServer(t *testing.T, accept func(Endpoint) error) (client Session, server Connection) {
+	addr, ch := newServer(t, NewContainer(""), accept)
+	client = newClient(t, NewContainer(""), addr)
+	return client, <-ch
+}
+
+// Close client and server
+func closeClientServer(client Session, server Connection) {
+	client.Connection().Close(nil)
+	server.Close(nil)
+}
+
+// Send a message one way with a client sender and server receiver, verify ack.
+func TestClientSendServerReceive(t *testing.T) {
+	timeout := time.Second * 2
+	nLinks := 3
+	nMessages := 3
+
+	rchan := make(chan Receiver, nLinks)
+	client, server := newClientServer(t, func(ep Endpoint) error {
+		if r, ok := ep.(Receiver); ok {
+			r.SetCapacity(1, false)
+			rchan <- r
+		}
+		return nil
+	})
+
+	defer func() {
+		closeClientServer(client, server)
+	}()
+
+	s := make([]Sender, nLinks)
+	for i := 0; i < nLinks; i++ {
+		var err error
+		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	r := make([]Receiver, nLinks)
+	for i := 0; i < nLinks; i++ {
+		r[i] = <-rchan
+	}
+
+	for i := 0; i < nLinks; i++ {
+		for j := 0; j < nMessages; j++ {
+			var sm SentMessage
+			// Client send
+			go func() {
+				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
+				var err error
+				sm, err = s[i].Send(m)
+				if err != nil {
+					t.Fatal(err)
+				}
+			}()
+
+			// Server recieve
+			rm, err := r[i].Receive()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
+				t.Errorf("%#v != %#v", want, got)
+			}
+
+			// Should not be acknowledged on client yet
+			if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
+				t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
+			}
+			// Server ack
+			if err := rm.Acknowledge(Rejected); err != nil {
+				t.Error(err)
+			}
+			// Client get ack.
+			if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
+				t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
+			}
+		}
+	}
+}
+
+func TestClientReceiver(t *testing.T) {
+	nMessages := 3
+	client, server := newClientServer(t, func(ep Endpoint) error {
+		if s, ok := ep.(Sender); ok {
+			go func() {
+				for i := int32(0); i < int32(nMessages); i++ {
+					sm, err := s.Send(amqp.NewMessageWith(i))
+					if err != nil {
+						t.Error(err)
+						return
+					} else {
+						sm.Disposition() // Sync send.
+					}
+				}
+				s.Close(nil)
+			}()
+		}
+		return nil
+	})
+
+	r, err := client.Receiver(Source("foo"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	for i := int32(0); i < int32(nMessages); i++ {
+		rm, err := r.Receive()
+		if err != nil {
+			if err != Closed {
+				t.Error(err)
+			}
+			break
+		}
+		if err := rm.Accept(); err != nil {
+			t.Error(err)
+		}
+		if b, ok := rm.Message.Body().(int32); !ok || b != i {
+			t.Errorf("want %v, true got %v, %v", i, b, ok)
+		}
+	}
+	server.Close(nil)
+	client.Connection().Close(nil)
+}
+
+// Test timeout versions of waiting functions.
+func TestTimeouts(t *testing.T) {
+	var err error
+	rchan := make(chan Receiver, 1)
+	client, server := newClientServer(t, func(ep Endpoint) error {
+		if r, ok := ep.(Receiver); ok {
+			r.SetCapacity(1, false) // Issue credit only on receive
+			rchan <- r
+		}
+		return nil
+	})
+	defer func() { closeClientServer(client, server) }()
+
+	// Open client sender
+	snd, err := client.Sender(Target("test"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	rcv := <-rchan
+
+	// Test send with timeout
+	short := time.Millisecond
+	long := time.Second
+	m := amqp.NewMessage()
+	if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	// Test receive with timeout
+	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	// Test receive with timeout
+	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	// There is now a credit on the link due to receive
+	sm, err := snd.SendTimeout(m, long)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Disposition should timeout
+	if _, err = sm.DispositionTimeout(long); err != Timeout {
+		t.Error("want Timeout got", err)
+	}
+	if _, err = sm.DispositionTimeout(short); err != Timeout {
+		t.Error("want Timeout got", err)
+	}
+	// Receive and accept
+	rm, err := rcv.ReceiveTimeout(long)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rm.Accept()
+	// Sender get ack
+	d, err := sm.DispositionTimeout(long)
+	if err != nil || d != Accepted {
+		t.Errorf("want (rejected, nil) got (%v, %v)", d, err)
+	}
+}
+
+// clientServer that returns sender/receiver pairs at opposite ends of link.
+type pairs struct {
+	t      *testing.T
+	client Session
+	server Connection
+	rchan  chan Receiver
+	schan  chan Sender
+}
+
+func newPairs(t *testing.T) *pairs {
+	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
+	p.client, p.server = newClientServer(t, func(ep Endpoint) error {
+		switch ep := ep.(type) {
+		case Receiver:
+			ep.SetCapacity(1, false)
+			p.rchan <- ep
+		case Sender:
+			p.schan <- ep
+		}
+		return nil
+	})
+	return p
+}
+
+func (p *pairs) close() {
+	closeClientServer(p.client, p.server)
+}
+
+func (p *pairs) senderReceiver() (Sender, Receiver) {
+	snd, err := p.client.Sender()
+	fatalIf(p.t, err)
+	rcv := <-p.rchan
+	return snd, rcv
+}
+
+func (p *pairs) receiverSender() (Receiver, Sender) {
+	rcv, err := p.client.Receiver()
+	fatalIf(p.t, err)
+	snd := <-p.schan
+	return rcv, snd
+}
+
+type result struct {
+	label string
+	err   error
+}
+
+func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) }
+
+func doSend(snd Sender, results chan result) {
+	_, err := snd.Send(amqp.NewMessage())
+	results <- result{"send", err}
+}
+
+func doReceive(rcv Receiver, results chan result) {
+	_, err := rcv.Receive()
+	results <- result{"receive", err}
+}
+
+func doDisposition(sm SentMessage, results chan result) {
+	_, err := sm.Disposition()
+	results <- result{"disposition", err}
+}
+
+// Test that closing Links interrupts blocked link functions.
+func TestLinkCloseInterrupt(t *testing.T) {
+	want := amqp.Errorf("x", "all bad")
+	pairs := newPairs(t)
+	results := make(chan result) // Collect expected errors
+
+	// Sender.Close() interrupts Send()
+	snd, rcv := pairs.senderReceiver()
+	go doSend(snd, results)
+	snd.Close(want)
+	if r := <-results; want != r.err {
+		t.Errorf("want %#v got %#v", want, r)
+	}
+
+	// Remote Receiver.Close() interrupts Send()
+	snd, rcv = pairs.senderReceiver()
+	go doSend(snd, results)
+	rcv.Close(want)
+	if r := <-results; want != r.err {
+		t.Errorf("want %#v got %#v", want, r)
+	}
+
+	// Receiver.Close() interrupts Receive()
+	snd, rcv = pairs.senderReceiver()
+	go doReceive(rcv, results)
+	rcv.Close(want)
+	if r := <-results; want != r.err {
+		t.Errorf("want %#v got %#v", want, r)
+	}
+
+	// Remote Sender.Close() interrupts Receive()
+	snd, rcv = pairs.senderReceiver()
+	go doReceive(rcv, results)
+	snd.Close(want)
+	if r := <-results; want != r.err {
+		t.Errorf("want %#v got %#v", want, r)
+	}
+}
+
+// Test closing the server end of a connection.
+func TestConnectionCloseInterrupt1(t *testing.T) {
+	want := amqp.Errorf("x", "bad")
+	pairs := newPairs(t)
+	results := make(chan result) // Collect expected errors
+
+	// Connection.Close() interrupts Send, Receive, Disposition.
+	snd, rcv := pairs.senderReceiver()
+	go doReceive(rcv, results)
+	sm, err := snd.Send(amqp.NewMessage())
+	fatalIf(t, err)
+	go doDisposition(sm, results)
+	snd, rcv = pairs.senderReceiver()
+	go doSend(snd, results)
+	rcv, snd = pairs.receiverSender()
+	go doReceive(rcv, results)
+	pairs.server.Close(want)
+	for i := 0; i < 3; i++ {
+		if r := <-results; want != r.err {
+			// TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF.
+			t.Logf("want %v got %v", want, r)
+		}
+	}
+}
+
+// Test closing the client end of the connection.
+func TestConnectionCloseInterrupt2(t *testing.T) {
+	want := amqp.Errorf("x", "bad")
+	pairs := newPairs(t)
+	results := make(chan result) // Collect expected errors
+
+	// Connection.Close() interrupts Send, Receive, Disposition.
+	snd, rcv := pairs.senderReceiver()
+	go doReceive(rcv, results)
+	sm, err := snd.Send(amqp.NewMessage())
+	fatalIf(t, err)
+	go doDisposition(sm, results)
+	snd, rcv = pairs.senderReceiver()
+	go doSend(snd, results)
+	rcv, snd = pairs.receiverSender()
+	go doReceive(rcv, results)
+	pairs.client.Close(want)
+	for i := 0; i < 3; i++ {
+		if r := <-results; want != r.err {
+			// TODO aconway 2015-10-06: Not propagating the correct error, seeing nil.
+			t.Logf("want %v got %v", want, r)
+		}
+	}
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message