qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [49/50] [abbrv] qpid-proton git commit: PROTON-1338: Go: update `go get`
Date Wed, 02 Nov 2016 02:54:09 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/electron_test.go
----------------------------------------------------------------------
diff --cc electron/electron_test.go
index 0000000,0000000..294e952
new file mode 100644
--- /dev/null
+++ b/electron/electron_test.go
@@@ -1,0 -1,0 +1,546 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"fmt"
++	"net"
++	"path"
++	"qpid.apache.org/amqp"
++	"reflect"
++	"runtime"
++	"testing"
++	"time"
++)
++
++func fatalIf(t *testing.T, err error) {
++	if err != nil {
++		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
++		if ok {
++			_, file = path.Split(file)
++		}
++		t.Fatalf("(from %s:%d) %v", file, line, err)
++	}
++}
++
++func errorIf(t *testing.T, err error) {
++	if err != nil {
++		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
++		if ok {
++			_, file = path.Split(file)
++		}
++		t.Errorf("(from %s:%d) %v", file, line, err)
++	}
++}
++
++func checkEqual(want interface{}, got interface{}) error {
++	if !reflect.DeepEqual(want, got) {
++		return fmt.Errorf("%#v != %#v", want, got)
++	}
++	return nil
++}
++
++// Start a server, return listening addr and channel for incoming Connections.
++func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
++	listener, err := net.Listen("tcp", "")
++	fatalIf(t, err)
++	addr := listener.Addr()
++	ch := make(chan Connection)
++	go func() {
++		conn, err := listener.Accept()
++		c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
++		fatalIf(t, err)
++		ch <- c
++	}()
++	return addr, ch
++}
++
++// Open a client connection and session, return the session.
++func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
++	conn, err := net.Dial(addr.Network(), addr.String())
++	fatalIf(t, err)
++	c, err := cont.Connection(conn, opts...)
++	fatalIf(t, err)
++	sn, err := c.Session()
++	fatalIf(t, err)
++	return sn
++}
++
++// Return client and server ends of the same connection.
++func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
++	addr, ch := newServer(t, NewContainer("test-server"), sopts...)
++	client = newClient(t, NewContainer("test-client"), addr, copts...)
++	return client, <-ch
++}
++
++// Return client and server ends of the same connection.
++func newClientServer(t *testing.T) (client Session, server Connection) {
++	return newClientServerOpts(t, nil, nil)
++}
++
++// Close client and server
++func closeClientServer(client Session, server Connection) {
++	client.Connection().Close(nil)
++	server.Close(nil)
++}
++
++// Send a message one way with a client sender and server receiver, verify ack.
++func TestClientSendServerReceive(t *testing.T) {
++	nLinks := 3
++	nMessages := 3
++
++	rchan := make(chan Receiver, nLinks)
++	client, server := newClientServer(t)
++	go func() {
++		for in := range server.Incoming() {
++			switch in := in.(type) {
++			case *IncomingReceiver:
++				in.SetCapacity(1)
++				in.SetPrefetch(false)
++				rchan <- in.Accept().(Receiver)
++			default:
++				in.Accept()
++			}
++		}
++	}()
++
++	defer func() { closeClientServer(client, server) }()
++
++	s := make([]Sender, nLinks)
++	for i := 0; i < nLinks; i++ {
++		var err error
++		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
++		if err != nil {
++			t.Fatal(err)
++		}
++	}
++	r := make([]Receiver, nLinks)
++	for i := 0; i < nLinks; i++ {
++		r[i] = <-rchan
++	}
++
++	for i := 0; i < nLinks; i++ {
++		for j := 0; j < nMessages; j++ {
++			// Client send
++			ack := make(chan Outcome, 1)
++			sendDone := make(chan struct{})
++			go func() {
++				defer close(sendDone)
++				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
++				var err error
++				s[i].SendAsync(m, ack, "testing")
++				if err != nil {
++					t.Fatal(err)
++				}
++			}()
++
++			// Server receive
++			rm, err := r[i].Receive()
++			if err != nil {
++				t.Fatal(err)
++			}
++			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
++				t.Errorf("%#v != %#v", want, got)
++			}
++
++			// Should not be acknowledged on client yet
++			<-sendDone
++			select {
++			case <-ack:
++				t.Errorf("unexpected ack")
++			default:
++			}
++
++			// Server send ack
++			if err := rm.Reject(); err != nil {
++				t.Error(err)
++			}
++			// Client get ack.
++			if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
++				t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
++			}
++		}
++	}
++}
++
++func TestClientReceiver(t *testing.T) {
++	nMessages := 3
++	client, server := newClientServer(t)
++	go func() {
++		for in := range server.Incoming() {
++			switch in := in.(type) {
++			case *IncomingSender:
++				s := in.Accept().(Sender)
++				go func() {
++					for i := int32(0); i < int32(nMessages); i++ {
++						out := s.SendSync(amqp.NewMessageWith(i))
++						if out.Error != nil {
++							t.Error(out.Error)
++							return
++						}
++					}
++					s.Close(nil)
++				}()
++			default:
++				in.Accept()
++			}
++		}
++	}()
++
++	r, err := client.Receiver(Source("foo"))
++	if err != nil {
++		t.Fatal(err)
++	}
++	for i := int32(0); i < int32(nMessages); i++ {
++		rm, err := r.Receive()
++		if err != nil {
++			if err != Closed {
++				t.Error(err)
++			}
++			break
++		}
++		if err := rm.Accept(); err != nil {
++			t.Error(err)
++		}
++		if b, ok := rm.Message.Body().(int32); !ok || b != i {
++			t.Errorf("want %v, true got %v, %v", i, b, ok)
++		}
++	}
++	server.Close(nil)
++	client.Connection().Close(nil)
++}
++
++// Test timeout versions of waiting functions.
++func TestTimeouts(t *testing.T) {
++	var err error
++	rchan := make(chan Receiver, 1)
++	client, server := newClientServer(t)
++	go func() {
++		for i := range server.Incoming() {
++			switch i := i.(type) {
++			case *IncomingReceiver:
++				i.SetCapacity(1)
++				i.SetPrefetch(false)
++				rchan <- i.Accept().(Receiver) // Issue credit only on receive
++			default:
++				i.Accept()
++			}
++		}
++	}()
++	defer func() { closeClientServer(client, server) }()
++
++	// Open client sender
++	snd, err := client.Sender(Target("test"))
++	if err != nil {
++		t.Fatal(err)
++	}
++	rcv := <-rchan
++
++	// Test send with timeout
++	short := time.Millisecond
++	long := time.Second
++	m := amqp.NewMessage()
++	if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// Test receive with timeout
++	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// Test receive with timeout
++	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// There is now a credit on the link due to receive
++	ack := make(chan Outcome)
++	snd.SendAsyncTimeout(m, ack, nil, short)
++	// Disposition should timeout
++	select {
++	case <-ack:
++		t.Errorf("want Timeout got %#v", ack)
++	case <-time.After(short):
++	}
++
++	// Receive and accept
++	rm, err := rcv.ReceiveTimeout(long)
++	if err != nil {
++		t.Fatal(err)
++	}
++	if err := rm.Accept(); err != nil {
++		t.Fatal(err)
++	}
++	// Sender get ack
++	if a := <-ack; a.Status != Accepted || a.Error != nil {
++		t.Errorf("want (accepted, nil) got %#v", a)
++	}
++}
++
++// A server that returns the opposite end of each client link via channels.
++type pairs struct {
++	t        *testing.T
++	client   Session
++	server   Connection
++	rchan    chan Receiver
++	schan    chan Sender
++	capacity int
++	prefetch bool
++}
++
++func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
++	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
++	p.client, p.server = newClientServer(t)
++	go func() {
++		for i := range p.server.Incoming() {
++			switch i := i.(type) {
++			case *IncomingReceiver:
++				i.SetCapacity(capacity)
++				i.SetPrefetch(prefetch)
++				p.rchan <- i.Accept().(Receiver)
++			case *IncomingSender:
++				p.schan <- i.Accept().(Sender)
++			default:
++				i.Accept()
++			}
++		}
++	}()
++	return p
++}
++
++func (p *pairs) close() {
++	closeClientServer(p.client, p.server)
++}
++
++// Return a client sender and server receiver
++func (p *pairs) senderReceiver() (Sender, Receiver) {
++	snd, err := p.client.Sender()
++	fatalIf(p.t, err)
++	rcv := <-p.rchan
++	return snd, rcv
++}
++
++// Return a client receiver and server sender
++func (p *pairs) receiverSender() (Receiver, Sender) {
++	rcv, err := p.client.Receiver()
++	fatalIf(p.t, err)
++	snd := <-p.schan
++	return rcv, snd
++}
++
++type result struct {
++	label string
++	err   error
++	value interface{}
++}
++
++func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
++
++func doSend(snd Sender, results chan result) {
++	err := snd.SendSync(amqp.NewMessage()).Error
++	results <- result{"send", err, nil}
++}
++
++func doReceive(rcv Receiver, results chan result) {
++	msg, err := rcv.Receive()
++	results <- result{"receive", err, msg}
++}
++
++func doDisposition(ack <-chan Outcome, results chan result) {
++	results <- result{"disposition", (<-ack).Error, nil}
++}
++
++// Senders get credit immediately if receivers have prefetch set
++func TestSendReceivePrefetch(t *testing.T) {
++	pairs := newPairs(t, 1, true)
++	s, r := pairs.senderReceiver()
++	s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
++	if _, err := r.Receive(); err != nil {
++		t.Error(err)
++	}
++}
++
++// Senders do not get credit till Receive() if receivers don't have prefetch
++func TestSendReceiveNoPrefetch(t *testing.T) {
++	pairs := newPairs(t, 1, false)
++	s, r := pairs.senderReceiver()
++	done := make(chan struct{}, 1)
++	go func() {
++		s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
++		close(done)
++	}()
++	select {
++	case <-done:
++		t.Errorf("send should be blocked on credit")
++	default:
++		if _, err := r.Receive(); err != nil {
++			t.Error(err)
++		} else {
++			<-done
++		} // Should be unblocked now
++	}
++}
++
++// Test that closing Links interrupts blocked link functions.
++func TestLinkCloseInterrupt(t *testing.T) {
++	want := amqp.Error{Name: "x", Description: "all bad"}
++	pairs := newPairs(t, 1, false)
++	results := make(chan result) // Collect expected errors
++
++	// Note closing the link does not interrupt Send() calls, the AMQP spec says
++	// that deliveries can be settled after the link is closed.
++
++	// Receiver.Close() interrupts Receive()
++	snd, rcv := pairs.senderReceiver()
++	go doReceive(rcv, results)
++	rcv.Close(want)
++	if r := <-results; want != r.err {
++		t.Errorf("want %#v got %#v", want, r)
++	}
++
++	// Remote Sender.Close() interrupts Receive()
++	snd, rcv = pairs.senderReceiver()
++	go doReceive(rcv, results)
++	snd.Close(want)
++	if r := <-results; want != r.err {
++		t.Errorf("want %#v got %#v", want, r)
++	}
++}
++
++// Test closing the server end of a connection.
++func TestConnectionCloseInterrupt1(t *testing.T) {
++	want := amqp.Error{Name: "x", Description: "bad"}
++	pairs := newPairs(t, 1, true)
++	results := make(chan result) // Collect expected errors
++
++	// Connection.Close() interrupts Send, Receive, Disposition.
++	snd, rcv := pairs.senderReceiver()
++	go doSend(snd, results)
++
++	if _, err := rcv.Receive(); err != nil {
++		t.Error("receive", err)
++	}
++	rcv, snd = pairs.receiverSender()
++	go doReceive(rcv, results)
++
++	snd, rcv = pairs.senderReceiver()
++	ack := snd.SendWaitable(amqp.NewMessage())
++	if _, err := rcv.Receive(); err != nil {
++		t.Error("receive", err)
++	}
++	go doDisposition(ack, results)
++
++	pairs.server.Close(want)
++	for i := 0; i < 3; i++ {
++		if r := <-results; want != r.err {
++			t.Errorf("want %v got %v", want, r)
++		}
++	}
++}
++
++// Test closing the client end of the connection.
++func TestConnectionCloseInterrupt2(t *testing.T) {
++	want := amqp.Error{Name: "x", Description: "bad"}
++	pairs := newPairs(t, 1, true)
++	results := make(chan result) // Collect expected errors
++
++	// Connection.Close() interrupts Send, Receive, Disposition.
++	snd, rcv := pairs.senderReceiver()
++	go doSend(snd, results)
++	if _, err := rcv.Receive(); err != nil {
++		t.Error("receive", err)
++	}
++
++	rcv, snd = pairs.receiverSender()
++	go doReceive(rcv, results)
++
++	snd, rcv = pairs.senderReceiver()
++	ack := snd.SendWaitable(amqp.NewMessage())
++	go doDisposition(ack, results)
++
++	pairs.client.Connection().Close(want)
++	for i := 0; i < 3; i++ {
++		if r := <-results; want != r.err {
++			t.Errorf("want %v got %v", want, r.err)
++		}
++	}
++}
++
++func heartbeat(c Connection) time.Duration {
++	return c.(*connection).engine.Transport().RemoteIdleTimeout()
++}
++
++func TestHeartbeat(t *testing.T) {
++	client, server := newClientServerOpts(t,
++		[]ConnectionOption{Heartbeat(102 * time.Millisecond)},
++		nil)
++	defer closeClientServer(client, server)
++
++	var serverHeartbeat time.Duration
++
++	go func() {
++		for in := range server.Incoming() {
++			switch in := in.(type) {
++			case *IncomingConnection:
++				serverHeartbeat = in.Heartbeat()
++				in.AcceptConnection(Heartbeat(101 * time.Millisecond))
++			default:
++				in.Accept()
++			}
++		}
++	}()
++
++	// Freeze the server to stop it sending heartbeats.
++	unfreeze := make(chan bool)
++	defer close(unfreeze)
++	freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
++
++	fatalIf(t, client.Sync())
++	errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
++	errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
++	errorIf(t, client.Connection().Error())
++
++	// Freeze the server for less than a heartbeat
++	fatalIf(t, freeze())
++	time.Sleep(50 * time.Millisecond)
++	unfreeze <- true
++	// Make sure server is still responding.
++	s, err := client.Sender()
++	errorIf(t, err)
++	errorIf(t, s.Sync())
++
++	// Freeze the server till the client times out the connection
++	fatalIf(t, freeze())
++	select {
++	case <-client.Done():
++		if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
++			t.Error("bad timeout error:", client.Error())
++		}
++	case <-time.After(400 * time.Millisecond):
++		t.Error("connection failed to time out")
++	}
++
++	unfreeze <- true // Unfreeze the server
++	<-server.Done()
++	if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
++		t.Error("bad timeout error:", server.Error())
++	}
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index fc701c6,0000000..ca93e5b
mode 100644,000000..100644
--- a/electron/endpoint.go
+++ b/electron/endpoint.go
@@@ -1,102 -1,0 +1,182 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
++	"fmt"
 +	"io"
 +	"qpid.apache.org/proton"
 +)
 +
 +// Closed is an alias for io.EOF. It is returned as an error when an endpoint
 +// was closed cleanly.
 +var Closed = io.EOF
 +
- // Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
++// Endpoint is the local end of a communications channel to the remote peer
++// process.  The following interfaces implement Endpoint: Connection, Session,
++// Sender and Receiver.
 +//
- // Endpoints can be created locally or by the remote peer. You must Open() an
- // endpoint before you can use it. Some endpoints have additional Set*() methods
- // that must be called before Open() to take effect, see Connection, Session,
- // Link, Sender and Receiver for details.
++// You can create an endpoint with functions on Container, Connection and
++// Session. You can accept incoming endpoints from the remote peer using
++// Connection.Incoming()
 +//
 +type Endpoint interface {
 +	// Close an endpoint and signal an error to the remote end if error != nil.
 +	Close(error)
 +
 +	// String is a human readable identifier, useful for debugging and logging.
 +	String() string
 +
 +	// Error returns nil if the endpoint is open, otherwise returns an error.
 +	// Error() == Closed means the endpoint was closed without error.
 +	Error() error
 +
- 	// Connection containing the endpoint
++	// Connection is the connection associated with this endpoint.
 +	Connection() Connection
 +
 +	// Done returns a channel that will close when the endpoint closes.
- 	// Error() will contain the reason.
++	// After Done() has closed, Error() will return the reason for closing.
 +	Done() <-chan struct{}
 +
++	// Sync() waits for the remote peer to confirm the endpoint is active or
++	// reject it with an error. You can call it immediately on new endpoints
++	// for more predictable error handling.
++	//
++	// AMQP is an asynchronous protocol. It is legal to create an endpoint and
++	// start using it without waiting for confirmation. This avoids a needless
++	// delay in the non-error case and improves throughput by "assuming the best".
++	//
++	// However if there *is* an error, these "optimistic" actions will fail. The
++	// endpoint and its children will be closed with an error. The error will only
++	// be detected when you try to use one of these endpoints or call Sync()
++	Sync() error
++
 +	// Called in handler goroutine when endpoint is remotely closed.
 +	closed(err error) error
++	wakeSync()
 +}
 +
- // DEVELOPER NOTES
- //
- // An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated
- //
++// Base implementation for Endpoint
 +type endpoint struct {
- 	err  proton.ErrorHolder
- 	str  string // Must be set by the value that embeds endpoint.
- 	done chan struct{}
++	err    proton.ErrorHolder
++	str    string // String() return value.
++	done   chan struct{}
++	active chan struct{}
 +}
 +
- func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) }
++func (e *endpoint) init(s string) {
++	e.str = s
++	e.done = make(chan struct{})
++	e.active = make(chan struct{})
++}
++
++// Called in proton goroutine on remote open.
++func (e *endpoint) wakeSync() {
++	select { // Close active channel if not already closed.
++	case <-e.active:
++	default:
++		close(e.active)
++	}
++}
 +
- // Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
- // proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure
- // the pointer has not been invalidated.
++// Called in proton goroutine (from handler) on a Closed or Disconnected event.
 +//
- // Returns the error stored on the endpoint, which may not be different to err if there was
- // already a n error
++// Set err if there is not already an error on the endpoint.
++// Return Error()
 +func (e *endpoint) closed(err error) error {
 +	select {
 +	case <-e.done:
 +		// Already closed
 +	default:
 +		e.err.Set(err)
 +		e.err.Set(Closed)
++		e.wakeSync() // Make sure we wake up Sync()
 +		close(e.done)
 +	}
- 	return e.err.Get()
++	return e.Error()
 +}
 +
 +func (e *endpoint) String() string { return e.str }
 +
 +func (e *endpoint) Error() error { return e.err.Get() }
 +
 +func (e *endpoint) Done() <-chan struct{} { return e.done }
 +
++func (e *endpoint) Sync() error {
++	<-e.active
++	return e.Error()
++}
++
 +// Call in proton goroutine to initiate closing an endpoint locally
 +// handler will complete the close when remote end closes.
 +func localClose(ep proton.Endpoint, err error) {
 +	if ep.State().LocalActive() {
 +		proton.CloseError(ep, err)
 +	}
 +}
++
++// Incoming is the interface for incoming endpoints, see Connection.Incoming()
++//
++// Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it
++// with optional error
++//
++// Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender
++// and *IncomingReceiver. Each type provides methods to examine the incoming
++// endpoint request and set configuration options for the local endpoint
++// before calling Accept() or Reject()
++type Incoming interface {
++	// Accept and open the endpoint.
++	Accept() Endpoint
++
++	// Reject the endpoint with an error
++	Reject(error)
++
++	// wait for and call the accept function, call in proton goroutine.
++	wait() error
++	pEndpoint() proton.Endpoint
++}
++
++type incoming struct {
++	pep      proton.Endpoint
++	acceptCh chan func() error
++}
++
++func makeIncoming(e proton.Endpoint) incoming {
++	return incoming{pep: e, acceptCh: make(chan func() error)}
++}
++
++func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", in.pep.Type(), in.pep) }
++func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
++
++// Call in proton goroutine, wait for and call the accept function.
++func (in *incoming) wait() error { return (<-in.acceptCh)() }
++
++func (in *incoming) pEndpoint() proton.Endpoint { return in.pep }
++
++// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
++func (in *incoming) accept(f func() Endpoint) Endpoint {
++	done := make(chan Endpoint)
++	in.acceptCh <- func() error {
++		ep := f()
++		done <- ep
++		return nil
++	}
++	return <-done
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/ex_client_server_test.go
----------------------------------------------------------------------
diff --cc electron/ex_client_server_test.go
index 0000000,0000000..93f275b
new file mode 100644
--- /dev/null
+++ b/electron/ex_client_server_test.go
@@@ -1,0 -1,0 +1,81 @@@
++package electron_test
++
++import (
++	"fmt"
++	"net"
++	"qpid.apache.org/amqp"
++	"qpid.apache.org/electron"
++)
++
++//  Print errors
++func check(msg string, err error) bool {
++	if err != nil {
++		fmt.Printf("%s: %s\n", msg, err)
++	}
++	return err == nil
++}
++
++func runServer(cont electron.Container, l net.Listener) {
++	for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) {
++		go func() { // Process connections concurrently, accepting AMQP endpoints
++			for in := range c.Incoming() {
++				ep := in.Accept() // Accept all endpoints
++				go func() {       // Process endpoints concurrently
++					switch ep := ep.(type) {
++					case electron.Sender:
++						m := amqp.NewMessageWith("hello yourself")
++						fmt.Printf("server %q sending %q\n", ep.Source(), m.Body())
++						ep.SendForget(m) // One-way send, client does not need to Accept.
++					case electron.Receiver:
++						if rm, err := ep.Receive(); check("server receive", err) {
++							fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body())
++							err := rm.Accept() // Client is waiting for Accept.
++							check("accept message", err)
++						}
++					}
++				}()
++			}
++		}()
++	}
++}
++
++func startServer() (addr net.Addr) {
++	cont := electron.NewContainer("server")
++	if l, err := net.Listen("tcp", ""); check("listen", err) {
++		addr = l.Addr()
++		go runServer(cont, l)
++	}
++	return addr
++}
++
++// Connect to addr and send/receive a message.
++func client(addr net.Addr) {
++	if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) {
++		defer c.Close(nil)
++		if s, err := c.Sender(electron.Target("target")); check("sender", err) {
++			fmt.Printf("client sending\n")
++			s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept()
++		}
++		if r, err := c.Receiver(electron.Source("source")); check("receiver", err) {
++			if rm, err := r.Receive(); err == nil {
++				fmt.Printf("client received %q\n", rm.Message.Body())
++			}
++		}
++	}
++}
++
++// Example client and server communicating via AMQP over a TCP/IP connection.
++//
++// Normally client and server would be separate processes.
++// For more realistic examples:
++//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
++//
++func Example_clientServer() {
++	addr := startServer()
++	client(addr)
++	// Output:
++	// client sending
++	// server "target" received "hello"
++	// server "source" sending "hello yourself"
++	// client received "hello yourself"
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index eb53df3,0000000..af1efd6
mode 100644,000000..100644
--- a/electron/handler.go
+++ b/electron/handler.go
@@@ -1,187 -1,0 +1,201 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"qpid.apache.org/amqp"
 +	"qpid.apache.org/proton"
 +)
 +
 +// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
 +
 +type handler struct {
 +	delegator    *proton.MessagingAdapter
 +	connection   *connection
 +	links        map[proton.Link]Endpoint
 +	sentMessages map[proton.Delivery]sentMessage
 +	sessions     map[proton.Session]*session
 +}
 +
 +func newHandler(c *connection) *handler {
 +	h := &handler{
 +		connection:   c,
 +		links:        make(map[proton.Link]Endpoint),
 +		sentMessages: make(map[proton.Delivery]sentMessage),
 +		sessions:     make(map[proton.Session]*session),
 +	}
 +	h.delegator = proton.NewMessagingAdapter(h)
 +	// Disable auto features of MessagingAdapter, we do these ourselves.
 +	h.delegator.Prefetch = 0
 +	h.delegator.AutoAccept = false
 +	h.delegator.AutoSettle = false
 +	h.delegator.AutoOpen = false
 +	return h
 +}
 +
 +func (h *handler) linkError(l proton.Link, msg string) {
 +	proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
 +}
 +
 +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
 +	switch t {
 +
 +	case proton.MMessage:
 +		if r, ok := h.links[e.Link()].(*receiver); ok {
 +			r.message(e.Delivery())
 +		} else {
 +			h.linkError(e.Link(), "no receiver")
 +		}
 +
 +	case proton.MSettled:
 +		if sm, ok := h.sentMessages[e.Delivery()]; ok {
 +			d := e.Delivery().Remote()
 +			sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
 +			delete(h.sentMessages, e.Delivery())
 +		}
 +
 +	case proton.MSendable:
 +		if s, ok := h.links[e.Link()].(*sender); ok {
 +			s.sendable()
 +		} else {
 +			h.linkError(e.Link(), "no sender")
 +		}
 +
++	case proton.MConnectionOpening:
++		h.connection.heartbeat = e.Transport().RemoteIdleTimeout()
++		if e.Connection().State().LocalUninit() { // Remotely opened
++			h.incoming(newIncomingConnection(h.connection))
++		}
++		h.connection.wakeSync()
++
 +	case proton.MSessionOpening:
 +		if e.Session().State().LocalUninit() { // Remotely opened
 +			h.incoming(newIncomingSession(h, e.Session()))
 +		}
++		h.sessions[e.Session()].wakeSync()
 +
 +	case proton.MSessionClosed:
 +		h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 +
 +	case proton.MLinkOpening:
 +		l := e.Link()
- 		if l.State().LocalActive() { // Already opened locally.
- 			break
- 		}
- 		ss := h.sessions[l.Session()]
- 		if ss == nil {
- 			h.linkError(e.Link(), "no session")
- 			break
- 		}
- 		if l.IsReceiver() {
- 			h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
++		if ss := h.sessions[l.Session()]; ss != nil {
++			if l.State().LocalUninit() { // Remotely opened.
++				if l.IsReceiver() {
++					h.incoming(newIncomingReceiver(ss, l))
++				} else {
++					h.incoming(newIncomingSender(ss, l))
++				}
++			}
++			if ep, ok := h.links[l]; ok {
++				ep.wakeSync()
++			} else {
++				h.linkError(l, "no link")
++			}
 +		} else {
- 			h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
++			h.linkError(l, "no session")
 +		}
 +
 +	case proton.MLinkClosing:
 +		e.Link().Close()
 +
 +	case proton.MLinkClosed:
 +		h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
 +
 +	case proton.MConnectionClosing:
 +		h.connection.err.Set(e.Connection().RemoteCondition().Error())
 +
 +	case proton.MConnectionClosed:
- 		h.connectionClosed(proton.EndpointError(e.Connection()))
++		h.shutdown(proton.EndpointError(e.Connection()))
 +
 +	case proton.MDisconnected:
- 		h.connection.err.Set(e.Transport().Condition().Error())
- 		// If err not set at this point (e.g. to Closed) then this is unexpected.
- 		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
- 
- 		err := h.connection.Error()
- 
- 		for l, _ := range h.links {
- 			h.linkClosed(l, err)
- 		}
- 		h.links = nil
- 		for _, s := range h.sessions {
- 			s.closed(err)
- 		}
- 		h.sessions = nil
- 		for _, sm := range h.sentMessages {
- 			sm.ack <- Outcome{Unacknowledged, err, sm.value}
++		err := e.Transport().Condition().Error()
++		if err == nil {
++			err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)
 +		}
- 		h.sentMessages = nil
++		h.shutdown(err)
 +	}
 +}
 +
 +func (h *handler) incoming(in Incoming) {
 +	var err error
 +	if h.connection.incoming != nil {
 +		h.connection.incoming <- in
++		// Must block until accept/reject, subsequent events may use the incoming endpoint.
 +		err = in.wait()
 +	} else {
 +		err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
 +			in.pEndpoint().Type(), in.pEndpoint().String())
 +	}
 +	if err == nil {
 +		in.pEndpoint().Open()
 +	} else {
 +		proton.CloseError(in.pEndpoint(), err)
 +	}
 +}
 +
 +func (h *handler) addLink(pl proton.Link, el Endpoint) {
 +	h.links[pl] = el
 +}
 +
 +func (h *handler) linkClosed(l proton.Link, err error) {
 +	if link, ok := h.links[l]; ok {
- 		link.closed(err)
++		_ = link.closed(err)
 +		delete(h.links, l)
++		l.Free()
 +	}
 +}
 +
 +func (h *handler) sessionClosed(ps proton.Session, err error) {
 +	if s, ok := h.sessions[ps]; ok {
 +		delete(h.sessions, ps)
 +		err = s.closed(err)
 +		for l, _ := range h.links {
 +			if l.Session() == ps {
 +				h.linkClosed(l, err)
 +			}
 +		}
++		ps.Free()
 +	}
 +}
 +
- func (h *handler) connectionClosed(err error) {
++func (h *handler) shutdown(err error) {
 +	err = h.connection.closed(err)
- 	// Close links first to avoid repeated scans of the link list by sessions.
- 	for l, _ := range h.links {
- 		h.linkClosed(l, err)
++	for _, sm := range h.sentMessages {
++		// Don't block but ensure outcome is sent eventually.
++		if sm.ack != nil {
++			o := Outcome{Unacknowledged, err, sm.value}
++			select {
++			case sm.ack <- o:
++			default:
++				go func(ack chan<- Outcome) { ack <- o }(sm.ack) // Deliver it eventually
++			}
++		}
++	}
++	h.sentMessages = nil
++	for _, l := range h.links {
++		_ = l.closed(err)
 +	}
- 	for s, _ := range h.sessions {
- 		h.sessionClosed(s, err)
++	h.links = nil
++	for _, s := range h.sessions {
++		_ = s.closed(err)
 +	}
++	h.sessions = nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/link.go
----------------------------------------------------------------------
diff --cc electron/link.go
index 80b4d5c,0000000..1d17894
mode 100644,000000..100644
--- a/electron/link.go
+++ b/electron/link.go
@@@ -1,233 -1,0 +1,221 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"fmt"
 +	"qpid.apache.org/proton"
 +)
 +
 +// Settings associated with a link
 +type LinkSettings interface {
 +	// Source address that messages are coming from.
 +	Source() string
 +
 +	// Target address that messages are going to.
 +	Target() string
 +
 +	// LinkName is a unique name for the link among links between the same
 +	// containers in the same direction. By default generated automatically.
 +	LinkName() string
 +
 +	// IsSender is true if this is the sending end of the link.
 +	IsSender() bool
 +
 +	// IsReceiver is true if this is the receiving end of the link.
 +	IsReceiver() bool
 +
 +	// SndSettle defines when the sending end of the link settles message delivery.
 +	SndSettle() SndSettleMode
 +
 +	// RcvSettle defines when the receiving end of the link settles message delivery.
 +	RcvSettle() RcvSettleMode
 +
 +	// Session containing the Link
 +	Session() Session
 +}
 +
 +// LinkOption can be passed when creating a sender or receiver link to set optional configuration.
 +type LinkOption func(*linkSettings)
 +
 +// Source returns a LinkOption that sets address that messages are coming from.
 +func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } }
 +
 +// Target returns a LinkOption that sets address that messages are going to.
 +func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
 +
 +// LinkName returns a LinkOption that sets the link name.
 +// The name is stored in linkSettings.linkName, the field returned by the
 +// LinkName() accessor; if it is left empty, makeLocalLink generates one.
 +func LinkName(s string) LinkOption { return func(l *linkSettings) { l.linkName = s } }
 +
 +// SndSettle returns a LinkOption that sets the send settle mode
 +func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } }
 +
 +// RcvSettle returns a LinkOption that sets the receive settle mode
 +func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } }
 +
 +// SndSettleMode defines when the sending end of the
 +// link settles message delivery.
 +type SndSettleMode proton.SndSettleMode
 +
 +// Capacity returns a LinkOption that sets the link capacity
 +func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } }
 +
 +// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.
 +func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } }
 +
 +// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
 +// are sent but no acknowledgment is received, messages can be lost if there is
 +// a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst
 +func AtMostOnce() LinkOption {
 +	return func(l *linkSettings) {
 +		SndSettle(SndSettled)(l)
 +		RcvSettle(RcvFirst)(l)
 +	}
 +}
 +
 +// AtLeastOnce returns a LinkOption that requests acknowledgment for every
 +// message, acknowledgment indicates the message was definitely received. In the
 +// event of a failure, unacknowledged messages can be re-sent but there is a
 +// chance that the message will be received twice in this case.  Sets
 +// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 +func AtLeastOnce() LinkOption {
 +	return func(l *linkSettings) {
 +		SndSettle(SndUnsettled)(l)
 +		RcvSettle(RcvFirst)(l)
 +	}
 +}
 +
 +const (
 +	// Messages are sent unsettled
 +	SndUnsettled = SndSettleMode(proton.SndUnsettled)
 +	// Messages are sent already settled
 +	SndSettled = SndSettleMode(proton.SndSettled)
 +	// Sender can send either unsettled or settled messages.
 +	SendMixed = SndSettleMode(proton.SndMixed)
 +)
 +
 +// RcvSettleMode defines when the receiving end of the link settles message delivery.
 +type RcvSettleMode proton.RcvSettleMode
 +
 +const (
 +	// Receiver settles first.
 +	RcvFirst = RcvSettleMode(proton.RcvFirst)
 +	// Receiver waits for sender to settle before settling.
 +	RcvSecond = RcvSettleMode(proton.RcvSecond)
 +)
 +
 +type linkSettings struct {
 +	source    string
 +	target    string
 +	linkName  string
 +	isSender  bool
 +	sndSettle SndSettleMode
 +	rcvSettle RcvSettleMode
 +	capacity  int
 +	prefetch  bool
 +	session   *session
- 	eLink     proton.Link
++	pLink     proton.Link
 +}
 +
 +type link struct {
 +	endpoint
 +	linkSettings
 +}
 +
 +func (l *linkSettings) Source() string           { return l.source }
 +func (l *linkSettings) Target() string           { return l.target }
 +func (l *linkSettings) LinkName() string         { return l.linkName }
 +func (l *linkSettings) IsSender() bool           { return l.isSender }
 +func (l *linkSettings) IsReceiver() bool         { return !l.isSender }
 +func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
 +func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
 +
 +func (l *link) Session() Session       { return l.session }
 +func (l *link) Connection() Connection { return l.session.Connection() }
 +func (l *link) engine() *proton.Engine { return l.session.connection.engine }
 +func (l *link) handler() *handler      { return l.session.connection.handler }
 +
 +// Open a link and return the linkSettings.
 +func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) {
 +	l := linkSettings{
 +		isSender: isSender,
 +		capacity: 1,
 +		prefetch: false,
 +		session:  sn,
 +	}
 +	for _, set := range setting {
 +		set(&l)
 +	}
 +	if l.linkName == "" {
 +		l.linkName = l.session.connection.container.nextLinkName()
 +	}
 +	if l.IsSender() {
- 		l.eLink = l.session.eSession.Sender(l.linkName)
++		l.pLink = l.session.pSession.Sender(l.linkName)
 +	} else {
- 		l.eLink = l.session.eSession.Receiver(l.linkName)
++		l.pLink = l.session.pSession.Receiver(l.linkName)
 +	}
- 	if l.eLink.IsNil() {
- 		return l, fmt.Errorf("cannot create link %s", l.eLink)
++	if l.pLink.IsNil() {
++		return l, fmt.Errorf("cannot create link %s", l.pLink)
 +	}
- 	l.eLink.Source().SetAddress(l.source)
- 	l.eLink.Target().SetAddress(l.target)
- 	l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
- 	l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
- 	l.eLink.Open()
++	l.pLink.Source().SetAddress(l.source)
++	l.pLink.Target().SetAddress(l.target)
++	l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
++	l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
++	l.pLink.Open()
 +	return l, nil
 +}
 +
- type incomingLink struct {
- 	incoming
- 	linkSettings
- 	eLink proton.Link
- 	sn    *session
- }
- 
- // Set up a link from an incoming proton.Link.
- func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
- 	l := incomingLink{
- 		incoming: makeIncoming(eLink),
- 		linkSettings: linkSettings{
- 			isSender:  eLink.IsSender(),
- 			source:    eLink.RemoteSource().Address(),
- 			target:    eLink.RemoteTarget().Address(),
- 			linkName:  eLink.Name(),
- 			sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
- 			rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
- 			capacity:  1,
- 			prefetch:  false,
- 			eLink:     eLink,
- 			session:   sn,
- 		},
++func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
++	return linkSettings{
++		isSender:  pLink.IsSender(),
++		source:    pLink.RemoteSource().Address(),
++		target:    pLink.RemoteTarget().Address(),
++		linkName:  pLink.Name(),
++		sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
++		rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
++		capacity:  1,
++		prefetch:  false,
++		pLink:     pLink,
++		session:   sn,
 +	}
- 	return l
 +}
 +
 +// Not part of Link interface but used by Sender and Receiver.
 +func (l *link) Credit() (credit int, err error) {
 +	err = l.engine().InjectWait(func() error {
 +		if l.Error() != nil {
 +			return l.Error()
 +		}
- 		credit = l.eLink.Credit()
++		credit = l.pLink.Credit()
 +		return nil
 +	})
 +	return
 +}
 +
 +// Not part of Link interface but used by Sender and Receiver.
 +func (l *link) Capacity() int { return l.capacity }
 +
 +func (l *link) Close(err error) {
- 	l.engine().Inject(func() {
++	_ = l.engine().Inject(func() {
 +		if l.Error() == nil {
- 			localClose(l.eLink, err)
++			localClose(l.pLink, err)
 +		}
 +	})
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/receiver.go
----------------------------------------------------------------------
diff --cc electron/receiver.go
index f2b7a52,0000000..781fd7c
mode 100644,000000..100644
--- a/electron/receiver.go
+++ b/electron/receiver.go
@@@ -1,225 -1,0 +1,236 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"fmt"
 +	"qpid.apache.org/amqp"
 +	"qpid.apache.org/proton"
 +	"time"
 +)
 +
 +// Receiver is a Link that receives messages.
 +//
 +type Receiver interface {
 +	Endpoint
 +	LinkSettings
 +
 +	// Receive blocks until a message is available or until the Receiver is closed
 +	// and has no more buffered messages.
 +	Receive() (ReceivedMessage, error)
 +
 +	// ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
 +	//
 +	// Note that if Prefetch is false, after a Timeout the credit issued by
 +	// Receive remains on the link. It will be used by the next call to Receive.
 +	ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
 +
 +	// Prefetch==true means the Receiver will automatically issue credit to the
 +	// remote sender to keep its buffer as full as possible, i.e. it will
 +	// "pre-fetch" messages independently of the application calling
 +	// Receive(). This gives good throughput for applications that handle a
 +	// continuous stream of messages. Larger capacity may improve throughput, the
 +	// optimal value depends on the characteristics of your application.
 +	//
 +	// Prefetch==false means the Receiver will only issue credit when you
 +	// call Receive(), and will only issue enough credit to satisfy the calls
 +	// actually made. This gives lower throughput but will not fetch any messages
 +	// in advance. It is good for synchronous applications that need to evaluate
 +	// each message before deciding whether to receive another. The
 +	// request-response pattern is a typical example.  If you make concurrent
 +	// calls to Receive with pre-fetch disabled, you can improve performance by
 +	// setting the capacity close to the expected number of concurrent calls.
 +	//
 +	Prefetch() bool
 +
 +	// Capacity is the size (number of messages) of the local message buffer
 +	// These are messages received but not yet returned to the application by a call to Receive()
 +	Capacity() int
 +}
 +
 +// Receiver implementation
 +type receiver struct {
 +	link
 +	buffer  chan ReceivedMessage
 +	callers int
 +}
 +
 +func (r *receiver) Capacity() int  { return cap(r.buffer) }
 +func (r *receiver) Prefetch() bool { return r.prefetch }
 +
 +// Call in proton goroutine
 +func newReceiver(ls linkSettings) *receiver {
 +	r := &receiver{link: link{linkSettings: ls}}
- 	r.endpoint.init(r.link.eLink.String())
++	r.endpoint.init(r.link.pLink.String())
 +	if r.capacity < 1 {
 +		r.capacity = 1
 +	}
 +	r.buffer = make(chan ReceivedMessage, r.capacity)
- 	r.handler().addLink(r.eLink, r)
- 	r.link.eLink.Open()
++	r.handler().addLink(r.pLink, r)
++	r.link.pLink.Open()
 +	if r.prefetch {
 +		r.flow(r.maxFlow())
 +	}
 +	return r
 +}
 +
 +// Call in proton goroutine. Max additional credit we can request.
- func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.eLink.Credit() }
++func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.pLink.Credit() }
 +
 +func (r *receiver) flow(credit int) {
 +	if credit > 0 {
- 		r.eLink.Flow(credit)
++		r.pLink.Flow(credit)
 +	}
 +}
 +
 +// Inject flow check per-caller call when prefetch is off.
 +// Called with inc=1 at start of call, inc = -1 at end
 +func (r *receiver) caller(inc int) {
- 	r.engine().Inject(func() {
++	_ = r.engine().Inject(func() {
 +		r.callers += inc
- 		need := r.callers - (len(r.buffer) + r.eLink.Credit())
++		need := r.callers - (len(r.buffer) + r.pLink.Credit())
 +		max := r.maxFlow()
 +		if need > max {
 +			need = max
 +		}
 +		r.flow(need)
 +	})
 +}
 +
 +// Inject flow top-up if prefetch is enabled
 +func (r *receiver) flowTopUp() {
 +	if r.prefetch {
- 		r.engine().Inject(func() { r.flow(r.maxFlow()) })
++		_ = r.engine().Inject(func() { r.flow(r.maxFlow()) })
 +	}
 +}
 +
- // Not claled
 +func (r *receiver) Receive() (rm ReceivedMessage, err error) {
 +	return r.ReceiveTimeout(Forever)
 +}
 +
- func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) {
++func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
 +	assert(r.buffer != nil, "Receiver is not open: %s", r)
- 	select { // Check for immediate availability
- 	case rm := <-r.buffer:
- 		r.flowTopUp()
- 		return rm, nil
- 	default:
- 	}
 +	if !r.prefetch { // Per-caller flow control
- 		r.caller(+1)
- 		defer r.caller(-1)
++		select { // Check for immediate availability, avoid caller() inject
++		case rm2, ok := <-r.buffer:
++			if ok {
++				rm = rm2
++			} else {
++				err = r.Error()
++			}
++			return
++		default: // Not immediately available, inject caller() counts
++			r.caller(+1)
++			defer r.caller(-1)
++		}
 +	}
 +	rmi, err := timedReceive(r.buffer, timeout)
 +	switch err {
 +	case nil:
 +		r.flowTopUp()
- 		return rmi.(ReceivedMessage), err
++		rm = rmi.(ReceivedMessage)
 +	case Closed:
- 		return ReceivedMessage{}, r.Error()
- 	default:
- 		return ReceivedMessage{}, err
++		err = r.Error()
 +	}
++	return
 +}
 +
 +// Called in proton goroutine on MMessage event.
 +func (r *receiver) message(delivery proton.Delivery) {
- 	if r.eLink.State().RemoteClosed() {
- 		localClose(r.eLink, r.eLink.RemoteCondition().Error())
++	if r.pLink.State().RemoteClosed() {
++		localClose(r.pLink, r.pLink.RemoteCondition().Error())
 +		return
 +	}
 +	if delivery.HasMessage() {
 +		m, err := delivery.Message()
 +		if err != nil {
- 			localClose(r.eLink, err)
++			localClose(r.pLink, err)
 +			return
 +		}
 +		assert(m != nil)
- 		r.eLink.Advance()
- 		if r.eLink.Credit() < 0 {
- 			localClose(r.eLink, fmt.Errorf("received message in excess of credit limit"))
++		r.pLink.Advance()
++		if r.pLink.Credit() < 0 {
++			localClose(r.pLink, fmt.Errorf("received message in excess of credit limit"))
 +		} else {
 +			// We never issue more credit than cap(buffer) so this will not block.
 +			r.buffer <- ReceivedMessage{m, delivery, r}
 +		}
 +	}
 +}
 +
 +func (r *receiver) closed(err error) error {
++	e := r.link.closed(err)
 +	if r.buffer != nil {
 +		close(r.buffer)
 +	}
- 	return r.link.closed(err)
++	return e
 +}
 +
 +// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
 +type ReceivedMessage struct {
 +	// Message is the received message.
 +	Message amqp.Message
 +
- 	eDelivery proton.Delivery
++	pDelivery proton.Delivery
 +	receiver  Receiver
 +}
 +
 +// Acknowledge a ReceivedMessage with the given delivery status.
 +func (rm *ReceivedMessage) acknowledge(status uint64) error {
 +	return rm.receiver.(*receiver).engine().Inject(func() {
 +		// Deliveries are valid as long as the connection is, unless settled.
- 		rm.eDelivery.SettleAs(uint64(status))
++		rm.pDelivery.SettleAs(uint64(status))
 +	})
 +}
 +
 +// Accept tells the sender that we take responsibility for processing the message.
 +func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) }
 +
 +// Reject tells the sender we consider the message invalid and unusable.
 +func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) }
 +
 +// Release tells the sender we will not process the message but some other
 +// receiver might.
 +func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) }
 +
 +// IncomingReceiver is sent on the Connection.Incoming() channel when there is
 +// an incoming request to open a receiver link.
 +type IncomingReceiver struct {
- 	incomingLink
++	incoming
++	linkSettings
++}
++
++func newIncomingReceiver(sn *session, pLink proton.Link) *IncomingReceiver {
++	return &IncomingReceiver{
++		incoming:     makeIncoming(pLink),
++		linkSettings: makeIncomingLinkSettings(pLink, sn),
++	}
 +}
 +
 +// SetCapacity sets the capacity of the incoming receiver, call before Accept()
 +func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity }
 +
 +// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()
 +func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch }
 +
 +// Accept accepts an incoming receiver endpoint
 +func (in *IncomingReceiver) Accept() Endpoint {
 +	return in.accept(func() Endpoint { return newReceiver(in.linkSettings) })
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/sender.go
----------------------------------------------------------------------
diff --cc electron/sender.go
index 2f0e965,0000000..f46fdc4
mode 100644,000000..100644
--- a/electron/sender.go
+++ b/electron/sender.go
@@@ -1,274 -1,0 +1,288 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
 +	"fmt"
 +	"qpid.apache.org/amqp"
 +	"qpid.apache.org/proton"
 +	"time"
 +)
 +
 +// Sender is a Link that sends messages.
 +//
 +// The result of sending a message is provided by an Outcome value.
 +//
 +// A sender can buffer messages up to the credit limit provided by the remote receiver.
- // Send* methods will block if the buffer is full until there is space.
++// All the Send* methods will block if the buffer is full until there is space.
 +// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
 +//
 +type Sender interface {
 +	Endpoint
 +	LinkSettings
 +
 +	// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
 +	// Returns an Outcome, which may contain an error if the message could not be sent.
 +	SendSync(m amqp.Message) Outcome
 +
 +	// SendWaitable puts a message in the send buffer and returns a channel that
 +	// you can use to wait for the Outcome of just that message. The channel is
- 	// buffered so you can receive from it whenever you want without blocking anything.
++	// buffered so you can receive from it whenever you want without blocking.
++	//
++	// Note: can block if there is no space to buffer the message.
 +	SendWaitable(m amqp.Message) <-chan Outcome
 +
 +	// SendForget buffers a message for sending and returns, with no notification of the outcome.
++	//
++	// Note: can block if there is no space to buffer the message.
 +	SendForget(m amqp.Message)
 +
 +	// SendAsync puts a message in the send buffer and returns immediately.  An
 +	// Outcome with Value = value will be sent to the ack channel when the remote
 +	// receiver has acknowledged the message or if there is an error.
 +	//
 +	// You can use the same ack channel for many calls to SendAsync(), possibly on
 +	// many Senders. The channel will receive the outcomes in the order they
 +	// become available. The channel should be buffered and/or served by dedicated
 +	// goroutines to avoid blocking the connection.
 +	//
 +	// If ack == nil no Outcome is sent.
++	//
++	// Note: can block if there is no space to buffer the message.
 +	SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
 +
 +	SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)
 +
 +	SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome
 +
 +	SendForgetTimeout(m amqp.Message, timeout time.Duration)
 +
 +	SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
 +}
 +
 +// Outcome provides information about the outcome of sending a message.
 +type Outcome struct {
 +	// Status of the message: was it sent, how was it acknowledged.
 +	Status SentStatus
 +	// Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
 +	Error error
 +	// Value provided by the application in SendAsync()
 +	Value interface{}
 +}
 +
 +func (o Outcome) send(ack chan<- Outcome) {
 +	if ack != nil {
 +		ack <- o
 +	}
 +}
 +
 +// SentStatus indicates the status of a sent message.
 +type SentStatus int
 +
 +const (
 +	// Message was never sent
 +	Unsent SentStatus = iota
 +	// Message was sent but never acknowledged. It may or may not have been received.
 +	Unacknowledged
 +	// Message was accepted by the receiver (or was sent pre-settled, accept is assumed)
 +	Accepted
 +	// Message was rejected as invalid by the receiver
 +	Rejected
 +	// Message was not processed by the receiver but may be valid for a different receiver
 +	Released
 +	// Receiver responded with an unrecognized status.
 +	Unknown
 +)
 +
 +// String human readable name for SentStatus.
 +func (s SentStatus) String() string {
 +	switch s {
 +	case Unsent:
 +		return "unsent"
 +	case Unacknowledged:
 +		return "unacknowledged"
 +	case Accepted:
 +		return "accepted"
 +	case Rejected:
 +		return "rejected"
 +	case Released:
 +		return "released"
 +	case Unknown:
 +		return "unknown"
 +	default:
 +		return fmt.Sprintf("invalid(%d)", s)
 +	}
 +}
 +
 +// Convert proton delivery state code to SentStatus value
 +func sentStatus(d uint64) SentStatus {
 +	switch d {
 +	case proton.Accepted:
 +		return Accepted
 +	case proton.Rejected:
 +		return Rejected
 +	case proton.Released, proton.Modified:
 +		return Released
 +	default:
 +		return Unknown
 +	}
 +}
 +
 +// Sender implementation, held by handler.
 +type sender struct {
 +	link
 +	credit chan struct{} // Signal available credit.
 +}
 +
 +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
 +	// wait for credit
 +	if _, err := timedReceive(s.credit, t); err != nil {
 +		if err == Closed && s.Error() != nil {
 +			err = s.Error()
 +		}
 +		Outcome{Unsent, err, v}.send(ack)
 +		return
 +	}
 +	// Send a message in handler goroutine
 +	err := s.engine().Inject(func() {
 +		if s.Error() != nil {
 +			Outcome{Unsent, s.Error(), v}.send(ack)
 +			return
 +		}
 +
- 		delivery, err2 := s.eLink.Send(m)
++		delivery, err2 := s.pLink.Send(m)
 +		switch {
 +		case err2 != nil:
 +			Outcome{Unsent, err2, v}.send(ack)
 +		case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
 +			if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy
 +				delivery.Settle()
 +			}
 +			Outcome{Accepted, nil, v}.send(ack) // Assume accepted
 +		default:
 +			s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler
 +		}
- 		if s.eLink.Credit() > 0 { // Signal there is still credit
++		if s.pLink.Credit() > 0 { // Signal there is still credit
 +			s.sendable()
 +		}
 +	})
 +	if err != nil {
 +		Outcome{Unsent, err, v}.send(ack)
 +	}
 +}
 +
 +// Set credit flag if not already set. Non-blocking, any goroutine
 +func (s *sender) sendable() {
 +	select { // Non-blocking
 +	case s.credit <- struct{}{}:
 +	default:
 +	}
 +}
 +
 +func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome {
 +	out := make(chan Outcome, 1)
 +	s.SendAsyncTimeout(m, out, nil, t)
 +	return out
 +}
 +
 +func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
 +	s.SendAsyncTimeout(m, nil, nil, t)
 +}
 +
 +func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
 +	deadline := time.Now().Add(t)
 +	ack := s.SendWaitableTimeout(m, t)
 +	t = deadline.Sub(time.Now()) // Adjust for time already spent.
 +	if t < 0 {
 +		t = 0
 +	}
 +	if out, err := timedReceive(ack, t); err == nil {
 +		return out.(Outcome)
 +	} else {
 +		if err == Closed && s.Error() != nil {
 +			err = s.Error()
 +		}
 +		return Outcome{Unacknowledged, err, nil}
 +	}
 +}
 +
 +func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) {
 +	s.SendAsyncTimeout(m, ack, v, Forever)
 +}
 +
 +func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
 +	return s.SendWaitableTimeout(m, Forever)
 +}
 +
 +func (s *sender) SendForget(m amqp.Message) {
 +	s.SendForgetTimeout(m, Forever)
 +}
 +
 +func (s *sender) SendSync(m amqp.Message) Outcome {
 +	return <-s.SendWaitable(m)
 +}
 +
 +// handler goroutine
 +func (s *sender) closed(err error) error {
 +	close(s.credit)
 +	return s.link.closed(err)
 +}
 +
 +func newSender(ls linkSettings) *sender {
 +	s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)}
- 	s.endpoint.init(s.link.eLink.String())
- 	s.handler().addLink(s.eLink, s)
- 	s.link.eLink.Open()
++	s.endpoint.init(s.link.pLink.String())
++	s.handler().addLink(s.pLink, s)
++	s.link.pLink.Open()
 +	return s
 +}
 +
 +// sentMessage records a sent message on the handler.
 +type sentMessage struct {
 +	ack   chan<- Outcome
 +	value interface{}
 +}
 +
 +// IncomingSender is sent on the Connection.Incoming() channel when there is
 +// an incoming request to open a sender link.
 +type IncomingSender struct {
- 	incomingLink
++	incoming
++	linkSettings
++}
++
++func newIncomingSender(sn *session, pLink proton.Link) *IncomingSender {
++	return &IncomingSender{
++		incoming:     makeIncoming(pLink),
++		linkSettings: makeIncomingLinkSettings(pLink, sn),
++	}
 +}
 +
 +// Accept accepts an incoming sender endpoint
 +func (in *IncomingSender) Accept() Endpoint {
 +	return in.accept(func() Endpoint { return newSender(in.linkSettings) })
 +}
 +
 +// Call in injected functions to check if the sender is valid.
 +func (s *sender) valid() bool {
- 	s2, ok := s.handler().links[s.eLink].(*sender)
++	s2, ok := s.handler().links[s.pLink].(*sender)
 +	return ok && s2 == s
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/session.go
----------------------------------------------------------------------
diff --cc electron/session.go
index 1bbc52c,0000000..6dae354
mode 100644,000000..100644
--- a/electron/session.go
+++ b/electron/session.go
@@@ -1,128 -1,0 +1,139 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"qpid.apache.org/proton"
 +)
 +
 +// Session is an AMQP session, it contains Senders and Receivers.
 +type Session interface {
 +	Endpoint
 +
 +	// Sender opens a new sender.
 +	Sender(...LinkOption) (Sender, error)
 +
 +	// Receiver opens a new Receiver.
 +	Receiver(...LinkOption) (Receiver, error)
 +}
 +
 +type session struct {
 +	endpoint
- 	eSession   proton.Session
- 	connection *connection
- 	capacity   uint
++	pSession                         proton.Session
++	connection                       *connection
++	incomingCapacity, outgoingWindow uint
 +}
 +
 +// SessionOption can be passed when creating a Session
 +type SessionOption func(*session)
 +
 +// IncomingCapacity returns a Session Option that sets the size (in bytes) of
- // the sessions incoming data buffer..
- func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } }
++// the session's incoming data buffer.
++func IncomingCapacity(bytes uint) SessionOption {
++	return func(s *session) { s.incomingCapacity = bytes }
++}
++
++// OutgoingWindow returns a Session Option that sets the outgoing window size (in frames).
++func OutgoingWindow(frames uint) SessionOption {
++	return func(s *session) { s.outgoingWindow = frames }
++}
 +
 +// in proton goroutine
 +func newSession(c *connection, es proton.Session, setting ...SessionOption) *session {
 +	s := &session{
 +		connection: c,
- 		eSession:   es,
++		pSession:   es,
 +	}
 +	s.endpoint.init(es.String())
 +	for _, set := range setting {
 +		set(s)
 +	}
- 	c.handler.sessions[s.eSession] = s
- 	s.eSession.SetIncomingCapacity(s.capacity)
- 	s.eSession.Open()
++	c.handler.sessions[s.pSession] = s
++	s.pSession.SetIncomingCapacity(s.incomingCapacity)
++	s.pSession.SetOutgoingWindow(s.outgoingWindow)
++	s.pSession.Open()
 +	return s
 +}
 +
 +func (s *session) Connection() Connection     { return s.connection }
- func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
++func (s *session) pEndpoint() proton.Endpoint { return s.pSession }
 +func (s *session) engine() *proton.Engine     { return s.connection.engine }
 +
 +func (s *session) Close(err error) {
- 	s.engine().Inject(func() {
++	_ = s.engine().Inject(func() {
 +		if s.Error() == nil {
- 			localClose(s.eSession, err)
++			localClose(s.pSession, err)
 +		}
 +	})
 +}
 +
 +func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
 +	err = s.engine().InjectWait(func() error {
 +		if s.Error() != nil {
 +			return s.Error()
 +		}
 +		l, err := makeLocalLink(s, true, setting...)
 +		if err == nil {
 +			snd = newSender(l)
 +		}
 +		return err
 +	})
 +	return
 +}
 +
 +func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
 +	err = s.engine().InjectWait(func() error {
 +		if s.Error() != nil {
 +			return s.Error()
 +		}
 +		l, err := makeLocalLink(s, false, setting...)
 +		if err == nil {
 +			rcv = newReceiver(l)
 +		}
 +		return err
 +	})
 +	return
 +}
 +
 +// IncomingSession is sent on the Connection.Incoming() channel when there is an
 +// incoming request to open a session.
 +type IncomingSession struct {
 +	incoming
- 	h        *handler
- 	pSession proton.Session
- 	capacity uint
++	h                                *handler
++	pSession                         proton.Session
++	incomingCapacity, outgoingWindow uint
 +}
 +
 +func newIncomingSession(h *handler, ps proton.Session) *IncomingSession {
 +	return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps}
 +}
 +
- // SetCapacity sets the session buffer capacity of an incoming session in bytes.
- func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes }
++// SetIncomingCapacity sets the session buffer capacity of an incoming session in bytes.
++func (in *IncomingSession) SetIncomingCapacity(bytes uint) { in.incomingCapacity = bytes }
++
++// SetOutgoingWindow sets the session outgoing window of an incoming session in frames.
++func (in *IncomingSession) SetOutgoingWindow(frames uint) { in.outgoingWindow = frames }
 +
 +// Accept an incoming session endpoint.
 +func (in *IncomingSession) Accept() Endpoint {
 +	return in.accept(func() Endpoint {
- 		return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity))
++		return newSession(in.h.connection, in.pSession, IncomingCapacity(in.incomingCapacity), OutgoingWindow(in.outgoingWindow))
 +	})
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index eecda7a,0000000..c0f0093
mode 100644,000000..100644
--- a/proton/engine.go
+++ b/proton/engine.go
@@@ -1,409 -1,0 +1,422 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
- // #include <proton/connection.h>
- // #include <proton/event.h>
- // #include <proton/error.h>
- // #include <proton/handlers.h>
- // #include <proton/session.h>
- // #include <proton/transport.h>
- // #include <memory.h>
- // #include <stdlib.h>
- //
- // PN_HANDLE(REMOTE_ADDR)
- import "C"
- 
 +import (
 +	"fmt"
 +	"net"
++	"os"
++	"strings"
 +	"sync"
 +	"time"
 +	"unsafe"
 +)
 +
++/*
++#include <proton/connection.h>
++#include <proton/event.h>
++#include <proton/error.h>
++#include <proton/handlers.h>
++#include <proton/session.h>
++#include <proton/transport.h>
++#include <memory.h>
++#include <stdlib.h>
++*/
++import "C"
++
 +// Injecter allows functions to be "injected" into the event-processing loop, to
 +// be called in the same goroutine as event handlers.
 +type Injecter interface {
 +	// Inject a function into the engine goroutine.
 +	//
 +	// f() will be called in the same goroutine as event handlers, so it can safely
 +	// use values belonging to event handlers without synchronization. f() should
 +	// not block, no further events or injected functions can be processed until
 +	// f() returns.
 +	//
 +	// Returns a non-nil error if the function could not be injected and will
 +	// never be called. Otherwise the function will eventually be called.
 +	//
 +	// Note that proton values (Link, Session, Connection etc.) that existed when
 +	// Inject(f) was called may have become invalid by the time f() is executed.
 +	// Handlers should keep track of Closed events to ensure proton values
 +	// are not used after they become invalid. One technique is to have a map from
 +	// proton values to application values. Check that the map has the correct
 +	// proton/application value pair at the start of the injected function and
 +	// delete the value from the map when handling a Closed event.
 +	Inject(f func()) error
 +
 +	// InjectWait is like Inject but does not return till f() has completed.
 +	// If f() cannot be injected it returns the error from Inject(), otherwise
 +	// it returns the error from f()
 +	InjectWait(f func() error) error
 +}
 +
- // bufferChan manages a pair of ping-pong buffers to pass bytes through a channel.
- type bufferChan struct {
- 	buffers    chan []byte
- 	buf1, buf2 []byte
- }
- 
- func newBufferChan(size int) *bufferChan {
- 	return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)}
- }
- 
- func (b *bufferChan) buffer() []byte {
- 	b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers.
- 	return b.buf1[:cap(b.buf1)]
- }
- 
 +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
 +// Handler functions sequentially in a single goroutine. Actions taken by
 +// Handler functions (such as sending messages) are encoded and written to the
 +// net.Conn. You can create multiple Engines to handle multiple connections
 +// concurrently.
 +//
 +// You implement the EventHandler and/or MessagingHandler interfaces and provide
 +// those values to NewEngine(). Their HandleEvent method will be called in the
 +// event-handling goroutine.
 +//
 +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
 +// other goroutines, store them, or use them as map indexes. Effectively they are
 +// just pointers.  Other goroutines cannot call their methods directly but they can
 +// create a function closure to call such methods and pass it to Engine.Inject()
 +// to have it evaluated in the engine goroutine.
 +//
 +// You are responsible for ensuring you don't use an event value after it is
 +// invalid. The handler methods will tell you when a value is no longer valid. For
 +// example after a LinkClosed event, that link is no longer valid. If you do
 +// Link.Close() yourself (in a handler or injected function) the link remains valid
 +// until the corresponding LinkClosed event is received by the handler.
 +//
 +// Engine.Close() will take care of cleaning up any remaining values when you are
 +// done with the Engine. All values associated with an engine become invalid when you
 +// call Engine.Close()
 +//
 +// The qpid.apache.org/proton/concurrent package will do all this for you, so it
 +// may be a better choice for some applications.
 +//
 +type Engine struct {
 +	// Error is set on exit from Run() if there was an error.
 +	err    ErrorHolder
 +	inject chan func()
 +
 +	conn       net.Conn
 +	connection Connection
 +	transport  Transport
 +	collector  *C.pn_collector_t
- 	read       *bufferChan    // Read buffers channel.
- 	write      *bufferChan    // Write buffers channel.
 +	handlers   []EventHandler // Handlers for proton events.
 +	running    chan struct{}  // This channel will be closed when the goroutines are done.
 +	closeOnce  sync.Once
++	timer      *time.Timer
++	traceEvent bool
 +}
 +
 +const bufferSize = 4096
 +
- // NewEngine initializes a engine with a connection and handlers. To start it running:
- //    eng := NewEngine(...)
- //    go run eng.Run()
- // The goroutine will exit when the engine is closed or disconnected.
- // You can check for errors on Engine.Error.
- //
++func envBool(name string) bool {
++	v := strings.ToLower(os.Getenv(name))
++	return v == "true" || v == "1" || v == "yes" || v == "on"
++}
++
++// Create a new Engine and call Initialize() with conn and handlers
 +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
- 	// Save the connection ID for Connection.String()
- 	eng := &Engine{
- 		inject:     make(chan func()),
- 		conn:       conn,
- 		transport:  Transport{C.pn_transport()},
- 		connection: Connection{C.pn_connection()},
- 		collector:  C.pn_collector(),
- 		handlers:   handlers,
- 		read:       newBufferChan(bufferSize),
- 		write:      newBufferChan(bufferSize),
- 		running:    make(chan struct{}),
- 	}
++	eng := &Engine{}
++	return eng, eng.Initialize(conn, handlers...)
++}
++
++// Initialize an Engine with a connection and handlers. Start it with Run()
++func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error {
++	eng.inject = make(chan func())
++	eng.conn = conn
++	eng.connection = Connection{C.pn_connection()}
++	eng.transport = Transport{C.pn_transport()}
++	eng.collector = C.pn_collector()
++	eng.handlers = handlers
++	eng.running = make(chan struct{})
++	eng.timer = time.NewTimer(0)
++	eng.traceEvent = envBool("PN_TRACE_EVT")
 +	if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
- 		return nil, fmt.Errorf("failed to allocate engine")
++		eng.free()
++		return fmt.Errorf("proton.NewEngine cannot allocate")
 +	}
++	C.pn_connection_collect(eng.connection.pn, eng.collector)
++	return nil
++}
 +
- 	// TODO aconway 2015-06-25: connection settings for user, password, container etc.
- 	// before transport.Bind() Set up connection before Engine, allow Engine or Reactor
- 	// to run connection.
- 
- 	// Unique container-id by default.
- 	eng.connection.SetContainer(UUID4().String())
- 	pnErr := eng.transport.Bind(eng.connection)
- 	if pnErr != 0 {
- 		return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr))
++// Create a byte slice backed by C memory.
++// Empty or error (size <= 0) returns a nil byte slice.
++func cByteSlice(start unsafe.Pointer, size int) []byte {
++	if start == nil || size <= 0 {
++		return nil
++	} else {
++		// Slice from very large imaginary array in C memory
++		return (*[1 << 30]byte)(start)[:size:size]
 +	}
- 	C.pn_connection_collect(eng.connection.pn, eng.collector)
- 	eng.connection.Open()
- 	return eng, nil
++}
++
++func (eng *Engine) Connection() Connection {
++	return eng.connection
++}
++
++func (eng *Engine) Transport() Transport {
++	return eng.transport
 +}
 +
 +func (eng *Engine) String() string {
- 	return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
++	return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 +}
 +
 +func (eng *Engine) Id() string {
- 	return fmt.Sprintf("%p", eng)
++	// Use transport address to match default PN_TRACE_FRM=1 output.
++	return fmt.Sprintf("%p", eng.Transport().CPtr())
 +}
 +
 +func (eng *Engine) Error() error {
 +	return eng.err.Get()
 +}
 +
 +// Inject a function into the Engine's event loop.
 +//
 +// f() will be called in the same event-processing goroutine that calls Handler
 +// methods. f() can safely call methods on values that belong to this engine
 +// (Sessions, Links etc)
 +//
 +// The injected function has no parameters or return values. It is normally a
 +// closure and can use channels to communicate with the injecting goroutine if
 +// necessary.
 +//
 +// Returns a non-nil error if the engine is closed before the function could be
 +// injected.
 +func (eng *Engine) Inject(f func()) error {
 +	select {
 +	case eng.inject <- f:
 +		return nil
 +	case <-eng.running:
 +		return eng.Error()
 +	}
 +}
 +
 +// InjectWait is like Inject but does not return till f() has completed or the
 +// engine is closed, and returns an error value from f()
 +func (eng *Engine) InjectWait(f func() error) error {
 +	done := make(chan error)
 +	defer close(done)
 +	err := eng.Inject(func() { done <- f() })
 +	if err != nil {
 +		return err
 +	}
 +	select {
 +	case <-eng.running:
 +		return eng.Error()
 +	case err := <-done:
 +		return err
 +	}
 +}
 +
 +// Server puts the Engine in server mode, meaning it will auto-detect security settings on
 +// the incoming connection such as use of SASL and SSL.
 +// Must be called before Run()
 +//
- func (eng *Engine) Server() { eng.transport.SetServer() }
++func (eng *Engine) Server() { eng.Transport().SetServer() }
 +
- func (eng *Engine) disconnect() {
- 	eng.transport.CloseHead()
++func (eng *Engine) disconnect(err error) {
++	cond := eng.Transport().Condition()
++	cond.SetError(err)              // Set the provided error.
++	cond.SetError(eng.conn.Close()) // Use connection error if cond is not already set.
 +	eng.transport.CloseTail()
- 	eng.conn.Close()
- 	eng.dispatch()
++	eng.transport.CloseHead()
 +}
 +
 +// Close the engine's connection.
 +// If err != nil pass it to the remote end as the close condition.
 +// Returns when the remote end closes or disconnects.
 +func (eng *Engine) Close(err error) {
- 	eng.Inject(func() { CloseError(eng.connection, err) })
++	_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 +	<-eng.running
 +}
 +
 +// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
 +func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
- 	eng.Inject(func() { CloseError(eng.connection, err) })
++	_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 +	select {
 +	case <-eng.running:
 +	case <-time.After(timeout):
 +		eng.Disconnect(err)
 +	}
 +}
 +
 +// Disconnect the engine's connection immediately without an AMQP close.
 +// Process any termination events before returning.
 +func (eng *Engine) Disconnect(err error) {
- 	eng.Inject(func() { eng.transport.Condition().SetError(err); eng.disconnect() })
++	_ = eng.Inject(func() { eng.disconnect(err) })
 +	<-eng.running
 +}
 +
++// Let proton run timed activity and set up the next tick
++func (eng *Engine) tick() {
++	now := time.Now()
++	next := eng.Transport().Tick(now)
++	if !next.IsZero() {
++		eng.timer.Reset(next.Sub(now))
++	}
++}
++
++func (eng *Engine) dispatch() bool {
++	for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
++		e := makeEvent(ce, eng)
++		if eng.traceEvent {
++			eng.transport.Log(e.String())
++		}
++		for _, h := range eng.handlers {
++			h.HandleEvent(e)
++		}
++		if e.Type() == EConnectionRemoteOpen {
++			eng.tick() // Update the tick if changed by remote.
++		}
++		C.pn_collector_pop(eng.collector)
++	}
++	return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil
++}
++
++func (eng *Engine) writeBuffer() []byte {
++	size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
++	start := eng.Transport().Head()
++	return cByteSlice(start, size)
++}
++
++func (eng *Engine) readBuffer() []byte {
++	size := eng.Transport().Capacity()
++	start := eng.Transport().Tail()
++	return cByteSlice(start, size)
++}
++
++func (eng *Engine) free() {
++	if !eng.transport.IsNil() {
++		eng.transport.Unbind()
++		eng.transport.Free()
++		eng.transport = Transport{}
++	}
++	if !eng.connection.IsNil() {
++		eng.connection.Free()
++		eng.connection = Connection{}
++	}
++	if eng.collector != nil {
++		C.pn_collector_release(eng.collector)
++		C.pn_collector_free(eng.collector)
++		eng.collector = nil
++	}
++}
++
 +// Run the engine. Engine.Run() will exit when the engine is closed or
 +// disconnected.  You can check for errors after exit with Engine.Error().
 +//
 +func (eng *Engine) Run() error {
++	defer eng.free()
++	eng.transport.Bind(eng.connection)
++	eng.tick() // Start ticking if needed
++
++	// Channels for read and write buffers going in and out of the read/write goroutines.
++	// The channels are unbuffered: we want to exchange buffers in sequence.
++	readsIn, writesIn := make(chan []byte), make(chan []byte)
++	readsOut, writesOut := make(chan []byte), make(chan []byte)
++
 +	wait := sync.WaitGroup{}
 +	wait.Add(2) // Read and write goroutines
 +
- 	readErr := make(chan error, 1) // Don't block
- 	go func() {                    // Read goroutine
++	go func() { // Read goroutine
 +		defer wait.Done()
 +		for {
- 			rbuf := eng.read.buffer()
++			rbuf, ok := <-readsIn
++			if !ok {
++				return
++			}
 +			n, err := eng.conn.Read(rbuf)
 +			if n > 0 {
- 				eng.read.buffers <- rbuf[:n]
- 			}
- 			if err != nil {
- 				readErr <- err
- 				close(readErr)
- 				close(eng.read.buffers)
++				readsOut <- rbuf[:n]
++			} else if err != nil {
++				_ = eng.Inject(func() {
++					eng.Transport().Condition().SetError(err)
++					eng.Transport().CloseTail()
++				})
 +				return
 +			}
 +		}
 +	}()
 +
- 	writeErr := make(chan error, 1) // Don't block
- 	go func() {                     // Write goroutine
++	go func() { // Write goroutine
 +		defer wait.Done()
 +		for {
- 			wbuf, ok := <-eng.write.buffers
++			wbuf, ok := <-writesIn
 +			if !ok {
 +				return
 +			}
- 			_, err := eng.conn.Write(wbuf)
- 			if err != nil {
- 				writeErr <- err
- 				close(writeErr)
++			n, err := eng.conn.Write(wbuf)
++			if n > 0 {
++				writesOut <- wbuf[:n]
++			} else if err != nil {
++				_ = eng.Inject(func() {
++					eng.Transport().Condition().SetError(err)
++					eng.Transport().CloseHead()
++				})
 +				return
 +			}
 +		}
 +	}()
 +
- 	wbuf := eng.write.buffer()[:0]
++	for eng.dispatch() {
++		readBuf := eng.readBuffer()
++		writeBuf := eng.writeBuffer()
++		// Note that getting the buffers can generate events (eg. SASL events) that
++		// might close the transport. Check if we are already finished before
++		// blocking for IO.
++		if !eng.dispatch() {
++			break
++		}
 +
- 	for !eng.transport.Closed() {
- 		if len(wbuf) == 0 {
- 			eng.pop(&wbuf)
++		// sendReads/sendWrites are nil (not sendable in select) unless we have a
++		// buffer to read/write
++		var sendReads, sendWrites chan []byte
++		if readBuf != nil {
++			sendReads = readsIn
 +		}
- 		// Don't set wchan unless there is something to write.
- 		var wchan chan []byte
- 		if len(wbuf) > 0 {
- 			wchan = eng.write.buffers
++		if writeBuf != nil {
++			sendWrites = writesIn
 +		}
 +
++		// Send buffers to the read/write goroutines if we have them.
++		// Get buffers from the read/write goroutines and process them
++		// Check for injected functions
 +		select {
- 		case buf, ok := <-eng.read.buffers: // Read a buffer
- 			if ok {
- 				eng.push(buf)
- 			}
- 		case wchan <- wbuf: // Write a buffer
- 			wbuf = eng.write.buffer()[:0]
- 		case f, ok := <-eng.inject: // Function injected from another goroutine
- 			if ok {
- 				f()
- 			}
- 		case err := <-readErr:
- 			eng.transport.Condition().SetError(err)
- 			eng.transport.CloseTail()
- 		case err := <-writeErr:
- 			eng.transport.Condition().SetError(err)
- 			eng.transport.CloseHead()
- 		}
- 		eng.dispatch()
- 		if eng.connection.State().RemoteClosed() && eng.connection.State().LocalClosed() {
- 			eng.disconnect()
- 		}
- 	}
- 	eng.err.Set(EndpointError(eng.connection))
- 	eng.err.Set(eng.transport.Condition().Error())
- 	close(eng.write.buffers)
- 	eng.conn.Close() // Make sure connection is closed
- 	wait.Wait()
- 	close(eng.running) // Signal goroutines have exited and Error is set.
 +
- 	if !eng.connection.IsNil() {
- 		eng.connection.Free()
- 	}
- 	if !eng.transport.IsNil() {
- 		eng.transport.Free()
- 	}
- 	if eng.collector != nil {
- 		C.pn_collector_free(eng.collector)
- 	}
- 	for _, h := range eng.handlers {
- 		switch h := h.(type) {
- 		case cHandler:
- 			C.pn_handler_free(h.pn)
- 		}
- 	}
- 	return eng.err.Get()
- }
++		case sendReads <- readBuf:
 +
- func minInt(a, b int) int {
- 	if a < b {
- 		return a
- 	} else {
- 		return b
- 	}
- }
++		case sendWrites <- writeBuf:
 +
- func (eng *Engine) pop(buf *[]byte) {
- 	pending := int(eng.transport.Pending())
- 	switch {
- 	case pending == int(C.PN_EOS):
- 		*buf = (*buf)[:]
- 		return
- 	case pending < 0:
- 		panic(fmt.Errorf("%s", PnErrorCode(pending)))
- 	}
- 	size := minInt(pending, cap(*buf))
- 	*buf = (*buf)[:size]
- 	if size == 0 {
- 		return
- 	}
- 	C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size))
- 	assert(size > 0)
- 	eng.transport.Pop(uint(size))
- }
++		case buf := <-readsOut:
++			eng.transport.Process(uint(len(buf)))
 +
- func (eng *Engine) push(buf []byte) {
- 	buf2 := buf
- 	for len(buf2) > 0 {
- 		n := eng.transport.Push(buf2)
- 		if n <= 0 {
- 			panic(fmt.Errorf("error in transport: %s", PnErrorCode(n)))
- 		}
- 		buf2 = buf2[n:]
- 	}
- }
++		case buf := <-writesOut:
++			eng.transport.Pop(uint(len(buf)))
 +
- func (eng *Engine) peek() *C.pn_event_t { return C.pn_collector_peek(eng.collector) }
++		case f, ok := <-eng.inject: // Function injected from another goroutine
++			if ok {
++				f()
++			}
 +
- func (eng *Engine) dispatch() {
- 	for ce := eng.peek(); ce != nil; ce = eng.peek() {
- 		for _, h := range eng.handlers {
- 			h.HandleEvent(makeEvent(ce, eng))
++		case <-eng.timer.C:
++			eng.tick()
 +		}
- 		C.pn_collector_pop(eng.collector)
 +	}
- }
 +
- func (eng *Engine) Connection() Connection { return eng.connection }
++	eng.err.Set(EndpointError(eng.Connection()))
++	eng.err.Set(eng.Transport().Condition().Error())
++	close(readsIn)
++	close(writesIn)
++	close(eng.running)   // Signal goroutines have exited and Error is set, disable Inject()
++	_ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
++	wait.Wait()          // Wait for goroutines
++	return eng.err.Get()
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message