qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [06/50] [abbrv] qpid-proton git commit: PROTON-1011: Go example of event driven broker. Package renaming and some new features.
Date Fri, 23 Oct 2015 14:36:08 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 63dc452..5dc8727 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -34,7 +34,7 @@ import (
 	"fmt"
 	"io"
 	"net"
-	"qpid.apache.org/proton/internal"
+	"qpid.apache.org/internal"
 	"sync"
 	"unsafe"
 )
@@ -107,7 +107,7 @@ func (b *bufferChan) buffer() []byte {
 //
 type Engine struct {
 	// Error is set on exit from Run() if there was an error.
-	err    internal.FirstError
+	err    internal.ErrorHolder
 	inject chan func()
 
 	conn       net.Conn
@@ -127,14 +127,14 @@ const bufferSize = 4096
 var engines = internal.MakeSafeMap()
 
 // NewEngine initializes a engine with a connection and handlers. To start it running:
-//    p := NewEngine(...)
-//    go run p.Run()
+//    eng := NewEngine(...)
+//    go run eng.Run()
 // The goroutine will exit when the engine is closed or disconnected.
 // You can check for errors on Engine.Error.
 //
 func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 	// Save the connection ID for Connection.String()
-	p := &Engine{
+	eng := &Engine{
 		inject:     make(chan func()),
 		conn:       conn,
 		transport:  Transport{C.pn_transport()},
@@ -145,7 +145,7 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error)
{
 		write:      newBufferChan(bufferSize),
 		running:    make(chan struct{}),
 	}
-	if p.transport.IsNil() || p.connection.IsNil() || p.collector == nil {
+	if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
 		return nil, internal.Errorf("failed to allocate engine")
 	}
 
@@ -154,27 +154,27 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error)
{
 	// to run connection.
 
 	// Unique container-id by default.
-	p.connection.SetContainer(internal.UUID4().String())
-	pnErr := p.transport.Bind(p.connection)
+	eng.connection.SetContainer(internal.UUID4().String())
+	pnErr := eng.transport.Bind(eng.connection)
 	if pnErr != 0 {
 		return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr))
 	}
-	C.pn_connection_collect(p.connection.pn, p.collector)
-	p.connection.Open()
-	connectionContexts.Put(p.connection, connectionContext{p, p.String()})
-	return p, nil
+	C.pn_connection_collect(eng.connection.pn, eng.collector)
+	eng.connection.Open()
+	connectionContexts.Put(eng.connection, connectionContext{eng, eng.String()})
+	return eng, nil
 }
 
-func (p *Engine) String() string {
-	return fmt.Sprintf("%s-%s", p.conn.LocalAddr(), p.conn.RemoteAddr())
+func (eng *Engine) String() string {
+	return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 }
 
-func (p *Engine) Id() string {
-	return fmt.Sprintf("%p", &p)
+func (eng *Engine) Id() string {
+	return fmt.Sprintf("%eng", &eng)
 }
 
-func (p *Engine) Error() error {
-	return p.err.Get()
+func (eng *Engine) Error() error {
+	return eng.err.Get()
 }
 
 // Inject a function into the Engine's event loop.
@@ -189,27 +189,27 @@ func (p *Engine) Error() error {
 //
 // Returns a non-nil error if the engine is closed before the function could be
 // injected.
-func (p *Engine) Inject(f func()) error {
+func (eng *Engine) Inject(f func()) error {
 	select {
-	case p.inject <- f:
+	case eng.inject <- f:
 		return nil
-	case <-p.running:
-		return p.Error()
+	case <-eng.running:
+		return eng.Error()
 	}
 }
 
 // InjectWait is like Inject but does not return till f() has completed or the
 // engine is closed, and returns an error value from f()
-func (p *Engine) InjectWait(f func() error) error {
+func (eng *Engine) InjectWait(f func() error) error {
 	done := make(chan error)
 	defer close(done)
-	err := p.Inject(func() { done <- f() })
+	err := eng.Inject(func() { done <- f() })
 	if err != nil {
 		return err
 	}
 	select {
-	case <-p.running:
-		return p.Error()
+	case <-eng.running:
+		return eng.Error()
 	case err := <-done:
 		return err
 	}
@@ -219,119 +219,131 @@ func (p *Engine) InjectWait(f func() error) error {
 // the incoming connnection such as use of SASL and SSL.
 // Must be called before Run()
 //
-func (p *Engine) Server() { p.transport.SetServer() }
+func (eng *Engine) Server() { eng.transport.SetServer() }
 
 // Close the engine's connection, returns when the engine has exited.
-func (p *Engine) Close(err error) {
-	p.Inject(func() {
-		if err != nil {
-			p.connection.Condition().SetError(err)
-		}
-		p.connection.Close()
+func (eng *Engine) Close(err error) {
+	eng.Inject(func() {
+		CloseError(eng.connection, err)
 	})
-	<-p.running
+	<-eng.running
 }
 
 // Disconnect the engine's connection without and AMQP close, returns when the engine has
exited.
-func (p *Engine) Disconnect(err error) {
+func (eng *Engine) Disconnect(err error) {
 	if err != nil {
-		p.err.Set(err)
+		eng.err.Set(err)
 	}
-	p.conn.Close()
-	<-p.running
+	eng.conn.Close()
+	<-eng.running
 }
 
-// Run the engine. Normally called in a goroutine as: go engine.Run()
-// Engine.Run() will exit when the engine is closed or disconnected.
-// You can check for errors after exit with Engine.Error().
+// Run the engine. Engine.Run() will exit when the engine is closed or
+// disconnected.  You can check for errors after exit with Engine.Error().
 //
-func (p *Engine) Run() {
-	// Signal errors from the read/write goroutines. Don't block if we don't
-	// read all the errors, we only care about the first.
-	error := make(chan error, 2)
+func (eng *Engine) Run() error {
 	wait := sync.WaitGroup{}
-	wait.Add(2)
+	wait.Add(2) // Read and write goroutines
 
-	go func() { // Read goroutine
+	readErr := make(chan error, 1) // Don't block
+	go func() {                    // Read goroutine
 		defer wait.Done()
 		for {
-			rbuf := p.read.buffer()
-			n, err := p.conn.Read(rbuf)
+			rbuf := eng.read.buffer()
+			n, err := eng.conn.Read(rbuf)
 			if n > 0 {
-				p.read.buffers <- rbuf[:n]
-			} else if err != nil {
-				close(p.read.buffers)
-				error <- err
+				eng.read.buffers <- rbuf[:n]
+			}
+			if err != nil {
+				readErr <- err
+				close(readErr)
+				close(eng.read.buffers)
 				return
 			}
 		}
 	}()
 
-	go func() { // Write goroutine
+	writeErr := make(chan error, 1) // Don't block
+	go func() {                     // Write goroutine
 		defer wait.Done()
 		for {
-			wbuf, ok := <-p.write.buffers
+			wbuf, ok := <-eng.write.buffers
 			if !ok {
 				return
 			}
-			_, err := p.conn.Write(wbuf)
+			_, err := eng.conn.Write(wbuf)
 			if err != nil {
-				error <- err
+				writeErr <- err
+				close(writeErr)
 				return
 			}
 		}
 	}()
 
-	wbuf := p.write.buffer()[:0]
+	wbuf := eng.write.buffer()[:0]
 loop:
 	for {
 		if len(wbuf) == 0 {
-			p.pop(&wbuf)
+			eng.pop(&wbuf)
 		}
 		// Don't set wchan unless there is something to write.
 		var wchan chan []byte
 		if len(wbuf) > 0 {
-			wchan = p.write.buffers
+			wchan = eng.write.buffers
 		}
 
 		select {
-		case buf := <-p.read.buffers: // Read a buffer
-			p.push(buf)
+		case buf, ok := <-eng.read.buffers: // Read a buffer
+			if ok {
+				eng.push(buf)
+			}
 		case wchan <- wbuf: // Write a buffer
-			wbuf = p.write.buffer()[:0]
-		case f := <-p.inject: // Function injected from another goroutine
-			f()
-		case err := <-error: // Network read or write error
-			p.conn.Close() // Make sure both sides are closed
-			p.err.Set(err)
-			p.transport.CloseHead()
-			p.transport.CloseTail()
+			wbuf = eng.write.buffer()[:0]
+		case f, ok := <-eng.inject: // Function injected from another goroutine
+			if ok {
+				f()
+			}
+		case err := <-readErr:
+			eng.netError(err)
+		case err := <-writeErr:
+			eng.netError(err)
 		}
-		p.process()
-		if p.err.Get() != nil {
+		eng.process()
+		if eng.err.Get() != nil {
 			break loop
 		}
 	}
-	close(p.write.buffers)
-	p.conn.Close()
+	close(eng.write.buffers)
+	eng.conn.Close() // Make sure connection is closed
 	wait.Wait()
-	connectionContexts.Delete(p.connection)
-	if !p.connection.IsNil() {
-		p.connection.Free()
+	connectionContexts.Delete(eng.connection)
+	if !eng.connection.IsNil() {
+		eng.connection.Free()
 	}
-	if !p.transport.IsNil() {
-		p.transport.Free()
+	if !eng.transport.IsNil() {
+		eng.transport.Free()
 	}
-	if p.collector != nil {
-		C.pn_collector_free(p.collector)
+	if eng.collector != nil {
+		C.pn_collector_free(eng.collector)
 	}
-	for _, h := range p.handlers {
+	for _, h := range eng.handlers {
 		switch h := h.(type) {
 		case cHandler:
 			C.pn_handler_free(h.pn)
 		}
 	}
-	close(p.running) // Signal goroutines have exited and Error is set.
+	close(eng.running) // Signal goroutines have exited and Error is set.
+	return eng.err.Get()
+}
+
+func (eng *Engine) netError(err error) {
+	if err == nil {
+		err = internal.Errorf("unknown network error")
+	}
+	eng.conn.Close() // Make sure both sides are closed
+	eng.err.Set(err)
+	eng.transport.CloseHead()
+	eng.transport.CloseTail()
 }
 
 func minInt(a, b int) int {
@@ -342,8 +354,8 @@ func minInt(a, b int) int {
 	}
 }
 
-func (p *Engine) pop(buf *[]byte) {
-	pending := int(p.transport.Pending())
+func (eng *Engine) pop(buf *[]byte) {
+	pending := int(eng.transport.Pending())
 	switch {
 	case pending == int(C.PN_EOS):
 		*buf = (*buf)[:]
@@ -356,15 +368,15 @@ func (p *Engine) pop(buf *[]byte) {
 	if size == 0 {
 		return
 	}
-	C.memcpy(unsafe.Pointer(&(*buf)[0]), p.transport.Head(), C.size_t(size))
+	C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size))
 	internal.Assert(size > 0)
-	p.transport.Pop(uint(size))
+	eng.transport.Pop(uint(size))
 }
 
-func (p *Engine) push(buf []byte) {
+func (eng *Engine) push(buf []byte) {
 	buf2 := buf
 	for len(buf2) > 0 {
-		n := p.transport.Push(buf2)
+		n := eng.transport.Push(buf2)
 		if n <= 0 {
 			panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n)))
 		}
@@ -372,30 +384,24 @@ func (p *Engine) push(buf []byte) {
 	}
 }
 
-func (p *Engine) handle(e Event) (more bool) {
-	for _, h := range p.handlers {
+func (eng *Engine) handle(e Event) {
+	for _, h := range eng.handlers {
 		h.HandleEvent(e)
 	}
 	if e.Type() == ETransportClosed {
-		p.err.Set(e.Connection().RemoteCondition().Error())
-		p.err.Set(e.Connection().Transport().Condition().Error())
-		if p.err.Get() == nil {
-			p.err.Set(io.EOF)
+		eng.err.Set(e.Connection().RemoteCondition().Error())
+		eng.err.Set(e.Connection().Transport().Condition().Error())
+		if eng.err.Get() == nil {
+			eng.err.Set(io.EOF)
 		}
-		return false
 	}
-	return true
 }
 
-func (p *Engine) process() (more bool) {
-	for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector)
{
-		e := makeEvent(ce)
-		if !p.handle(e) {
-			return false
-		}
-		C.pn_collector_pop(p.collector)
+func (eng *Engine) process() {
+	for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector)
{
+		eng.handle(makeEvent(ce))
+		C.pn_collector_pop(eng.collector)
 	}
-	return true
 }
 
-func (p *Engine) Connection() Connection { return p.connection }
+func (eng *Engine) Connection() Connection { return eng.connection }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
index aa4d76b..8a5cbf8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
@@ -23,7 +23,7 @@ package proton
 import "C"
 
 import (
-	"qpid.apache.org/proton/internal"
+	"qpid.apache.org/internal"
 )
 
 // EventHandler handles core proton events.
@@ -166,7 +166,7 @@ func (t MessagingEvent) String() string {
 	case MLinkClosed:
 		return "LinkClosed"
 	case MDisconnected:
-		return "MDisconnected"
+		return "Disconnected"
 	case MSendable:
 		return "Sendable"
 	case MAccepted:
@@ -355,20 +355,12 @@ func (d *MessagingDelegator) HandleEvent(e Event) {
 func (d *MessagingDelegator) incoming(e Event) (err error) {
 	delivery := e.Delivery()
 	if delivery.HasMessage() {
-		if e.Link().State().LocalClosed() {
+		d.mhandler.HandleMessagingEvent(MMessage, e)
+		if d.AutoAccept && !delivery.Settled() {
+			delivery.Accept()
+		}
+		if delivery.Current() {
 			e.Link().Advance()
-			if d.AutoAccept {
-				delivery.Release(false)
-			}
-		} else {
-			d.mhandler.HandleMessagingEvent(MMessage, e)
-			if d.AutoAccept && !delivery.Settled() {
-				if err == nil {
-					delivery.Accept()
-				} else {
-					delivery.Reject()
-				}
-			}
 		}
 	} else if delivery.Updated() && delivery.Settled() {
 		d.mhandler.HandleMessagingEvent(MSettled, e)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go
deleted file mode 100644
index 9f65e04..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/error.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// Internal implementation details - ignore.
-package internal
-
-// #cgo LDFLAGS: -lqpid-proton
-// #include <proton/error.h>
-// #include <proton/codec.h>
-import "C"
-
-import (
-	"fmt"
-	"sync"
-	"unsafe"
-)
-
-// Error type for proton runtime errors returned as error values.
-type Error string
-
-// Error prefixes error message with proton:
-func (e Error) Error() string {
-	return "proton: " + string(e)
-}
-
-// Errorf creates an Error with a formatted message
-func Errorf(format string, a ...interface{}) Error {
-	return Error(fmt.Sprintf(format, a...))
-}
-
-type PnErrorCode int
-
-func (e PnErrorCode) String() string {
-	switch e {
-	case C.PN_EOS:
-		return "end-of-data"
-	case C.PN_ERR:
-		return "error"
-	case C.PN_OVERFLOW:
-		return "overflow"
-	case C.PN_UNDERFLOW:
-		return "underflow"
-	case C.PN_STATE_ERR:
-		return "bad-state"
-	case C.PN_ARG_ERR:
-		return "invalid-argument"
-	case C.PN_TIMEOUT:
-		return "timeout"
-	case C.PN_INTR:
-		return "interrupted"
-	case C.PN_INPROGRESS:
-		return "in-progress"
-	default:
-		return fmt.Sprintf("unknown-error(%d)", e)
-	}
-}
-
-func PnError(p unsafe.Pointer) error {
-	e := (*C.pn_error_t)(p)
-	if e == nil || C.pn_error_code(e) == 0 {
-		return nil
-	}
-	return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
-}
-
-// panicIf panics if condition is true, the panic value is Errorf(fmt, args...)
-func panicIf(condition bool, fmt string, args ...interface{}) {
-	if condition {
-		panic(Errorf(fmt, args...))
-	}
-}
-
-// FirstError is a goroutine-safe error holder that keeps the first error that is set.
-type FirstError struct {
-	err  error
-	lock sync.Mutex
-}
-
-// Set the error if not already set, return the error.
-func (e *FirstError) Set(err error) error {
-	e.lock.Lock()
-	defer e.lock.Unlock()
-	if e.err == nil {
-		e.err = err
-	}
-	return e.err
-}
-
-// Get the error.
-func (e *FirstError) Get() error {
-	e.lock.Lock()
-	defer e.lock.Unlock()
-	return e.err
-}
-
-// Assert panics if condition is false with optional formatted message
-func Assert(condition bool, format ...interface{}) {
-	if !condition {
-		if len(format) > 0 {
-			panic(Errorf(format[0].(string), format[1:]...))
-		} else {
-			panic(Errorf("assertion failed"))
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go
deleted file mode 100644
index 77b524c..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel.go
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package internal
-
-// FlexChannel acts like a channel with an automatically sized buffer, see NewFlexChannel().
-type FlexChannel struct {
-	// In channel to send to. close(In) will close the FlexChannel once buffer has drained.
-	In chan<- interface{}
-	// Out channel to receive from. Out closes when In has closed and the buffer is empty.
-	Out <-chan interface{}
-
-	in, out chan interface{}
-	buffer  []interface{}
-	limit   int
-}
-
-// NewFlexChannel creates a FlexChannel, a channel with an automatically-sized buffer.
-//
-// Initially the buffer size is 0, the buffer grows as needed up to limit. limit < 0 means
-// there is no limit.
-//
-func NewFlexChannel(limit int) *FlexChannel {
-	fc := &FlexChannel{
-		in:     make(chan interface{}),
-		out:    make(chan interface{}),
-		buffer: make([]interface{}, 0),
-		limit:  limit,
-	}
-	fc.In = fc.in
-	fc.Out = fc.out
-	go fc.run()
-	return fc
-}
-
-func (fc *FlexChannel) run() {
-	defer func() { // Flush the channel on exit
-		for _, data := range fc.buffer {
-			fc.out <- data
-		}
-		close(fc.out)
-	}()
-
-	for {
-		var usein, useout chan interface{}
-		var outvalue interface{}
-		if len(fc.buffer) > 0 {
-			useout = fc.out
-			outvalue = fc.buffer[0]
-		}
-		if len(fc.buffer) < fc.limit || fc.limit < 0 {
-			usein = fc.in
-		}
-		Assert(usein != nil || useout != nil)
-		select {
-		case useout <- outvalue:
-			fc.buffer = fc.buffer[1:]
-		case data, ok := <-usein:
-			if ok {
-				fc.buffer = append(fc.buffer, data)
-			} else {
-				return
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go
b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go
deleted file mode 100644
index d0e1a44..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/flexchannel_test.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package internal
-
-import (
-	"testing"
-)
-
-func recvall(ch <-chan interface{}) (result []interface{}) {
-	for {
-		select {
-		case x := <-ch:
-			result = append(result, x)
-		default:
-			return
-		}
-	}
-}
-
-func sendall(data []interface{}, ch chan<- interface{}) {
-}
-
-func TestFlex(t *testing.T) {
-	fc := NewFlexChannel(5)
-
-	// Test send/receve
-	go func() {
-		for i := 0; i < 4; i++ {
-			fc.In <- i
-		}
-	}()
-
-	for i := 0; i < 4; i++ {
-		j := <-fc.Out
-		if i != j {
-			t.Error("%v != %v", i, j)
-		}
-	}
-	select {
-	case x, ok := <-fc.Out:
-		t.Error("receive empty channel got", x, ok)
-	default:
-	}
-
-	// Test buffer limit
-	for i := 10; i < 15; i++ {
-		fc.In <- i
-	}
-	select {
-	case fc.In <- 0:
-		t.Error("send to full channel did not block")
-	default:
-	}
-	i := <-fc.Out
-	if i != 10 {
-		t.Error("%v != %v", i, 10)
-	}
-	fc.In <- 15
-	close(fc.In)
-
-	for i := 11; i < 16; i++ {
-		j := <-fc.Out
-		if i != j {
-			t.Error("%v != %v", i, j)
-		}
-	}
-
-	x, ok := <-fc.Out
-	if ok {
-		t.Error("Unexpected value on Out", x)
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go
deleted file mode 100644
index 3a1fe2b..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/safemap.go
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package internal
-
-import (
-	"sync"
-)
-
-// SafeMap is a goroutine-safe map of interface{} to interface{}.
-type SafeMap struct {
-	m    map[interface{}]interface{}
-	lock sync.Mutex
-}
-
-func MakeSafeMap() SafeMap { return SafeMap{m: make(map[interface{}]interface{})} }
-
-func (m *SafeMap) Get(key interface{}) interface{} {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	return m.m[key]
-}
-
-func (m *SafeMap) GetOk(key interface{}) (interface{}, bool) {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	v, ok := m.m[key]
-	return v, ok
-}
-
-func (m *SafeMap) Put(key, value interface{}) {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	m.m[key] = value
-}
-
-func (m *SafeMap) Delete(key interface{}) {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	delete(m.m, key)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go b/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go
deleted file mode 100644
index ef941a1..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/internal/uuid.go
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package internal
-
-import (
-	"fmt"
-	"math/rand"
-	"strconv"
-	"sync"
-	"sync/atomic"
-	"time"
-)
-
-type UUID [16]byte
-
-func (u UUID) String() string {
-	return fmt.Sprintf("%X-%X-%X-%X-%X", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
-}
-
-// Don't mess with the default random source.
-var randomSource = rand.NewSource(time.Now().UnixNano())
-var randomLock sync.Mutex
-
-func random() byte {
-	randomLock.Lock()
-	defer randomLock.Unlock()
-	return byte(randomSource.Int63())
-}
-
-func UUID4() UUID {
-	var u UUID
-	for i := 0; i < len(u); i++ {
-		u[i] = random()
-	}
-	// See /https://tools.ietf.org/html/rfc4122#section-4.4
-	u[6] = (u[6] & 0x0F) | 0x40 // Version bits to 4
-	u[8] = (u[8] & 0x3F) | 0x80 // Reserved bits (top two) set to 01
-	return u
-}
-
-// A simple atomic counter to generate unique 64 bit IDs.
-type IdCounter struct{ count uint64 }
-
-// NextInt gets the next uint64 value from the atomic counter.
-func (uc *IdCounter) NextInt() uint64 {
-	return atomic.AddUint64(&uc.count, 1)
-}
-
-// Next gets the next integer value encoded as a base32 string, safe for NUL
-// terminated C strings.
-func (uc *IdCounter) Next() string {
-	return strconv.FormatUint(uc.NextInt(), 32)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/message.go
index a4370ff..c545b7e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/message.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/message.go
@@ -25,8 +25,8 @@ package proton
 import "C"
 
 import (
-	"qpid.apache.org/proton/amqp"
-	"qpid.apache.org/proton/internal"
+	"qpid.apache.org/internal"
+	"qpid.apache.org/amqp"
 )
 
 // HasMessage is true if all message data is available.
@@ -34,10 +34,17 @@ import (
 func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() &&
!d.Partial() }
 
 // Message decodes the message containined in a delivery.
-// Will return an error if delivery.HasMessage() is false.
+//
+// Must be called in the correct link context with this delivery as the current message,
+// handling an MMessage event is always a safe context to call this function.
+//
+// Will return an error if message is incomplete or not current.
 func (delivery Delivery) Message() (m amqp.Message, err error) {
-	if !delivery.Readable() || delivery.Partial() {
-		return nil, internal.Errorf("attempting to get incomplete message")
+	if !delivery.Readable() {
+		return nil, internal.Errorf("delivery is not readable")
+	}
+	if delivery.Partial() {
+		return nil, internal.Errorf("delivery has partial message")
 	}
 	data := make([]byte, delivery.Pending())
 	result := delivery.Link().Recv(data)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 4e208f7..7d40890 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -36,8 +36,8 @@ import "C"
 
 import (
 	"fmt"
-	"qpid.apache.org/proton/amqp"
-	"qpid.apache.org/proton/internal"
+	"qpid.apache.org/internal"
+	"qpid.apache.org/amqp"
 	"reflect"
 	"time"
 	"unsafe"
@@ -134,10 +134,22 @@ type Endpoint interface {
 
 // CloseError sets an error condition on an endpoint and closes the endpoint.
 func CloseError(e Endpoint, err error) {
-	e.Condition().SetError(err)
+	if err != nil {
+		e.Condition().SetError(err)
+	}
 	e.Close()
 }
 
+// EndpointError returns the remote error if there is one, the local error if not
+// nil if there is no error.
+func EndpointError(e Endpoint) error {
+	err := e.RemoteCondition().Error()
+	if err == nil {
+		err = e.Condition().Error()
+	}
+	return err
+}
+
 const (
 	Received uint64 = C.PN_RECEIVED
 	Accepted        = C.PN_ACCEPTED
@@ -271,9 +283,12 @@ func (l Link) Connection() Connection { return l.Session().Connection()
}
 
 // Human-readable link description including name, source, target and direction.
 func (l Link) String() string {
-	if l.IsSender() {
+	switch {
+	case l.IsNil():
+		return fmt.Sprintf("<nil-link>")
+	case l.IsSender():
 		return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
-	} else {
+	default:
 		return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
index 82de2cf..074495d 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
@@ -25,7 +25,7 @@ under the License.
 package proton
 
 import (
-	"qpid.apache.org/proton/internal"
+	"qpid.apache.org/internal"
 	"time"
 	"unsafe"
 )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message