qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [48/50] [abbrv] qpid-proton git commit: Merge tag '0.19.0' into go1
Date Fri, 05 Jan 2018 16:36:12 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index c0f0093,0000000..d28a09f
mode 100644,000000..100644
--- a/proton/engine.go
+++ b/proton/engine.go
@@@ -1,422 -1,0 +1,422 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +import (
 +	"fmt"
 +	"net"
 +	"os"
 +	"strings"
 +	"sync"
 +	"time"
 +	"unsafe"
 +)
 +
 +/*
 +#include <proton/connection.h>
 +#include <proton/event.h>
 +#include <proton/error.h>
 +#include <proton/handlers.h>
 +#include <proton/session.h>
 +#include <proton/transport.h>
 +#include <memory.h>
 +#include <stdlib.h>
 +*/
 +import "C"
 +
 +// Injecter allows functions to be "injected" into the event-processing loop, to
 +// be called in the same goroutine as event handlers.
 +type Injecter interface {
 +	// Inject a function into the engine goroutine.
 +	//
 +	// f() will be called in the same goroutine as event handlers, so it can safely
 +	// use values belonging to event handlers without synchronization. f() should
 +	// not block, no further events or injected functions can be processed until
 +	// f() returns.
 +	//
 +	// Returns a non-nil error if the function could not be injected and will
 +	// never be called. Otherwise the function will eventually be called.
 +	//
 +	// Note that proton values (Link, Session, Connection etc.) that existed when
 +	// Inject(f) was called may have become invalid by the time f() is executed.
 +	// Handlers should handle keep track of Closed events to ensure proton values
 +	// are not used after they become invalid. One technique is to have map from
 +	// proton values to application values. Check that the map has the correct
 +	// proton/application value pair at the start of the injected function and
 +	// delete the value from the map when handling a Closed event.
 +	Inject(f func()) error
 +
 +	// InjectWait is like Inject but does not return till f() has completed.
 +	// If f() cannot be injected it returns the error from Inject(), otherwise
 +	// it returns the error from f()
 +	InjectWait(f func() error) error
 +}
 +
 +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
 +// Handler functions sequentially in a single goroutine. Actions taken by
 +// Handler functions (such as sending messages) are encoded and written to the
 +// net.Conn. You can create multiple Engines to handle multiple connections
 +// concurrently.
 +//
 +// You implement the EventHandler and/or MessagingHandler interfaces and provide
 +// those values to NewEngine(). Their HandleEvent method will be called in the
 +// event-handling goroutine.
 +//
 +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
 +// other goroutines, store them, or use them as map indexes. Effectively they are
 +// just pointers.  Other goroutines cannot call their methods directly but they can
 +// can create a function closure to call such methods and pass it to Engine.Inject()
 +// to have it evaluated in the engine goroutine.
 +//
 +// You are responsible for ensuring you don't use an event value after it is
 +// invalid. The handler methods will tell you when a value is no longer valid. For
 +// example after a LinkClosed event, that link is no longer valid. If you do
 +// Link.Close() yourself (in a handler or injected function) the link remains valid
- // until the corresponing LinkClosed event is received by the handler.
++// until the corresponding LinkClosed event is received by the handler.
 +//
 +// Engine.Close() will take care of cleaning up any remaining values when you are
 +// done with the Engine. All values associated with a engine become invalid when you
 +// call Engine.Close()
 +//
 +// The qpid.apache.org/proton/concurrent package will do all this for you, so it
 +// may be a better choice for some applications.
 +//
 +type Engine struct {
 +	// Error is set on exit from Run() if there was an error.
 +	err    ErrorHolder
 +	inject chan func()
 +
 +	conn       net.Conn
 +	connection Connection
 +	transport  Transport
 +	collector  *C.pn_collector_t
 +	handlers   []EventHandler // Handlers for proton events.
 +	running    chan struct{}  // This channel will be closed when the goroutines are done.
 +	closeOnce  sync.Once
 +	timer      *time.Timer
 +	traceEvent bool
 +}
 +
 +const bufferSize = 4096
 +
 +func envBool(name string) bool {
 +	v := strings.ToLower(os.Getenv(name))
 +	return v == "true" || v == "1" || v == "yes" || v == "on"
 +}
 +
 +// Create a new Engine and call Initialize() with conn and handlers
 +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 +	eng := &Engine{}
 +	return eng, eng.Initialize(conn, handlers...)
 +}
 +
 +// Initialize an Engine with a connection and handlers. Start it with Run()
 +func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error {
 +	eng.inject = make(chan func())
 +	eng.conn = conn
 +	eng.connection = Connection{C.pn_connection()}
 +	eng.transport = Transport{C.pn_transport()}
 +	eng.collector = C.pn_collector()
 +	eng.handlers = handlers
 +	eng.running = make(chan struct{})
 +	eng.timer = time.NewTimer(0)
 +	eng.traceEvent = envBool("PN_TRACE_EVT")
 +	if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
 +		eng.free()
 +		return fmt.Errorf("proton.NewEngine cannot allocate")
 +	}
 +	C.pn_connection_collect(eng.connection.pn, eng.collector)
 +	return nil
 +}
 +
 +// Create a byte slice backed by C memory.
 +// Empty or error (size <= 0) returns a nil byte slice.
 +func cByteSlice(start unsafe.Pointer, size int) []byte {
 +	if start == nil || size <= 0 {
 +		return nil
 +	} else {
 +		// Slice from very large imaginary array in C memory
 +		return (*[1 << 30]byte)(start)[:size:size]
 +	}
 +}
 +
 +func (eng *Engine) Connection() Connection {
 +	return eng.connection
 +}
 +
 +func (eng *Engine) Transport() Transport {
 +	return eng.transport
 +}
 +
 +func (eng *Engine) String() string {
 +	return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 +}
 +
 +func (eng *Engine) Id() string {
 +	// Use transport address to match default PN_TRACE_FRM=1 output.
 +	return fmt.Sprintf("%p", eng.Transport().CPtr())
 +}
 +
 +func (eng *Engine) Error() error {
 +	return eng.err.Get()
 +}
 +
 +// Inject a function into the Engine's event loop.
 +//
 +// f() will be called in the same event-processing goroutine that calls Handler
 +// methods. f() can safely call methods on values that belong to this engine
 +// (Sessions, Links etc)
 +//
 +// The injected function has no parameters or return values. It is normally a
 +// closure and can use channels to communicate with the injecting goroutine if
 +// necessary.
 +//
 +// Returns a non-nil error if the engine is closed before the function could be
 +// injected.
 +func (eng *Engine) Inject(f func()) error {
 +	select {
 +	case eng.inject <- f:
 +		return nil
 +	case <-eng.running:
 +		return eng.Error()
 +	}
 +}
 +
 +// InjectWait is like Inject but does not return till f() has completed or the
 +// engine is closed, and returns an error value from f()
 +func (eng *Engine) InjectWait(f func() error) error {
 +	done := make(chan error)
 +	defer close(done)
 +	err := eng.Inject(func() { done <- f() })
 +	if err != nil {
 +		return err
 +	}
 +	select {
 +	case <-eng.running:
 +		return eng.Error()
 +	case err := <-done:
 +		return err
 +	}
 +}
 +
 +// Server puts the Engine in server mode, meaning it will auto-detect security settings
on
- // the incoming connnection such as use of SASL and SSL.
++// the incoming connection such as use of SASL and SSL.
 +// Must be called before Run()
 +//
 +func (eng *Engine) Server() { eng.Transport().SetServer() }
 +
 +func (eng *Engine) disconnect(err error) {
 +	cond := eng.Transport().Condition()
 +	cond.SetError(err)              // Set the provided error.
 +	cond.SetError(eng.conn.Close()) // Use connection error if cond is not already set.
 +	eng.transport.CloseTail()
 +	eng.transport.CloseHead()
 +}
 +
 +// Close the engine's connection.
 +// If err != nil pass it to the remote end as the close condition.
 +// Returns when the remote end closes or disconnects.
 +func (eng *Engine) Close(err error) {
 +	_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 +	<-eng.running
 +}
 +
 +// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
 +func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
 +	_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 +	select {
 +	case <-eng.running:
 +	case <-time.After(timeout):
 +		eng.Disconnect(err)
 +	}
 +}
 +
 +// Disconnect the engine's connection immediately without an AMQP close.
 +// Process any termination events before returning.
 +func (eng *Engine) Disconnect(err error) {
 +	_ = eng.Inject(func() { eng.disconnect(err) })
 +	<-eng.running
 +}
 +
 +// Let proton run timed activity and set up the next tick
 +func (eng *Engine) tick() {
 +	now := time.Now()
 +	next := eng.Transport().Tick(now)
 +	if !next.IsZero() {
 +		eng.timer.Reset(next.Sub(now))
 +	}
 +}
 +
 +func (eng *Engine) dispatch() bool {
 +	for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector)
{
 +		e := makeEvent(ce, eng)
 +		if eng.traceEvent {
 +			eng.transport.Log(e.String())
 +		}
 +		for _, h := range eng.handlers {
 +			h.HandleEvent(e)
 +		}
 +		if e.Type() == EConnectionRemoteOpen {
 +			eng.tick() // Update the tick if changed by remote.
 +		}
 +		C.pn_collector_pop(eng.collector)
 +	}
 +	return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil
 +}
 +
 +func (eng *Engine) writeBuffer() []byte {
 +	size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
 +	start := eng.Transport().Head()
 +	return cByteSlice(start, size)
 +}
 +
 +func (eng *Engine) readBuffer() []byte {
 +	size := eng.Transport().Capacity()
 +	start := eng.Transport().Tail()
 +	return cByteSlice(start, size)
 +}
 +
 +func (eng *Engine) free() {
 +	if !eng.transport.IsNil() {
 +		eng.transport.Unbind()
 +		eng.transport.Free()
 +		eng.transport = Transport{}
 +	}
 +	if !eng.connection.IsNil() {
 +		eng.connection.Free()
 +		eng.connection = Connection{}
 +	}
 +	if eng.collector != nil {
 +		C.pn_collector_release(eng.collector)
 +		C.pn_collector_free(eng.collector)
 +		eng.collector = nil
 +	}
 +}
 +
 +// Run the engine. Engine.Run() will exit when the engine is closed or
 +// disconnected.  You can check for errors after exit with Engine.Error().
 +//
 +func (eng *Engine) Run() error {
 +	defer eng.free()
 +	eng.transport.Bind(eng.connection)
 +	eng.tick() // Start ticking if needed
 +
 +	// Channels for read and write buffers going in and out of the read/write goroutines.
- 	// The channels are unbuffered: we want to exchange buffers in seuquence.
++	// The channels are unbuffered: we want to exchange buffers in sequence.
 +	readsIn, writesIn := make(chan []byte), make(chan []byte)
 +	readsOut, writesOut := make(chan []byte), make(chan []byte)
 +
 +	wait := sync.WaitGroup{}
 +	wait.Add(2) // Read and write goroutines
 +
 +	go func() { // Read goroutine
 +		defer wait.Done()
 +		for {
 +			rbuf, ok := <-readsIn
 +			if !ok {
 +				return
 +			}
 +			n, err := eng.conn.Read(rbuf)
 +			if n > 0 {
 +				readsOut <- rbuf[:n]
 +			} else if err != nil {
 +				_ = eng.Inject(func() {
 +					eng.Transport().Condition().SetError(err)
 +					eng.Transport().CloseTail()
 +				})
 +				return
 +			}
 +		}
 +	}()
 +
 +	go func() { // Write goroutine
 +		defer wait.Done()
 +		for {
 +			wbuf, ok := <-writesIn
 +			if !ok {
 +				return
 +			}
 +			n, err := eng.conn.Write(wbuf)
 +			if n > 0 {
 +				writesOut <- wbuf[:n]
 +			} else if err != nil {
 +				_ = eng.Inject(func() {
 +					eng.Transport().Condition().SetError(err)
 +					eng.Transport().CloseHead()
 +				})
 +				return
 +			}
 +		}
 +	}()
 +
 +	for eng.dispatch() {
 +		readBuf := eng.readBuffer()
 +		writeBuf := eng.writeBuffer()
 +		// Note that getting the buffers can generate events (eg. SASL events) that
 +		// might close the transport. Check if we are already finished before
 +		// blocking for IO.
 +		if !eng.dispatch() {
 +			break
 +		}
 +
 +		// sendReads/sendWrites are nil (not sendable in select) unless we have a
 +		// buffer to read/write
 +		var sendReads, sendWrites chan []byte
 +		if readBuf != nil {
 +			sendReads = readsIn
 +		}
 +		if writeBuf != nil {
 +			sendWrites = writesIn
 +		}
 +
 +		// Send buffers to the read/write goroutines if we have them.
 +		// Get buffers from the read/write goroutines and process them
 +		// Check for injected functions
 +		select {
 +
 +		case sendReads <- readBuf:
 +
 +		case sendWrites <- writeBuf:
 +
 +		case buf := <-readsOut:
 +			eng.transport.Process(uint(len(buf)))
 +
 +		case buf := <-writesOut:
 +			eng.transport.Pop(uint(len(buf)))
 +
 +		case f, ok := <-eng.inject: // Function injected from another goroutine
 +			if ok {
 +				f()
 +			}
 +
 +		case <-eng.timer.C:
 +			eng.tick()
 +		}
 +	}
 +
 +	eng.err.Set(EndpointError(eng.Connection()))
 +	eng.err.Set(eng.Transport().Condition().Error())
 +	close(readsIn)
 +	close(writesIn)
 +	close(eng.running)   // Signal goroutines have exited and Error is set, disable Inject()
 +	_ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
 +	wait.Wait()          // Wait for goroutines
 +	return eng.err.Get()
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/handlers.go
----------------------------------------------------------------------
diff --cc proton/handlers.go
index 961136e,0000000..f101548
mode 100644,000000..100644
--- a/proton/handlers.go
+++ b/proton/handlers.go
@@@ -1,395 -1,0 +1,395 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +import "fmt"
 +
 +// EventHandler handles core proton events.
 +type EventHandler interface {
 +	// HandleEvent is called with an event.
 +	// Typically HandleEvent() is implemented as a switch on e.Type()
 +	// Returning an error will stop the Engine.
 +	HandleEvent(e Event)
 +}
 +
 +// MessagingHandler provides an alternative interface to EventHandler.
 +// it is easier to use for most applications that send and receive messages.
 +//
 +// Implement this interface and then wrap your value with a MessagingHandlerDelegator.
 +// MessagingHandlerDelegator implements EventHandler and can be registered with a Engine.
 +//
 +type MessagingHandler interface {
 +	// HandleMessagingEvent is called with  MessagingEvent.
 +	// Typically HandleEvent() is implemented as a switch on e.Type()
 +	// Returning an error will stop the Engine.
 +	HandleMessagingEvent(MessagingEvent, Event)
 +}
 +
 +// MessagingEvent provides a set of events that are easier to work with than the
 +// core events defined by EventType
 +//
 +// There are 3 types of "endpoint": Connection, Session and Link.  For each
 +// endpoint there are 5 events: Opening, Opened, Closing, Closed and Error.
 +//
 +// The meaning of these events is as follows:
 +//
 +// Opening: The remote end opened, the local end will open automatically.
 +//
 +// Opened: Both ends are open, regardless of which end opened first.
 +//
 +// Closing: The remote end closed without error, the local end will close automatically.
 +//
 +// Error: The remote end closed with an error, the local end will close automatically.
 +//
 +// Closed: Both ends are closed, regardless of which end closed first or if there was an
error.
 +// No further events will be received for the endpoint.
 +//
 +type MessagingEvent int
 +
 +const (
 +	// The event loop starts.
 +	MStart MessagingEvent = iota
 +	// The peer closes the connection with an error condition.
 +	MConnectionError
 +	// The peer closes the session with an error condition.
 +	MSessionError
 +	// The peer closes the link with an error condition.
 +	MLinkError
 +	// The peer Initiates the opening of the connection.
 +	MConnectionOpening
 +	// The peer initiates the opening of the session.
 +	MSessionOpening
 +	// The peer initiates the opening of the link.
 +	MLinkOpening
 +	// The connection is opened.
 +	MConnectionOpened
 +	// The session is opened.
 +	MSessionOpened
 +	// The link is opened.
 +	MLinkOpened
 +	// The peer initiates the closing of the connection.
 +	MConnectionClosing
 +	// The peer initiates the closing of the session.
 +	MSessionClosing
 +	// The peer initiates the closing of the link.
 +	MLinkClosing
 +	// Both ends of the connection are closed.
 +	MConnectionClosed
 +	// Both ends of the session are closed.
 +	MSessionClosed
 +	// Both ends of the link are closed.
 +	MLinkClosed
 +	// The sender link has credit and messages can
 +	// therefore be transferred.
 +	MSendable
 +	// The remote peer accepts an outgoing message.
 +	MAccepted
 +	// The remote peer rejects an outgoing message.
 +	MRejected
 +	// The peer releases an outgoing message. Note that this may be in response to
 +	// either the RELEASE or MODIFIED state as defined by the AMQP specification.
 +	MReleased
 +	// The peer has settled the outgoing message. This is the point at which it
 +	// should never be re-transmitted.
 +	MSettled
 +	// A message is received. Call Event.Delivery().Message() to decode as an amqp.Message.
 +	// To manage the outcome of this messages (e.g. to accept or reject the message)
 +	// use Event.Delivery().
 +	MMessage
 +	// A network connection was disconnected.
 +	MDisconnected
 +)
 +
 +func (t MessagingEvent) String() string {
 +	switch t {
 +	case MStart:
 +		return "Start"
 +	case MConnectionError:
 +		return "ConnectionError"
 +	case MSessionError:
 +		return "SessionError"
 +	case MLinkError:
 +		return "LinkError"
 +	case MConnectionOpening:
 +		return "ConnectionOpening"
 +	case MSessionOpening:
 +		return "SessionOpening"
 +	case MLinkOpening:
 +		return "LinkOpening"
 +	case MConnectionOpened:
 +		return "ConnectionOpened"
 +	case MSessionOpened:
 +		return "SessionOpened"
 +	case MLinkOpened:
 +		return "LinkOpened"
 +	case MConnectionClosing:
 +		return "ConnectionClosing"
 +	case MSessionClosing:
 +		return "SessionClosing"
 +	case MLinkClosing:
 +		return "LinkClosing"
 +	case MConnectionClosed:
 +		return "ConnectionClosed"
 +	case MSessionClosed:
 +		return "SessionClosed"
 +	case MLinkClosed:
 +		return "LinkClosed"
 +	case MDisconnected:
 +		return "Disconnected"
 +	case MSendable:
 +		return "Sendable"
 +	case MAccepted:
 +		return "Accepted"
 +	case MRejected:
 +		return "Rejected"
 +	case MReleased:
 +		return "Released"
 +	case MSettled:
 +		return "Settled"
 +	case MMessage:
 +		return "Message"
 +	default:
 +		return "Unknown"
 +	}
 +}
 +
 +// ResourceHandler provides a simple way to track the creation and deletion of
 +// various proton objects.
 +// endpointDelegator captures common patterns for endpoints opening/closing
 +type endpointDelegator struct {
 +	remoteOpen, remoteClose, localOpen, localClose EventType
 +	opening, opened, closing, closed, error        MessagingEvent
 +	endpoint                                       func(Event) Endpoint
 +	delegator                                      *MessagingAdapter
 +}
 +
 +// HandleEvent handles an open/close event for an endpoint in a generic way.
 +func (d endpointDelegator) HandleEvent(e Event) {
 +	endpoint := d.endpoint(e)
 +	state := endpoint.State()
 +
 +	switch e.Type() {
 +
 +	case d.localOpen:
 +		if state.RemoteActive() {
 +			d.delegator.mhandler.HandleMessagingEvent(d.opened, e)
 +		}
 +
 +	case d.remoteOpen:
 +		d.delegator.mhandler.HandleMessagingEvent(d.opening, e)
 +		switch {
 +		case state.LocalActive():
 +			d.delegator.mhandler.HandleMessagingEvent(d.opened, e)
 +		case state.LocalUninit():
 +			if d.delegator.AutoOpen {
 +				endpoint.Open()
 +			}
 +		}
 +
 +	case d.remoteClose:
 +		if endpoint.RemoteCondition().IsSet() { // Closed with error
 +			d.delegator.mhandler.HandleMessagingEvent(d.error, e)
 +		} else {
 +			d.delegator.mhandler.HandleMessagingEvent(d.closing, e)
 +		}
 +		if state.LocalClosed() {
 +			d.delegator.mhandler.HandleMessagingEvent(d.closed, e)
 +		} else if state.LocalActive() {
 +			endpoint.Close()
 +		}
 +
 +	case d.localClose:
 +		if state.RemoteClosed() {
 +			d.delegator.mhandler.HandleMessagingEvent(d.closed, e)
 +		}
 +
 +	default:
 +		// We shouldn't be called with any other event type.
 +		panic(fmt.Errorf("internal error, not an open/close event: %s", e))
 +	}
 +}
 +
 +type flowcontroller struct {
 +	window, drained int
 +}
 +
 +func (d flowcontroller) HandleEvent(e Event) {
 +	link := e.Link()
 +
 +	switch e.Type() {
 +	case ELinkLocalOpen, ELinkRemoteOpen, ELinkFlow, EDelivery:
 +		if link.IsReceiver() {
 +			d.drained += link.Drained()
 +			if d.drained != 0 {
 +				link.Flow(d.window - link.Credit())
 +			}
 +		}
 +	}
 +}
 +
- // MessagingAdapter implments a EventHandler and delegates to a MessagingHandler.
++// MessagingAdapter implements a EventHandler and delegates to a MessagingHandler.
 +// You can modify the exported fields before you pass the MessagingAdapter to
 +// a Engine.
 +type MessagingAdapter struct {
 +	mhandler                  MessagingHandler
 +	connection, session, link endpointDelegator
 +	flowcontroller            EventHandler
 +
 +	// AutoSettle (default true) automatically pre-settle outgoing messages.
 +	AutoSettle bool
 +	// AutoAccept (default true) automatically accept and settle incoming messages
 +	// if they are not settled by the delegate.
 +	AutoAccept bool
 +	// AutoOpen (default true) automatically open remotely opened endpoints.
 +	AutoOpen bool
 +	// Prefetch (default 10) initial credit to issue for incoming links.
 +	Prefetch int
 +	// PeerCloseIsError (default false) if true a close by the peer will be treated as an error.
 +	PeerCloseError bool
 +}
 +
 +func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter {
 +	return &MessagingAdapter{
 +		mhandler:       h,
 +		flowcontroller: nil,
 +		AutoSettle:     true,
 +		AutoAccept:     true,
 +		AutoOpen:       true,
 +		Prefetch:       10,
 +		PeerCloseError: false,
 +	}
 +}
 +
 +func handleIf(h EventHandler, e Event) {
 +	if h != nil {
 +		h.HandleEvent(e)
 +	}
 +}
 +
 +// Handle a proton event by passing the corresponding MessagingEvent(s) to
 +// the MessagingHandler.
 +func (d *MessagingAdapter) HandleEvent(e Event) {
 +	handleIf(d.flowcontroller, e)
 +
 +	switch e.Type() {
 +
 +	case EConnectionInit:
 +		d.connection = endpointDelegator{
 +			EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose,
 +			MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed,
 +			MConnectionError,
 +			func(e Event) Endpoint { return e.Connection() },
 +			d,
 +		}
 +		d.session = endpointDelegator{
 +			ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose,
 +			MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed,
 +			MSessionError,
 +			func(e Event) Endpoint { return e.Session() },
 +			d,
 +		}
 +		d.link = endpointDelegator{
 +			ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose,
 +			MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed,
 +			MLinkError,
 +			func(e Event) Endpoint { return e.Link() },
 +			d,
 +		}
 +		if d.Prefetch > 0 {
 +			d.flowcontroller = flowcontroller{window: d.Prefetch, drained: 0}
 +		}
 +		d.mhandler.HandleMessagingEvent(MStart, e)
 +
 +	case EConnectionRemoteOpen:
 +
 +		d.connection.HandleEvent(e)
 +
 +	case EConnectionRemoteClose:
 +		d.connection.HandleEvent(e)
 +		e.Connection().Transport().CloseTail()
 +
 +	case EConnectionLocalOpen, EConnectionLocalClose:
 +		d.connection.HandleEvent(e)
 +
 +	case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose:
 +		d.session.HandleEvent(e)
 +
 +	case ELinkRemoteOpen:
 +		e.Link().Source().Copy(e.Link().RemoteSource())
 +		e.Link().Target().Copy(e.Link().RemoteTarget())
 +		d.link.HandleEvent(e)
 +
 +	case ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose:
 +		d.link.HandleEvent(e)
 +
 +	case ELinkFlow:
 +		if e.Link().IsSender() && e.Link().Credit() > 0 {
 +			d.mhandler.HandleMessagingEvent(MSendable, e)
 +		}
 +
 +	case EDelivery:
 +		if e.Delivery().Link().IsReceiver() {
 +			d.incoming(e)
 +		} else {
 +			d.outgoing(e)
 +		}
 +
 +	case ETransportClosed:
 +		d.mhandler.HandleMessagingEvent(MDisconnected, e)
 +	}
 +}
 +
 +func (d *MessagingAdapter) incoming(e Event) {
 +	delivery := e.Delivery()
 +	if delivery.HasMessage() {
 +		d.mhandler.HandleMessagingEvent(MMessage, e)
 +		if d.AutoAccept && !delivery.Settled() {
 +			delivery.Accept()
 +		}
 +		if delivery.Current() {
 +			e.Link().Advance()
 +		}
 +	} else if delivery.Updated() && delivery.Settled() {
 +		d.mhandler.HandleMessagingEvent(MSettled, e)
 +	}
 +	return
 +}
 +
 +func (d *MessagingAdapter) outgoing(e Event) {
 +	delivery := e.Delivery()
 +	if delivery.Updated() {
 +		switch delivery.Remote().Type() {
 +		case Accepted:
 +			d.mhandler.HandleMessagingEvent(MAccepted, e)
 +		case Rejected:
 +			d.mhandler.HandleMessagingEvent(MRejected, e)
 +		case Released, Modified:
 +			d.mhandler.HandleMessagingEvent(MReleased, e)
 +		}
 +		if delivery.Settled() {
 +			// The delivery was settled remotely, inform the local end.
 +			d.mhandler.HandleMessagingEvent(MSettled, e)
 +		}
 +		if d.AutoSettle {
 +			delivery.Settle() // Local settle, don't mhandler MSettled till the remote end settles.
 +		}
 +	}
 +	return
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/message.go
----------------------------------------------------------------------
diff --cc proton/message.go
index 2336483,0000000..fbb1d48
mode 100644,000000..100644
--- a/proton/message.go
+++ b/proton/message.go
@@@ -1,93 -1,0 +1,93 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +// #include <proton/types.h>
 +// #include <proton/message.h>
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +	"fmt"
 +	"qpid.apache.org/amqp"
 +	"strconv"
 +	"sync/atomic"
 +)
 +
 +// HasMessage is true if all message data is available.
 +// Equivalent to !d.isNil && d.Readable() && !d.Partial()
 +func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() &&
!d.Partial() }
 +
- // Message decodes the message containined in a delivery.
++// Message decodes the message contained in a delivery.
 +//
 +// Must be called in the correct link context with this delivery as the current message,
 +// handling an MMessage event is always a safe context to call this function.
 +//
 +// Will return an error if message is incomplete or not current.
 +func (delivery Delivery) Message() (m amqp.Message, err error) {
 +	if !delivery.Readable() {
 +		return nil, fmt.Errorf("delivery is not readable")
 +	}
 +	if delivery.Partial() {
 +		return nil, fmt.Errorf("delivery has partial message")
 +	}
 +	data := make([]byte, delivery.Pending())
 +	result := delivery.Link().Recv(data)
 +	if result != len(data) {
 +		return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result))
 +	}
 +	m = amqp.NewMessage()
 +	err = m.Decode(data)
 +	return
 +}
 +
 +// Process-wide atomic counter for generating tag names
 +var tagCounter uint64
 +
 +func nextTag() string {
 +	return strconv.FormatUint(atomic.AddUint64(&tagCounter, 1), 32)
 +}
 +
 +// Send sends a amqp.Message over a Link.
 +// Returns a Delivery that can be use to determine the outcome of the message.
 +func (link Link) Send(m amqp.Message) (Delivery, error) {
 +	if !link.IsSender() {
 +		return Delivery{}, fmt.Errorf("attempt to send message on receiving link")
 +	}
 +
 +	delivery := link.Delivery(nextTag())
 +	bytes, err := m.Encode(nil)
 +	if err != nil {
- 		return Delivery{}, fmt.Errorf("cannot send mesage %s", err)
++		return Delivery{}, fmt.Errorf("cannot send message %s", err)
 +	}
 +	result := link.SendBytes(bytes)
 +	link.Advance()
 +	if result != len(bytes) {
 +		if result < 0 {
 +			return delivery, fmt.Errorf("send failed %v", PnErrorCode(result))
 +		} else {
 +			return delivery, fmt.Errorf("send incomplete %v of %v", result, len(bytes))
 +		}
 +	}
 +	if link.RemoteSndSettleMode() == SndSettled {
 +		delivery.Settle()
 +	}
 +	return delivery, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b25d21e6/proton/wrappers.go
----------------------------------------------------------------------
diff --cc proton/wrappers.go
index 09f3e65,0000000..a7b7fb2
mode 100644,000000..100644
--- a/proton/wrappers.go
+++ b/proton/wrappers.go
@@@ -1,460 -1,0 +1,460 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +// This file contains special-case wrapper functions or wrappers that don't follow
 +// the pattern of genwrap.go.
 +
 +package proton
 +
 +//#include <proton/codec.h>
 +//#include <proton/connection.h>
 +//#include <proton/delivery.h>
 +//#include <proton/event.h>
 +//#include <proton/link.h>
 +//#include <proton/link.h>
 +//#include <proton/object.h>
 +//#include <proton/sasl.h>
 +//#include <proton/session.h>
 +//#include <proton/transport.h>
 +//#include <stdlib.h>
 +import "C"
 +
 +import (
 +	"fmt"
 +	"qpid.apache.org/amqp"
 +	"reflect"
 +	"time"
 +	"unsafe"
 +)
 +
 +// TODO aconway 2015-05-05: Documentation for generated types.
 +
 +// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the
 +// Go type implementing this interface. For low level, at-your-own-risk use only.
 +type CHandle interface {
 +	// CPtr returns the unsafe C pointer, equivalent to a C void*.
 +	CPtr() unsafe.Pointer
 +}
 +
 +// Incref increases the refcount of a proton value, which prevents the
 +// underlying C struct being freed until you call Decref().
 +//
 +// It can be useful to "pin" a proton value in memory while it is in use by
 +// goroutines other than the event loop goroutine. For example if you Incref() a
 +// Link, the underlying object is not freed when the link is closed, so means
 +// other goroutines can continue to safely use it as an index in a map or inject
 +// it into the event loop goroutine. There will of course be an error if you try
 +// to use a link after it is closed, but not a segmentation fault.
 +func Incref(c CHandle) {
 +	if p := c.CPtr(); p != nil {
 +		C.pn_incref(p)
 +	}
 +}
 +
 +// Decref decreases the refcount of a proton value, freeing the underlying C
 +// struct if this is the last reference.  Only call this if you previously
 +// called Incref() for this value.
 +func Decref(c CHandle) {
 +	if p := c.CPtr(); p != nil {
 +		C.pn_decref(p)
 +	}
 +}
 +
 +// Event is an AMQP protocol event.
 +type Event struct {
 +	pn         *C.pn_event_t
 +	eventType  EventType
 +	connection Connection
 +	transport  Transport
 +	session    Session
 +	link       Link
 +	delivery   Delivery
 +	injecter   Injecter
 +}
 +
 +func makeEvent(pn *C.pn_event_t, injecter Injecter) Event {
 +	return Event{
 +		pn:         pn,
 +		eventType:  EventType(C.pn_event_type(pn)),
 +		connection: Connection{C.pn_event_connection(pn)},
 +		transport:  Transport{C.pn_event_transport(pn)},
 +		session:    Session{C.pn_event_session(pn)},
 +		link:       Link{C.pn_event_link(pn)},
 +		delivery:   Delivery{C.pn_event_delivery(pn)},
 +		injecter:   injecter,
 +	}
 +}
 +func (e Event) IsNil() bool            { return e.eventType == EventType(0) }
 +func (e Event) Type() EventType        { return e.eventType }
 +func (e Event) Connection() Connection { return e.connection }
 +func (e Event) Transport() Transport   { return e.transport }
 +func (e Event) Session() Session       { return e.session }
 +func (e Event) Link() Link             { return e.link }
 +func (e Event) Delivery() Delivery     { return e.delivery }
 +func (e Event) String() string         { return e.Type().String() }
 +
 +// Injecter should not be used in a handler function, but it can be passed to
 +// other goroutines (via a channel or to a goroutine started by handler
 +// functions) to let them inject functions back into the handlers goroutine.
 +func (e Event) Injecter() Injecter { return e.injecter }
 +
 +// Data is an intermediate form of decoded AMQP data.
 +type Data struct{ pn *C.pn_data_t }
 +
 +func (d Data) Free()                { C.pn_data_free(d.pn) }
 +func (d Data) CPtr() unsafe.Pointer { return unsafe.Pointer(d.pn) }
 +func (d Data) Clear()               { C.pn_data_clear(d.pn) }
 +func (d Data) Rewind()              { C.pn_data_rewind(d.pn) }
 +func (d Data) Next()                { C.pn_data_next(d.pn) }
 +func (d Data) Error() error         { return PnError(C.pn_data_error(d.pn)) }
 +func (d Data) Empty() bool          { return C.pn_data_size(d.pn) == 0 }
 +
 +func (d Data) String() string {
 +	str := C.pn_string(C.CString(""))
 +	defer C.pn_free(unsafe.Pointer(str))
 +	C.pn_inspect(unsafe.Pointer(d.pn), str)
 +	return C.GoString(C.pn_string_get(str))
 +}
 +
 +// Unmarshal the value of d into value pointed at by ptr, see amqp.Unmarshal() for details
 +func (d Data) Unmarshal(ptr interface{}) error {
 +	d.Rewind()
 +	d.Next()
 +	err := amqp.UnmarshalUnsafe(d.CPtr(), ptr)
 +	return err
 +}
 +
 +// Marshal the value v into d, see amqp.Marshal() for details
 +func (d Data) Marshal(v interface{}) error {
 +	d.Clear()
 +	return amqp.MarshalUnsafe(v, d.CPtr())
 +}
 +
 +// State holds the state flags for an AMQP endpoint.
 +type State byte
 +
 +const (
 +	SLocalUninit  State = C.PN_LOCAL_UNINIT
 +	SLocalActive        = C.PN_LOCAL_ACTIVE
 +	SLocalClosed        = C.PN_LOCAL_CLOSED
 +	SRemoteUninit       = C.PN_REMOTE_UNINIT
 +	SRemoteActive       = C.PN_REMOTE_ACTIVE
 +	SRemoteClosed       = C.PN_REMOTE_CLOSED
 +)
 +
 +// Has is True if bits & state is non 0.
 +func (s State) Has(bits State) bool { return s&bits != 0 }
 +
 +func (s State) LocalUninit() bool  { return s.Has(SLocalUninit) }
 +func (s State) LocalActive() bool  { return s.Has(SLocalActive) }
 +func (s State) LocalClosed() bool  { return s.Has(SLocalClosed) }
 +func (s State) RemoteUninit() bool { return s.Has(SRemoteUninit) }
 +func (s State) RemoteActive() bool { return s.Has(SRemoteActive) }
 +func (s State) RemoteClosed() bool { return s.Has(SRemoteClosed) }
 +
- // Return a State containig just the local flags
++// Return a State containing just the local flags
 +func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) }
 +
- // Return a State containig just the remote flags
++// Return a State containing just the remote flags
 +func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) }
 +
 +// Endpoint is the common interface for Connection, Link and Session.
 +type Endpoint interface {
 +	// State is the open/closed state.
 +	State() State
 +	// Open an endpoint.
 +	Open()
 +	// Close an endpoint.
 +	Close()
 +	// Condition holds a local error condition.
 +	Condition() Condition
 +	// RemoteCondition holds a remote error condition.
 +	RemoteCondition() Condition
 +	// Human readable name
 +	String() string
 +	// Human readable endpoint type "sender-link", "session" etc.
 +	Type() string
 +}
 +
 +// CloseError sets an error condition (if err != nil) on an endpoint and closes
 +// the endpoint if not already closed
 +func CloseError(e Endpoint, err error) {
 +	if err != nil && !e.Condition().IsSet() {
 +		e.Condition().SetError(err)
 +	}
 +	e.Close()
 +}
 +
 +// EndpointError returns the remote error if there is one, the local error if not
 +// nil if there is no error.
 +func EndpointError(e Endpoint) error {
 +	err := e.RemoteCondition().Error()
 +	if err == nil {
 +		err = e.Condition().Error()
 +	}
 +	return err
 +}
 +
 +const (
 +	Received uint64 = C.PN_RECEIVED
 +	Accepted        = C.PN_ACCEPTED
 +	Rejected        = C.PN_REJECTED
 +	Released        = C.PN_RELEASED
 +	Modified        = C.PN_MODIFIED
 +)
 +
 +// SettleAs is equivalent to d.Update(disposition); d.Settle()
 +func (d Delivery) SettleAs(disposition uint64) {
 +	d.Update(disposition)
 +	d.Settle()
 +}
 +
 +// Accept accepts and settles a delivery.
 +func (d Delivery) Accept() { d.SettleAs(Accepted) }
 +
 +// Reject rejects and settles a delivery
 +func (d Delivery) Reject() { d.SettleAs(Rejected) }
 +
 +// Release releases and settles a delivery
 +// If delivered is true the delivery count for the message will be increased.
 +func (d Delivery) Release(delivered bool) {
 +	if delivered {
 +		d.SettleAs(Modified)
 +	} else {
 +		d.SettleAs(Released)
 +	}
 +}
 +
 +type DeliveryTag struct{ pn C.pn_delivery_tag_t }
 +
 +func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size))
}
 +
 +func (l Link) Recv(buf []byte) int {
 +	if len(buf) == 0 {
 +		return 0
 +	}
 +	return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
 +}
 +
 +func (l Link) SendBytes(bytes []byte) int {
 +	return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes)))
 +}
 +
 +func pnTag(tag string) C.pn_delivery_tag_t {
 +	bytes := []byte(tag)
 +	return C.pn_dtag(cPtr(bytes), cLen(bytes))
 +}
 +
 +func (l Link) Delivery(tag string) Delivery {
 +	return Delivery{C.pn_delivery(l.pn, pnTag(tag))}
 +}
 +
 +func (l Link) Connection() Connection { return l.Session().Connection() }
 +
 +// Human-readable link description including name, source, target and direction.
 +func (l Link) String() string {
 +	switch {
 +	case l.IsNil():
 +		return fmt.Sprintf("<nil-link>")
 +	case l.IsSender():
 +		return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
 +	default:
 +		return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
 +	}
 +}
 +
 +func (l Link) Type() string {
 +	if l.IsSender() {
 +		return "sender-link"
 +	} else {
 +		return "receiver-link"
 +	}
 +}
 +
 +// IsDrain calls pn_link_get_drain(), it conflicts with pn_link_drain() under the normal
mapping.
 +func (l Link) IsDrain() bool {
 +	return bool(C.pn_link_get_drain(l.pn))
 +}
 +
 +func cPtr(b []byte) *C.char {
 +	if len(b) == 0 {
 +		return nil
 +	}
 +	return (*C.char)(unsafe.Pointer(&b[0]))
 +}
 +
 +func cLen(b []byte) C.size_t {
 +	return C.size_t(len(b))
 +}
 +
 +func (s Session) Sender(name string) Link {
 +	cname := C.CString(name)
 +	defer C.free(unsafe.Pointer(cname))
 +	return Link{C.pn_sender(s.pn, cname)}
 +}
 +
 +func (s Session) Receiver(name string) Link {
 +	cname := C.CString(name)
 +	defer C.free(unsafe.Pointer(cname))
 +	return Link{C.pn_receiver(s.pn, cname)}
 +}
 +
 +func (t Transport) String() string {
 +	return fmt.Sprintf("(Transport)(%p)", t.CPtr())
 +}
 +
 +// Unique (per process) string identifier for a connection, useful for debugging.
 +func (c Connection) String() string {
 +	// Use the transport address to match the default transport logs from PN_TRACE.
 +	return fmt.Sprintf("(Connection)(%p)", c.Transport().CPtr())
 +}
 +
 +func (c Connection) Type() string {
 +	return "connection"
 +}
 +
 +// Head functions don't follow the normal naming conventions so missed by the generator.
 +
 +func (c Connection) LinkHead(s State) Link {
 +	return Link{C.pn_link_head(c.pn, C.pn_state_t(s))}
 +}
 +
 +func (c Connection) SessionHead(s State) Session {
 +	return Session{C.pn_session_head(c.pn, C.pn_state_t(s))}
 +}
 +
 +func (c Connection) Links(state State) (links []Link) {
 +	for l := c.LinkHead(state); !l.IsNil(); l = l.Next(state) {
 +		links = append(links, l)
 +	}
 +	return
 +}
 +
 +func (c Connection) Sessions(state State) (sessions []Session) {
 +	for s := c.SessionHead(state); !s.IsNil(); s = s.Next(state) {
 +		sessions = append(sessions, s)
 +	}
 +	return
 +}
 +
 +// SetPassword takes []byte not string because it is impossible to erase a string
 +// from memory reliably. Proton will not keep the password in memory longer than
 +// needed, the caller should overwrite their copy on return.
 +//
 +// The password must not contain embedded nul characters, a trailing nul is ignored.
 +func (c Connection) SetPassword(password []byte) {
 +	if len(password) == 0 || password[len(password)-1] != 0 {
 +		password = append(password, 0) // Proton requires a terminating null.
 +	}
 +	C.pn_connection_set_password(c.pn, (*C.char)(unsafe.Pointer(&password[0])))
 +}
 +
 +func (s Session) String() string {
 +	return fmt.Sprintf("(Session)(%p)", s.pn) // TODO aconway 2016-09-12: should print channel
number.
 +}
 +
 +func (s Session) Type() string { return "session" }
 +
 +// Error returns an instance of amqp.Error or nil.
 +func (c Condition) Error() error {
 +	if c.IsNil() || !c.IsSet() {
 +		return nil
 +	}
 +	return amqp.Error{Name: c.Name(), Description: c.Description()}
 +}
 +
 +// Set a Go error into a condition.
 +// If it is not an amqp.Condition use the error type as name, error string as description.
 +func (c Condition) SetError(err error) {
 +	if err != nil {
 +		if cond, ok := err.(amqp.Error); ok {
 +			c.SetName(cond.Name)
 +			c.SetDescription(cond.Description)
 +		} else {
 +			c.SetName(reflect.TypeOf(err).Name())
 +			c.SetDescription(err.Error())
 +		}
 +	}
 +}
 +
 +func (c Connection) Session() (Session, error) {
 +	s := Session{C.pn_session(c.pn)}
 +	if s.IsNil() {
 +		return s, Connection(c).Error()
 +	}
 +	return s, nil
 +}
 +
 +// pnTime converts Go time.Time to Proton millisecond Unix time.
 +//
 +// Note: t.isZero() is converted to C.pn_timestamp_t(0) and vice-versa. These
 +// are used as "not set" sentinel values by the Go and Proton APIs, so it is
 +// better to conserve the "zeroness" even though they don't represent the same
 +// time instant.
 +//
 +func pnTime(t time.Time) (pnt C.pn_timestamp_t) {
 +	if !t.IsZero() {
 +		pnt = C.pn_timestamp_t(t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond))
 +	}
 +	return
 +}
 +
 +// goTime converts a pn_timestamp_t to a Go time.Time.
 +//
 +// Note: C.pn_timestamp_t(0) is converted to a zero time.Time and
 +// vice-versa. These are used as "not set" sentinel values by the Go and Proton
 +// APIs, so it is better to conserve the "zeroness" even though they don't
 +// represent the same time instant.
 +//
 +func goTime(pnt C.pn_timestamp_t) (t time.Time) {
 +	if pnt != 0 {
 +		t = time.Unix(int64(pnt/1000), int64(pnt%1000)*int64(time.Millisecond))
 +	}
 +	return
 +}
 +
 +// Special treatment for Transport.Head, return value is unsafe.Pointer not string
 +func (t Transport) Head() unsafe.Pointer {
 +	return unsafe.Pointer(C.pn_transport_head(t.pn))
 +}
 +
 +// Special treatment for Transport.Tail, return value is unsafe.Pointer not string
 +func (t Transport) Tail() unsafe.Pointer {
 +	return unsafe.Pointer(C.pn_transport_tail(t.pn))
 +}
 +
 +// Special treatment for Transport.Push, takes []byte instead of char*, size
 +func (t Transport) Push(bytes []byte) int {
 +	return int(C.pn_transport_push(t.pn, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes))))
 +}
 +
 +// Get the SASL object for the transport.
 +func (t Transport) SASL() SASL {
 +	return SASL{C.pn_sasl(t.pn)}
 +}
 +
 +// Do we support extended SASL negotiation?
 +// All implementations of Proton support ANONYMOUS and EXTERNAL on both
 +// client and server sides and PLAIN on the client side.
 +//
 +// Extended SASL implememtations use an external library (Cyrus SASL)
 +// to support other mechanisms beyond these basic ones.
 +func SASLExtended() bool {
 +	return bool(C.pn_sasl_extended())
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message