Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1D9E318A95 for ; Thu, 22 Oct 2015 22:16:19 +0000 (UTC) Received: (qmail 12387 invoked by uid 500); 22 Oct 2015 22:16:19 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 12374 invoked by uid 500); 22 Oct 2015 22:16:19 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 12365 invoked by uid 99); 22 Oct 2015 22:16:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Oct 2015 22:16:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BBF58E392F; Thu, 22 Oct 2015 22:16:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Message-Id: <2b6bb8847550461aa908309a5ecaf574@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: qpid-proton git commit: NO-JIRA: go: Bug fixes and improved examples. Date: Thu, 22 Oct 2015 22:16:18 +0000 (UTC) Repository: qpid-proton Updated Branches: refs/heads/master 6a306163f -> bc0a242e4 NO-JIRA: go: Bug fixes and improved examples. package proton: - Injecter() provided by event rather than connection. Allow different event-loop strategies. - Add access to proton refcounts, may be useful for some apps. package electron: - simplified sender logic using credit flag. - consistent link, session and connection options. examples: simplified & improved proton/broker: concurrent broker using handler per connection. 
electron/broker: cleaned up for comparison to proton/broker. examples/README.md discussion of brokers. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bc0a242e Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bc0a242e Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bc0a242e Branch: refs/heads/master Commit: bc0a242e4cc335f2aa496b98bf06d033904ca23d Parents: 6a30616 Author: Alan Conway Authored: Tue Oct 13 10:31:01 2015 -0400 Committer: Alan Conway Committed: Thu Oct 22 18:14:17 2015 -0400 ---------------------------------------------------------------------- examples/go/CMakeLists.txt | 9 +- examples/go/README.md | 31 ++ examples/go/electron/broker.go | 67 ++-- examples/go/electron/receive.go | 5 +- examples/go/electron/send.go | 2 - examples/go/example_test.go | 57 ++- examples/go/proton/broker.go | 389 ++++++++++--------- examples/go/util/util.go | 2 +- proton-c/bindings/go/CMakeLists.txt | 7 +- .../src/qpid.apache.org/electron/connection.go | 151 ++++--- .../src/qpid.apache.org/electron/container.go | 6 +- .../go/src/qpid.apache.org/electron/handler.go | 102 +++-- .../go/src/qpid.apache.org/electron/link.go | 67 ++-- .../go/src/qpid.apache.org/electron/receiver.go | 60 +-- .../go/src/qpid.apache.org/electron/sender.go | 120 +++--- .../go/src/qpid.apache.org/electron/session.go | 49 ++- .../go/src/qpid.apache.org/electron/time.go | 19 +- .../go/src/qpid.apache.org/proton/doc.go | 29 +- .../go/src/qpid.apache.org/proton/engine.go | 71 ++-- .../go/src/qpid.apache.org/proton/handlers.go | 18 +- .../go/src/qpid.apache.org/proton/wrappers.go | 98 +++-- .../qpid/proton/amqp/messaging/Terminus.java | 2 +- 22 files changed, 736 insertions(+), 625 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt index c345523..32be548 100644 --- a/examples/go/CMakeLists.txt +++ b/examples/go/CMakeLists.txt @@ -20,6 +20,7 @@ if(BUILD_GO) set(examples electron/broker electron/receive electron/send proton/broker) + file(GLOB_RECURSE example_source ${CMAKE_CURRENT_SOURCE_DIR}/*.go) # Build example exes foreach(example ${examples}) @@ -28,7 +29,8 @@ if(BUILD_GO) add_custom_command( OUTPUT ${target} COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${target} ${source} - DEPENDS ${source} ${GO_TARGETS}) + DEPENDS ${example_source} ${GO_TARGETS} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) list(APPEND example_exes ${target}) endforeach() @@ -36,8 +38,9 @@ if(BUILD_GO) set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/example_test) add_custom_command( OUTPUT ${test_exe} - DEPENDS ${example_exes} - COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go) + DEPENDS ${example_exes} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go + COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go + WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) add_custom_target(go-test-exe ALL DEPENDS ${test_exe}) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/README.md ---------------------------------------------------------------------- diff --git a/examples/go/README.md b/examples/go/README.md index 0114d0e..ce9206b 100644 --- a/examples/go/README.md +++ b/examples/go/README.md @@ -87,3 +87,34 @@ Or use the Go broker and the python clients: python ../python/simple_send.py python ../python/simple_recv.py + + +## A tale of two brokers. + +The `proton` and `electron` packages provide two alternate APIs for AMQP applications. +See [the proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md) for a discussion +of why there are two APIs. 
+ +The examples `proton/broker.go` and `electron/broker.go` both implement the same +simple broker-like functionality using each of the two APIs. They both handle +multiple connections concurrently and store messages on bounded queues +implemented by Go channels. + +However the `electron/broker` is less than half as long as the `proton/broker` +illustrating why it is better suited for most Go applications. + +`proton/broker` must explicitly handle proton events, which are processed in a +single goroutine per connection since proton is not concurrent safe. Each +connection uses channels to exchange messages between the event-handling +goroutine and the shared queues that are accessible to all connections. Sending +messages is particularly tricky since we must monitor the queue for available +messages and the sending link for available credit in order to send messages. + + +`electron/broker` takes advantage of the `electron` package, which hides all the +event handling and passing of messages between goroutines behind +straightforward interfaces for sending and receiving messages. The electron +broker can implement links as simple goroutines that loop popping messages from +a queue and sending them or receiving messages and pushing them to a queue. 
+ + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go index 1e4a931..f1dce17 100644 --- a/examples/go/electron/broker.go +++ b/examples/go/electron/broker.go @@ -52,21 +52,20 @@ var qsize = flag.Int("qsize", 1000, "Max queue size") func main() { flag.Usage = usage flag.Parse() - if err := newBroker().run(); err != nil { + b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")} + if err := b.run(); err != nil { log.Fatal(err) } } +// State for the broker type broker struct { queues util.Queues container electron.Container } -func newBroker() *broker { - return &broker{util.MakeQueues(*qsize), electron.NewContainer("")} -} - -func (b *broker) run() (err error) { +// Listens for connections and starts an electron.Connection for each one. +func (b *broker) run() error { listener, err := net.Listen("tcp", *addr) if err != nil { return err @@ -76,46 +75,29 @@ func (b *broker) run() (err error) { for { conn, err := listener.Accept() if err != nil { - util.Debugf("Accept error: %s", err) + util.Debugf("Accept error: %v", err) continue } - if err := b.connection(conn); err != nil { - if err != nil { - util.Debugf("Connection error: %s", err) - continue - } + c, err := b.container.Connection(conn, electron.Server(), electron.Accepter(b.accept)) + if err != nil { + util.Debugf("Connection error: %v", err) + continue } + util.Debugf("Accepted %v", c) } } -// connection creates a new AMQP connection for a net.Conn. -func (b *broker) connection(conn net.Conn) error { - c, err := b.container.Connection(conn) - if err != nil { - return err - } - c.Server() // Enable server-side protocol negotiation. - c.Listen(b.accept) // Call accept() for remotely-opened endpoints. 
- if err := c.Open(); err != nil { - return err - } - util.Debugf("Accepted %s", c) - return nil -} - // accept remotely-opened endpoints (Session, Sender and Receiver) // and start goroutines to service them. -func (b *broker) accept(ep electron.Endpoint) error { - switch ep := ep.(type) { - case electron.Sender: - util.Debugf("%s opened", ep) - go b.sender(ep) - case electron.Receiver: - util.Debugf("%s opened", ep) - ep.SetCapacity(100, true) // Pre-fetch 100 messages - go b.receiver(ep) +func (b *broker) accept(i electron.Incoming) { + switch i := i.(type) { + case *electron.IncomingSender: + go b.sender(i.AcceptSender()) + case *electron.IncomingReceiver: + go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 messages + default: + i.Accept() } - return nil } // sender pops messages from a queue and sends them. @@ -127,17 +109,16 @@ func (b *broker) sender(sender electron.Sender) { return } if err := sender.SendForget(m); err == nil { - util.Debugf("send %s: %s", sender, util.FormatMessage(m)) + util.Debugf("%s send: %s", sender, util.FormatMessage(m)) } else { - util.Debugf("send error %s: %s", sender, err) + util.Debugf("%s error: %s", sender, err) q <- m // Put it back on the queue. - break + return } } } -// receiver receives messages and pushes to the queue named by the receivers's -// Target address +// receiver receives messages and pushes to a queue. 
func (b *broker) receiver(receiver electron.Receiver) { q := b.queues.Get(receiver.Target()) for { @@ -146,7 +127,7 @@ func (b *broker) receiver(receiver electron.Receiver) { q <- rm.Message rm.Accept() } else { - util.Debugf("%s: error %s", receiver, err) + util.Debugf("%s error: %s", receiver, err) break } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go index e450a75..f7d41fa 100644 --- a/examples/go/electron/receive.go +++ b/examples/go/electron/receive.go @@ -77,8 +77,6 @@ func main() { conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" util.ExitIf(err) c, err := container.Connection(conn) - util.ExitIf(err) - util.ExitIf(c.Open()) connections <- c // Save connection so we can Close() when main() ends // Create a Receiver using the path of the URL as the source address @@ -106,9 +104,8 @@ func main() { // print each message until the count is exceeded. 
for i := uint64(0); i < *count; i++ { - util.Debugf("pre (%d/%d)\n", i, *count) m := <-messages - util.Debugf("%s (%d/%d)\n", util.FormatMessage(m), i, *count) + util.Debugf("%s\n", util.FormatMessage(m)) } fmt.Printf("Received %d messages\n", *count) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/send.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go index 6b7aec1..c9bdbc9 100644 --- a/examples/go/electron/send.go +++ b/examples/go/electron/send.go @@ -80,8 +80,6 @@ func main() { util.ExitIf(err) c, err := container.Connection(conn) util.ExitIf(err) - err = c.Open() - util.ExitIf(err) connections <- c // Save connection so we can Close() when main() ends // Create a Sender using the path of the URL as the AMQP address http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/example_test.go ---------------------------------------------------------------------- diff --git a/examples/go/example_test.go b/examples/go/example_test.go index 1e497b9..006e17c 100644 --- a/examples/go/example_test.go +++ b/examples/go/example_test.go @@ -28,6 +28,7 @@ import ( "flag" "fmt" "io" + "log" "math/rand" "net" "os" @@ -35,7 +36,6 @@ import ( "path" "path/filepath" "reflect" - "strings" "testing" "time" ) @@ -108,27 +108,8 @@ func checkEqual(want interface{}, got interface{}) error { return fmt.Errorf("%#v != %#v", want, got) } -// 'go build' uses the installed copy of the proton Go libraries, which may be out of date. 
-func checkStaleLibs(t *testing.T) { - var stale []string - pp := "qpid.apache.org" - for _, p := range []string{pp + "/proton", pp + "/amqp", pp + "/electron"} { - out, err := exec.Command("go", "list", "-f", "{{.Stale}}", p).CombinedOutput() - if err != nil { - t.Fatalf("failed to execute 'go list': %v\n%v", err, string(out)) - } - if string(out) != "false\n" { - stale = append(stale, p) - } - } - if len(stale) > 0 { - t.Fatalf("Stale libraries, run 'go install %s'", strings.Trim(fmt.Sprint(stale), "[]")) - } -} - // exampleCommand returns an exec.Cmd to run an example. func exampleCommand(t *testing.T, prog string, arg ...string) (cmd *exec.Cmd) { - checkStaleLibs(t) args := []string{} if *debug { args = append(args, "-debug=true") @@ -230,6 +211,7 @@ func goReceiveWant(t *testing.T, errchan chan<- error, want string, arg ...strin errchan <- ready buf := bytes.Buffer{} io.Copy(&buf, out) // Collect the rest of the output + cmd.Wait() errchan <- checkEqual(want, buf.String()) close(errchan) }() @@ -242,26 +224,30 @@ func TestExampleReceiveSend(t *testing.T) { t.Skip("Skip demo tests in short mode") } testBroker.start(t) - recvErr := make(chan error) - recvCmd := goReceiveWant( - t, recvErr, - fmt.Sprintf("Received %d messages\n", expected), - exampleArgs(fmt.Sprintf("-count=%d", expected))...) - defer func() { - recvCmd.Process.Kill() - recvCmd.Wait() - }() - if err := <-recvErr; err != ready { // Wait for receiver ready + + // Start receiver, wait for "listening" message on stdout + recvCmd := exampleCommand(t, "receive", exampleArgs(fmt.Sprintf("-count=%d", expected))...) 
+ pipe, err := recvCmd.StdoutPipe() + if err != nil { t.Fatal(err) } - err := runExampleWant(t, + recvCmd.Start() + out := bufio.NewReader(pipe) + line, err := out.ReadString('\n') + if err := checkEqual("Listening on 3 connections\n", line); err != nil { + t.Fatal(err) + } + + if err := runExampleWant(t, fmt.Sprintf("Received all %d acknowledgements\n", expected), "send", - exampleArgs("-count", fmt.Sprintf("%d", *count))...) - if err != nil { + exampleArgs("-count", fmt.Sprintf("%d", *count))...); err != nil { t.Fatal(err) } - if err := <-recvErr; err != nil { + + buf := bytes.Buffer{} + io.Copy(&buf, out) + if err := checkEqual(fmt.Sprintf("Received %d messages\n", expected), buf.String()); err != nil { t.Fatal(err) } } @@ -276,6 +262,9 @@ var dir = flag.String("dir", "", "Directory containing example sources or binari var expected int func TestMain(m *testing.M) { + if out, err := exec.Command("go", "install", "qpid.apache.org/...").CombinedOutput(); err != nil { + log.Fatalf("go install failed: %s\n%s", err, out) + } expected = (*count) * (*connections) rand.Seed(time.Now().UTC().UnixNano()) testBroker = &broker{} // Broker is started on-demand by tests. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/proton/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go index 75f14f5..3eb5880 100644 --- a/examples/go/proton/broker.go +++ b/examples/go/proton/broker.go @@ -30,7 +30,7 @@ import ( "./util" "flag" "fmt" - "io" + "log" "net" "os" "qpid.apache.org/amqp" @@ -53,247 +53,276 @@ var qsize = flag.Int("qsize", 1000, "Max queue size") func main() { flag.Usage = usage flag.Parse() + b := &broker{util.MakeQueues(*qsize)} + if err := b.run(); err != nil { + log.Fatal(err) + } +} + +// State for the broker +type broker struct { + queues util.Queues +} - b := newBroker() +// Listens for connections and starts a proton.Engine for each one. 
+func (b *broker) run() error { listener, err := net.Listen("tcp", *addr) - util.ExitIf(err) + if err != nil { + return err + } defer listener.Close() fmt.Printf("Listening on %s\n", listener.Addr()) - - // Loop accepting new connections. for { conn, err := listener.Accept() if err != nil { - util.Debugf("Accept error: %s", err) + util.Debugf("Accept error: %v", err) continue } - if err := b.connection(conn); err != nil { - if err != nil { - util.Debugf("Connection error: %s", err) - continue - } + adapter := proton.NewMessagingAdapter(newHandler(&b.queues)) + // We want to accept messages when they are enqueued, not just when they + // are received, so we turn off auto-accept and prefetch by the adapter. + adapter.Prefetch = 0 + adapter.AutoAccept = false + engine, err := proton.NewEngine(conn, adapter) + if err != nil { + util.Debugf("Connection error: %v", err) + continue } + engine.Server() // Enable server-side protocol negotiation. + util.Debugf("Accepted connection %s", engine) + go func() { // Start goroutine to run the engine event loop + engine.Run() + util.Debugf("Closed %s", engine) + }() } } -type broker struct { - queues util.Queues -} - -func newBroker() *broker { - return &broker{util.MakeQueues(*qsize)} -} - -// connection creates a new AMQP connection for a net.Conn. -func (b *broker) connection(conn net.Conn) error { - delegator := proton.NewMessagingDelegator(newHandler(&b.queues, *credit)) - // We want to accept messages when they are enqueued, not just when they - // are received, so we turn off auto-accept and prefetch by the handler. - delegator.Prefetch = 0 - delegator.AutoAccept = false - engine, err := proton.NewEngine(conn, delegator) - if err != nil { - return err - } - engine.Server() // Enable server-side protocol negotiation. 
- go func() { // Start goroutine to run the engine event loop - engine.Run() - util.Debugf("Closed %s", engine) - }() - util.Debugf("Accepted %s", engine) - return nil -} - -// receiver is a channel to buffer messages waiting to go on the queue. -type receiver chan receivedMessage - -// receivedMessage is a message and the corresponding delivery for acknowledgement. -type receivedMessage struct { - delivery proton.Delivery - message amqp.Message -} - -// sender is a signal channel, closed when we are done sending. -type sender chan struct{} - // handler handles AMQP events. There is one handler per connection. The -// handler does not need to be concurrent-safe as proton will serialize all -// calls to a handler. We will use channels to communicate from the handler -// to goroutines sending and receiving messages. +// handler does not need to be concurrent-safe as proton.Engine will serialize +// all calls to the handler. We use channels to communicate between the handler +// goroutine and other goroutines sending and receiving messages. type handler struct { queues *util.Queues - credit int // Credit window for receiver flow control. - receivers map[proton.Link]receiver - senders map[proton.Link]sender + receivers map[proton.Link]*receiver + senders map[proton.Link]*sender + injecter proton.Injecter } -func newHandler(queues *util.Queues, credit int) *handler { +func newHandler(queues *util.Queues) *handler { return &handler{ - queues, - credit, - make(map[proton.Link]receiver), - make(map[proton.Link]sender), + queues: queues, + receivers: make(map[proton.Link]*receiver), + senders: make(map[proton.Link]*sender), } } -// Handle an AMQP event. +// HandleMessagingEvent handles an event, called in the handler goroutine. 
func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { switch t { + case proton.MStart: + h.injecter = e.Injecter() + case proton.MLinkOpening: - l := e.Link() - var err error - if l.IsReceiver() { - err = h.receiver(l) - } else { // IsSender() - err = h.sender(l) - } - if err == nil { - util.Debugf("%s opened", l) + if e.Link().IsReceiver() { + h.startReceiver(e) } else { - util.Debugf("%s open error: %s", l, err) - proton.CloseError(l, err) + h.startSender(e) } - case proton.MLinkClosing: - l := e.Link() - if r, ok := h.receivers[l]; ok { - close(r) - delete(h.receivers, l) - } else if s, ok := h.senders[l]; ok { - close(s) - delete(h.senders, l) - } - util.Debugf("%s closed", l) + case proton.MLinkClosed: + h.linkClosed(e.Link(), e.Link().RemoteCondition().Error()) case proton.MSendable: - l := e.Link() - q := h.queues.Get(l.RemoteSource().Address()) - if n, err := h.sendAll(e.Link(), q); err == nil && n > 0 { - // Still have credit, start a watcher. - go h.sendWatch(e.Link(), q) + if s, ok := h.senders[e.Link()]; ok { + s.sendable() // Signal the send goroutine that we have credit. + } else { + proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s sender not found", e.Link())) } case proton.MMessage: - l := e.Link() - d := e.Delivery() - m, err := d.Message() // Must decode message immediately before link state changes. + m, err := e.Delivery().Message() // Message() must be called while handling the MMessage event. if err != nil { - util.Debugf("%s error decoding message: %s", e.Link(), err) - proton.CloseError(l, err) - } else { - // This will not block, AMQP credit prevents us from overflowing the buffer. 
- h.receivers[l] <- receivedMessage{d, m} - util.Debugf("%s received %s", l, util.FormatMessage(m)) + proton.CloseError(e.Link(), err) + break } + r, ok := h.receivers[e.Link()] + if !ok { + proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s receiver not found", e.Link())) + break + } + // This will not block as AMQP credit is set to the buffer capacity. + r.buffer <- receivedMessage{e.Delivery(), m} + util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m)) - case proton.MConnectionClosing, proton.MDisconnected: - for l, r := range h.receivers { - close(r) - delete(h.receivers, l) + case proton.MConnectionClosed, proton.MDisconnected: + for l, _ := range h.receivers { + h.linkClosed(l, nil) } - for l, s := range h.senders { - close(s) - delete(h.senders, l) + for l, _ := range h.senders { + h.linkClosed(l, nil) } } } -// receiver is called by the handler when a receiver link opens. +// linkClosed is called when a link has been closed by both ends. +// It removes the link from the handlers maps and stops its goroutine. +func (h *handler) linkClosed(l proton.Link, err error) { + if s, ok := h.senders[l]; ok { + s.stop() + delete(h.senders, l) + } else if r, ok := h.receivers[l]; ok { + r.stop() + delete(h.receivers, l) + } +} + +// link has some common data and methods that are used by the sender and receiver types. // -// It sets up data structures in the handler and then starts a goroutine -// to receive messages and put them on a queue. -func (h *handler) receiver(l proton.Link) error { - q := h.queues.Get(l.RemoteTarget().Address()) - buffer := make(receiver, h.credit) - h.receivers[l] = buffer - l.Flow(cap(buffer)) // credit==cap(buffer) so we won't overflow the buffer. - go h.runReceive(l, buffer, q) - return nil +// An active link is represented by a sender or receiver value and a goroutine +// running its run() method. The run() method communicates with the handler via +// channels. 
+type link struct { + l proton.Link + q util.Queue + h *handler +} + +func makeLink(l proton.Link, q util.Queue, h *handler) link { + lnk := link{l: l, q: q, h: h} + return lnk +} + +// receiver has a channel to buffer messages that have been received by the +// handler and are waiting to go on the queue. AMQP credit ensures that the +// handler does not overflow the buffer and block. +type receiver struct { + link + buffer chan receivedMessage } -// runReceive moves messages from buffer to queue -func (h *handler) runReceive(l proton.Link, buffer receiver, q util.Queue) { - for rm := range buffer { - q <- rm.message - rm2 := rm // Save in temp var for injected closure - err := l.Connection().Injecter().Inject(func() { - rm2.delivery.Accept() - l.Flow(1) +// receivedMessage holds a message and a Delivery so that the message can be +// acknowledged when it is put on the queue. +type receivedMessage struct { + delivery proton.Delivery + message amqp.Message +} + +// startReceiver creates a receiver and a goroutine for its run() method. +func (h *handler) startReceiver(e proton.Event) { + q := h.queues.Get(e.Link().RemoteTarget().Address()) + r := &receiver{ + link: makeLink(e.Link(), q, h), + buffer: make(chan receivedMessage, *credit), + } + h.receivers[r.l] = r + r.l.Flow(cap(r.buffer)) // Give credit to fill the buffer to capacity. + go r.run() +} + +// run runs in a separate goroutine. It moves messages from the buffer to the +// queue for a receiver link, and injects a handler function to acknowledge the +// message and send a credit. +func (r *receiver) run() { + for rm := range r.buffer { + r.q <- rm.message + d := rm.delivery + // We are not in the handler goroutine so we Inject the accept function as a closure. + r.h.injecter.Inject(func() { + // Check that the receiver is still open, it may have been closed by the remote end. 
+ if r == r.h.receivers[r.l] { + d.Accept() // Accept the delivery + r.l.Flow(1) // Add one credit + } }) - if err != nil { - util.Debugf("%s receive error: %s", l, err) - proton.CloseError(l, err) - } } } -// sender is called by the handler when a sender link opens. -// It sets up a sender structures in the handler. -func (h *handler) sender(l proton.Link) error { - h.senders[l] = make(sender) - return nil +// stop closes the buffer channel and waits for the run() goroutine to stop. +func (r *receiver) stop() { + close(r.buffer) } -// send one message in handler context, assumes we have credit. -func (h *handler) send(l proton.Link, m amqp.Message, q util.Queue) error { - delivery, err := l.Send(m) - if err != nil { - h.closeSender(l, err) - return err +// sender has a channel that is used to signal when there is credit to send messages. +type sender struct { + link + credit chan struct{} // Channel to signal availability of credit. +} + +// startSender creates a sender and starts a goroutine for sender.run() +func (h *handler) startSender(e proton.Event) { + q := h.queues.Get(e.Link().RemoteSource().Address()) + s := &sender{ + link: makeLink(e.Link(), q, h), + credit: make(chan struct{}, 1), // Capacity of 1 for signalling. } - delivery.Settle() // Pre-settled, unreliable. - util.Debugf("%s sent %s", l, util.FormatMessage(m)) - return nil + h.senders[e.Link()] = s + go s.run() } -// sendAll sends as many messages as possible without blocking, call in handler context. -// Returns the number of credits left, >0 means we ran out of messages. -func (h *handler) sendAll(l proton.Link, q util.Queue) (int, error) { - for l.Credit() > 0 { - select { - case m, ok := <-q: - if ok { // Got a message - if err := h.send(l, m, q); err != nil { - return 0, err - } - } else { // Queue is closed - l.Close() - return 0, io.EOF - } - default: // Queue empty - return l.Credit(), nil - } +// stop closes the credit channel and waits for the run() goroutine to stop. 
+func (s *sender) stop() { + close(s.credit) +} + +// sendable signals that the sender has credit, it does not block. +// sender.credit has capacity 1, if it is already full we carry on. +func (s *sender) sendable() { + select { // Non-blocking + case s.credit <- struct{}{}: + default: } - return l.Credit(), nil } -// sendWatch watches the queue for more messages and re-runs sendAll. -// Run in a separate goroutine, so must inject handler functions. -func (h *handler) sendWatch(l proton.Link, q util.Queue) { - select { - case m, ok := <-q: - l.Connection().Injecter().Inject(func() { - if ok { - if h.send(l, m, q) != nil { +// run runs in a separate goroutine. It monitors the queue for messages and injects +// a function to send them when there is credit +func (s *sender) run() { + var q chan amqp.Message // q is nil initially as we have no credit. + for { + select { + case _, ok := <-s.credit: + if !ok { // sender closed + return + } + q = s.q // We have credit, enable selecting on the queue. + + case m, ok := <-q: // q is only enabled when we have credit. + if !ok { // queue closed + return + } + q = nil // Assume all credit will be used used, will be signaled otherwise. + s.h.injecter.Inject(func() { // Inject handler function to actually send + if s.h.senders[s.l] != s { // The sender has been closed by the remote end. + go func() { q <- m }() // Put the message back on the queue but don't block return } - if n, err := h.sendAll(l, q); err != nil { + if s.sendOne(m) != nil { return - } else if n > 0 { - go h.sendWatch(l, q) // Start a new watcher. } - } - }) - case <-h.senders[l]: // Closed - return + // Send as many more messages as we can without blocking + for s.l.Credit() > 0 { + select { // Non blocking receive from q + case m, ok := <-s.q: + if ok { + s.sendOne(m) + } + default: // Queue is empty but we have credit, signal the run() goroutine. 
+ s.sendable() + } + } + }) + } } } -// closeSender closes a sender link and signals goroutines processing that sender. -func (h *handler) closeSender(l proton.Link, err error) { - util.Debugf("%s sender closed: %s", l, err) - proton.CloseError(l, err) - close(h.senders[l]) - delete(h.senders, l) +// sendOne runs in the handler goroutine. It sends a single message. +func (s *sender) sendOne(m amqp.Message) error { + delivery, err := s.l.Send(m) + if err == nil { + delivery.Settle() // Pre-settled, unreliable. + util.Debugf("link %s sent %s", s.l, util.FormatMessage(m)) + } else { + go func() { s.q <- m }() // Put the message back on the queue but don't block + } + return err } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/util/util.go ---------------------------------------------------------------------- diff --git a/examples/go/util/util.go b/examples/go/util/util.go index 5118467..20f2192 100644 --- a/examples/go/util/util.go +++ b/examples/go/util/util.go @@ -64,5 +64,5 @@ func FormatMessage(m amqp.Message) string { func init() { log.SetFlags(0) _, prog := path.Split(os.Args[0]) - log.SetPrefix(prog + ": ") + log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid())) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt index 51c2d86..74c7e13 100644 --- a/proton-c/bindings/go/CMakeLists.txt +++ b/proton-c/bindings/go/CMakeLists.txt @@ -75,14 +75,17 @@ foreach (pkg amqp proton electron) set(sources "${GoFiles}${CgoFiles}") # Build the package library - add_custom_command(OUTPUT ${lib} COMMAND ${GO_INSTALL} ${package} DEPENDS ${sources} ${cdepends}) + add_custom_command( + OUTPUT ${lib} COMMAND ${GO_INSTALL} ${package} DEPENDS ${sources} ${cdepends} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set(target go-package-${pkg}) 
add_custom_target(${target} ALL DEPENDS ${lib}) # Package test go_sources(TestGoFiles) set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/${pkg}.test) - add_custom_command(OUTPUT ${test_exe} COMMAND ${GO_TEST} -c -o ${test_exe} ${package} + add_custom_command( + OUTPUT ${test_exe} COMMAND ${GO_TEST} -c -o ${test_exe} ${package} DEPENDS ${target} qpid-proton) add_custom_target(go-package-test-${pkg} ALL DEPENDS ${test_exe}) add_test(NAME go_test_${pkg} COMMAND ${test_exe} WORKING_DIRECTORY ${dir}) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go index bef8c17..d6761d6 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go @@ -28,131 +28,114 @@ import ( "qpid.apache.org/internal" "qpid.apache.org/proton" "sync" + "time" ) // Connection is an AMQP connection, created by a Container. type Connection interface { Endpoint - // Server puts the connection in server mode, must be called before Open(). - // - // A server connection will do protocol negotiation to accept a incoming AMQP - // connection. Normally you would call this for a connection created by - // net.Listener.Accept() - // - Server() - - // Listen arranges for endpoints opened by the remote peer to be passed to accept(). - // Listen() must be called before Connection.Open(). - // - // accept() is passed a Session, Sender or Receiver. It can examine endpoint - // properties and set some properties (e.g. Receiver.SetCapacity()) Returning nil - // will accept the endpoint, returning an error will reject it. - // - // accept() must not block or use the endpoint other than to examine or set - // properties. 
It can start a goroutine to process the Endpoint, or pass the - // Endpoint to another goroutine via a channel, and that goroutine can use - // the endpoint as normal. - // - // The default Listen function is RejectEndpoint which rejects all endpoints. - // You can call Listen(AcceptEndpoint) to accept all endpoints - Listen(accept func(Endpoint) error) - - // Open the connection, ready for use. - Open() error - // Sender opens a new sender on the DefaultSession. // // v can be a string, which is used as the Target address, or a SenderSettings // struct containing more details settings. - Sender(setting ...LinkSetting) (Sender, error) + Sender(...LinkSetting) (Sender, error) // Receiver opens a new Receiver on the DefaultSession(). // // v can be a string, which is used as the // Source address, or a ReceiverSettings struct containing more details // settings. - Receiver(setting ...LinkSetting) (Receiver, error) + Receiver(...LinkSetting) (Receiver, error) // DefaultSession() returns a default session for the connection. It is opened // on the first call to DefaultSession and returned on subsequent calls. DefaultSession() (Session, error) // Session opens a new session. - Session() (Session, error) + Session(...SessionSetting) (Session, error) // Container for the connection. Container() Container // Disconnect the connection abruptly with an error. Disconnect(error) -} -// AcceptEndpoint pass to Connection.Listen to accept all endpoints -func AcceptEndpoint(Endpoint) error { return nil } + // Wait waits for the connection to be disconnected. + Wait() error -// RejectEndpoint pass to Connection.Listen to reject all endpoints -func RejectEndpoint(Endpoint) error { - return amqp.Errorf(amqp.NotAllowed, "remote open rejected") + // WaitTimeout is like Wait but returns Timeout if the timeout expires. + WaitTimeout(time.Duration) error +} + +// ConnectionSetting can be passed when creating a connection. 
+// See functions that return ConnectionSetting for details +type ConnectionSetting func(*connection) + +// Server setting puts the connection in server mode. +// +// A server connection will do protocol negotiation to accept a incoming AMQP +// connection. Normally you would call this for a connection created by +// net.Listener.Accept() +// +func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } } + +// Accepter provides a function to be called when a connection receives an incoming +// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver. +// +// The accept() function must not block or use the accepted endpoint. +// It can pass the endpoint to another goroutine for processing. +// +// By default all incoming endpoints are rejected. +func Accepter(accept func(Incoming)) ConnectionSetting { + return func(c *connection) { c.accept = accept } } type connection struct { endpoint listenOnce, defaultSessionOnce, closeOnce sync.Once - // Set before Open() - container *container - conn net.Conn - accept func(Endpoint) error - - // Set by Open() + container *container + conn net.Conn + accept func(Incoming) handler *handler engine *proton.Engine err internal.ErrorHolder eConnection proton.Connection defaultSession Session + done chan struct{} } -func newConnection(conn net.Conn, cont *container) (*connection, error) { - c := &connection{container: cont, conn: conn, accept: RejectEndpoint} +func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) { + c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})} c.handler = newHandler(c) var err error c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) if err != nil { return nil, err } + for _, set := range setting { + set(c) + } c.str = c.engine.String() c.eConnection = c.engine.Connection() + go func() { c.engine.Run(); close(c.done) }() return c, nil } -func (c 
*connection) Server() { c.engine.Server() } - -func (c *connection) Listen(accept func(Endpoint) error) { c.accept = accept } - -func (c *connection) Open() error { - go c.engine.Run() - return nil -} - -func (c *connection) Close(err error) { c.engine.Close(err) } +func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } -func (c *connection) Disconnect(err error) { c.engine.Disconnect(err) } +func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } -func (c *connection) closed(err error) { - // Call from another goroutine to initiate close without deadlock. - go c.Close(err) -} - -func (c *connection) Session() (Session, error) { +func (c *connection) Session(setting ...SessionSetting) (Session, error) { var s Session err := c.engine.InjectWait(func() error { eSession, err := c.engine.Connection().Session() if err == nil { eSession.Open() if err == nil { - s = newSession(c, eSession) + s = newSession(c, eSession, setting...) } } return err @@ -189,3 +172,47 @@ func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) { } func (c *connection) Connection() Connection { return c } + +func (c *connection) Wait() error { return c.WaitTimeout(Forever) } +func (c *connection) WaitTimeout(timeout time.Duration) error { + _, err := timedReceive(c.done, timeout) + if err == Timeout { + return Timeout + } + return c.Error() +} + +// Incoming is the interface for incoming requests to open an endpoint. +// Implementing types are IncomingSession, IncomingSender and IncomingReceiver. +type Incoming interface { + // Accept the endpoint with default settings. + // + // You must not use the returned endpoint in the accept() function that + // receives the Incoming value, but you can pass it to other goroutines. + // + // Implementing types provide type-specific Accept functions that take additional settings. 
+ Accept() Endpoint + + // Reject the endpoint with an error + Reject(error) + + error() error +} + +type incoming struct { + err error + accepted bool +} + +func (i *incoming) Reject(err error) { i.err = err } + +func (i *incoming) error() error { + switch { + case i.err != nil: + return i.err + case !i.accepted: + return amqp.Errorf(amqp.NotAllowed, "remote open rejected") + default: + return nil + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/container.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go index 06a9a14..7bbc4b0 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go @@ -39,7 +39,7 @@ type Container interface { // setting any Connection properties you need to set. Note the net.Conn // can be an outgoing connection (e.g. made with net.Dial) or an incoming // connection (e.g. made with net.Listener.Accept()) - Connection(conn net.Conn) (Connection, error) + Connection(net.Conn, ...ConnectionSetting) (Connection, error) } type container struct { @@ -66,6 +66,6 @@ func (cont *container) nextLinkName() string { return cont.id + "@" + cont.linkNames.Next() } -func (cont *container) Connection(conn net.Conn) (Connection, error) { - return newConnection(conn, cont) +func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) { + return newConnection(conn, cont, setting...) 
} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go index 1b07109..b518e42 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go @@ -27,7 +27,7 @@ import ( // NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated. type handler struct { - delegator *proton.MessagingDelegator + delegator *proton.MessagingAdapter connection *connection links map[proton.Link]Link sentMessages map[proton.Delivery]*sentMessage @@ -41,8 +41,8 @@ func newHandler(c *connection) *handler { sentMessages: make(map[proton.Delivery]*sentMessage), sessions: make(map[proton.Session]*session), } - h.delegator = proton.NewMessagingDelegator(h) - // Disable auto features of MessagingDelegator, we do these ourselves. + h.delegator = proton.NewMessagingAdapter(h) + // Disable auto features of MessagingAdapter, we do these ourselves. 
h.delegator.Prefetch = 0 h.delegator.AutoAccept = false h.delegator.AutoSettle = false @@ -50,6 +50,10 @@ func newHandler(c *connection) *handler { return h } +func (h *handler) internalError(fmt string, arg ...interface{}) { + proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...)) +} + func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { switch t { @@ -57,9 +61,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) if r, ok := h.links[e.Link()].(*receiver); ok { r.message(e.Delivery()) } else { - proton.CloseError( - h.connection.eConnection, - amqp.Errorf(amqp.InternalError, "no receiver for link %s", e.Link())) + h.internalError("no receiver for link %s", e.Link()) } case proton.MSettled: @@ -68,19 +70,18 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } case proton.MSendable: - h.trySend(e.Link()) + if s, ok := h.links[e.Link()].(*sender); ok { + s.sendable() + } else { + h.internalError("no receiver for link %s", e.Link()) + } case proton.MSessionOpening: if e.Session().State().LocalUninit() { // Remotely opened - s := newSession(h.connection, e.Session()) - if err := h.connection.accept(s); err != nil { - proton.CloseError(e.Session(), (err)) - } else { - h.sessions[e.Session()] = s - if s.capacity > 0 { - e.Session().SetIncomingCapacity(s.capacity) - } - e.Session().Open() + incoming := &IncomingSession{h: h, pSession: e.Session()} + h.connection.accept(incoming) + if err := incoming.error(); err != nil { + proton.CloseError(e.Session(), err) } } @@ -95,33 +96,24 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) case proton.MLinkOpening: l := e.Link() - if l.State().LocalUninit() { // Remotely opened - ss := h.sessions[l.Session()] - if ss == nil { - proton.CloseError( - l, amqp.Errorf(amqp.InternalError, ("no session for link"))) - break - } - var link Link - if l.IsReceiver() { - r := 
&receiver{link: incomingLink(ss, l)} - link = r - r.inAccept = true - defer func() { r.inAccept = false }() - } else { - link = &sender{link: incomingLink(ss, l)} - } - if err := h.connection.accept(link); err != nil { - proton.CloseError(l, err) - break - } - link.open() + if l.State().LocalActive() { // Already opened locally. + break } - - case proton.MLinkOpened: - l := e.Link() - if l.IsSender() { - h.trySend(l) + ss := h.sessions[l.Session()] + if ss == nil { + h.internalError("no session for link %s", e.Link()) + break + } + var incoming Incoming + if l.IsReceiver() { + incoming = &IncomingReceiver{makeIncomingLink(ss, l)} + } else { + incoming = &IncomingSender{makeIncomingLink(ss, l)} + } + h.connection.accept(incoming) + if err := incoming.error(); err != nil { + proton.CloseError(l, err) + break } case proton.MLinkClosing: @@ -130,7 +122,17 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) case proton.MLinkClosed: h.linkClosed(e.Link(), proton.EndpointError(e.Link())) + case proton.MConnectionClosing: + h.connection.err.Set(e.Connection().RemoteCondition().Error()) + + case proton.MConnectionClosed: + h.connection.err.Set(Closed) // If no error already set, this is an orderly close. + case proton.MDisconnected: + h.connection.err.Set(e.Transport().Condition().Error()) + // If err not set at this point (e.g. to Closed) then this is unexpected. 
+ h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)) + err := h.connection.Error() for l, _ := range h.links { h.linkClosed(l, err) @@ -154,19 +156,3 @@ func (h *handler) linkClosed(l proton.Link, err error) { func (h *handler) addLink(rl proton.Link, ll Link) { h.links[rl] = ll } - -func (h *handler) trySend(l proton.Link) { - if l.Credit() <= 0 { - return - } - if s, ok := h.links[l].(*sender); ok { - for ch := s.popBlocked(); l.Credit() > 0 && ch != nil; ch = s.popBlocked() { - if snd, ok := <-ch; ok { - s.doSend(snd) - } - } - } else { - h.connection.closed( - amqp.Errorf(amqp.InternalError, "cannot find sender for link %s", l)) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/link.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go index abc8431..4bef53b 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go @@ -60,39 +60,39 @@ type Link interface { open() } -// LinkSetting is a function that sets a link property. Passed when creating -// a Sender or Receiver, do not use at any other time. -type LinkSetting func(Link) +// LinkSetting can be passed when creating a sender or receiver. +// See functions that return LinkSetting for details +type LinkSetting func(*link) // Source sets address that messages are coming from. -func Source(s string) LinkSetting { return func(l Link) { l.(*link).source = s } } +func Source(s string) LinkSetting { return func(l *link) { l.source = s } } // Target sets address that messages are going to. 
-func Target(s string) LinkSetting { return func(l Link) { l.(*link).target = s } } +func Target(s string) LinkSetting { return func(l *link) { l.target = s } } // LinkName sets the link name. -func LinkName(s string) LinkSetting { return func(l Link) { l.(*link).target = s } } +func LinkName(s string) LinkSetting { return func(l *link) { l.target = s } } // SndSettle sets the send settle mode -func SndSettle(m SndSettleMode) LinkSetting { return func(l Link) { l.(*link).sndSettle = m } } +func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } } // RcvSettle sets the send settle mode -func RcvSettle(m RcvSettleMode) LinkSetting { return func(l Link) { l.(*link).rcvSettle = m } } +func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } } // SndSettleMode defines when the sending end of the link settles message delivery. type SndSettleMode proton.SndSettleMode // Capacity sets the link capacity -func Capacity(n int) LinkSetting { return func(l Link) { l.(*link).capacity = n } } +func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } } // Prefetch sets a receivers pre-fetch flag. Not relevant for a sender. -func Prefetch(p bool) LinkSetting { return func(l Link) { l.(*link).prefetch = p } } +func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } } // AtMostOnce sets "fire and forget" mode, messages are sent but no // acknowledgment is received, messages can be lost if there is a network // failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst func AtMostOnce() LinkSetting { - return func(l Link) { + return func(l *link) { SndSettle(SndSettled)(l) RcvSettle(RcvFirst)(l) } @@ -104,7 +104,7 @@ func AtMostOnce() LinkSetting { // that the message will be received twice in this case. 
// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst func AtLeastOnce() LinkSetting { - return func(l Link) { + return func(l *link) { SndSettle(SndUnsettled)(l) RcvSettle(RcvFirst)(l) } @@ -145,8 +145,6 @@ type link struct { session *session eLink proton.Link done chan struct{} // Closed when link is closed - - inAccept bool } func (l *link) Source() string { return l.source } @@ -163,8 +161,8 @@ func (l *link) engine() *proton.Engine { return l.session.connection.engine } func (l *link) handler() *handler { return l.session.connection.handler } // Set up link fields and open the proton.Link -func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error) { - l := &link{ +func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) { + l := link{ session: sn, isSender: isSender, capacity: 1, @@ -172,7 +170,7 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error done: make(chan struct{}), } for _, set := range setting { - set(l) + set(&l) } if l.linkName == "" { l.linkName = l.session.connection.container.nextLinkName() @@ -184,7 +182,7 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error } if l.eLink.IsNil() { l.err.Set(internal.Errorf("cannot create link %s", l)) - return nil, l.err.Get() + return l, l.err.Get() } l.eLink.Source().SetAddress(l.source) l.eLink.Target().SetAddress(l.target) @@ -195,20 +193,27 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error return l, nil } +type incomingLink struct { + incoming + link +} + // Set up a link from an incoming proton.Link. 
-func incomingLink(sn *session, eLink proton.Link) link { - l := link{ - session: sn, - isSender: eLink.IsSender(), - eLink: eLink, - source: eLink.RemoteSource().Address(), - target: eLink.RemoteTarget().Address(), - linkName: eLink.Name(), - sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()), - rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()), - capacity: 1, - prefetch: false, - done: make(chan struct{}), +func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { + l := incomingLink{ + link: link{ + session: sn, + isSender: eLink.IsSender(), + eLink: eLink, + source: eLink.RemoteSource().Address(), + target: eLink.RemoteTarget().Address(), + linkName: eLink.Name(), + sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()), + rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()), + capacity: 1, + prefetch: false, + done: make(chan struct{}), + }, } l.str = eLink.String() return l http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go index 92c0b90..59ac018 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go @@ -20,10 +20,9 @@ under the License. package electron import ( + "qpid.apache.org/amqp" "qpid.apache.org/internal" "qpid.apache.org/proton" - "qpid.apache.org/amqp" - "sync" "time" ) @@ -63,10 +62,6 @@ type Receiver interface { // Capacity is the size (number of messages) of the local message buffer // These are messages received but not yet returned to the application by a call to Receive() Capacity() int - - // SetCapacity sets Capacity and Prefetch of an accepted Receiver. 
- // May only be called in an accept() function, see Connection.Listen() - SetCapacity(capacity int, prefetch bool) } // Flow control policy for a receiver. @@ -120,18 +115,12 @@ func (p noPrefetchPolicy) Post(r *receiver, err error) { // Receiver implementation type receiver struct { link - buffer chan ReceivedMessage - policy policy - setupOnce sync.Once -} - -func (r *receiver) SetCapacity(capacity int, prefetch bool) { - internal.Assert(r.inAccept, "Receiver.SetCapacity called outside of accept function") - r.capacity = capacity - r.prefetch = prefetch + buffer chan ReceivedMessage + policy policy } -func (r *receiver) setup() { +func newReceiver(l link) *receiver { + r := &receiver{link: l} if r.capacity < 1 { r.capacity = 1 } @@ -141,6 +130,9 @@ func (r *receiver) setup() { r.policy = &noPrefetchPolicy{} } r.buffer = make(chan ReceivedMessage, r.capacity) + r.handler().addLink(r.eLink, r) + r.link.open() + return r } // call in proton goroutine. @@ -156,15 +148,14 @@ func (r *receiver) Receive() (rm ReceivedMessage, err error) { } func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) { - r.setupOnce.Do(r.setup) internal.Assert(r.buffer != nil, "Receiver is not open: %s", r) r.policy.Pre(r) defer func() { r.policy.Post(r, err) }() - rmi, ok, timedout := timedReceive(r.buffer, timeout) - switch { - case timedout: + rmi, err := timedReceive(r.buffer, timeout) + switch err { + case Timeout: return ReceivedMessage{}, Timeout - case !ok: + case Closed: return ReceivedMessage{}, r.Error() default: return rmi.(ReceivedMessage), nil @@ -194,12 +185,6 @@ func (r *receiver) message(delivery proton.Delivery) { } } -func (r *receiver) open() { - r.setupOnce.Do(r.setup) - r.link.open() - r.handler().addLink(r.eLink, r) -} - func (r *receiver) closed(err error) { r.link.closed(err) if r.buffer != nil { @@ -230,3 +215,24 @@ func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) } // Reject is short for 
Acknowledge(Rejected) func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) } + +// IncomingReceiver is passed to the accept() function given to Connection.Listen() +// when there is an incoming request for a receiver link. +type IncomingReceiver struct { + incomingLink +} + +// Link provides information about the incoming link. +func (i *IncomingReceiver) Link() Link { return i } + +// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver. +func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver { + i.capacity = capacity + i.prefetch = prefetch + return i.Accept().(Receiver) +} + +func (i *IncomingReceiver) Accept() Endpoint { + i.accepted = true + return newReceiver(i.link) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go index 3124f74..68cfbb3 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go @@ -23,10 +23,9 @@ package electron import "C" import ( - "container/list" + "qpid.apache.org/amqp" "qpid.apache.org/internal" "qpid.apache.org/proton" - "qpid.apache.org/amqp" "reflect" "time" ) @@ -69,7 +68,7 @@ type sendMessage struct { type sender struct { link - blocked list.List // Channel of sendMessage for blocked senders. + credit chan struct{} // Signal available credit. } // Disposition indicates the outcome of a settled message delivery. 
@@ -102,31 +101,6 @@ func (d Disposition) String() string { } } -// Send a message, assumes there is credit -func (s *sender) doSend(snd sendMessage) { - delivery, err := s.eLink.Send(snd.m) - switch sm := snd.sm.(type) { - case nil: - delivery.Settle() - case *sentMessage: - sm.delivery = delivery - if err != nil { - sm.settled(err) - } else { - s.handler().sentMessages[delivery] = sm - } - default: - internal.Assert(false, "bad SentMessage type %T", snd.sm) - } -} - -func (s *sender) popBlocked() chan sendMessage { - if s.blocked.Len() > 0 { - return s.blocked.Remove(s.blocked.Front()).(chan sendMessage) - } - return nil -} - func (s *sender) Send(m amqp.Message) (SentMessage, error) { return s.SendTimeout(m, Forever) } @@ -152,52 +126,58 @@ func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error } func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) { - if s.Error() != nil { - return nil, s.Error() - } - var err error - if timeout == 0 { - err = s.engine().InjectWait(func() error { - if s.eLink.Credit() > 0 { - s.doSend(snd) - return nil - } - return Timeout - }) - } else { - buf := make(chan sendMessage) - done := make(chan struct{}) - defer close(buf) - s.engine().Inject(func() { // Runs concurrently - if s.eLink.Credit() > 0 { - s.doSend(snd) - close(done) // Signal already sent - } else { - s.blocked.PushBack(buf) - } - }) - select { - case <-done: // Sent without blocking - case buf <- snd: // Sent via blocking channel - case <-s.done: + if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit + if err == Closed { err = s.Error() - case <-After(timeout): - err = Timeout + internal.Assert(err != nil) } + return nil, err } - if err != nil { + if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil { return nil, err } return snd.sm, nil } +// Send a message. 
Handler goroutine +func (s *sender) doSend(snd sendMessage) { + delivery, err := s.eLink.Send(snd.m) + switch sm := snd.sm.(type) { + case nil: + delivery.Settle() + case *sentMessage: + sm.delivery = delivery + if err != nil { + sm.settled(err) + } else { + s.handler().sentMessages[delivery] = sm + } + default: + internal.Assert(false, "bad SentMessage type %T", snd.sm) + } + if s.eLink.Credit() > 0 { + s.sendable() // Signal credit. + } +} + +// Signal the sender has credit. Any goroutine. +func (s *sender) sendable() { + select { // Non-blocking + case s.credit <- struct{}{}: // Set the flag if not already set. + default: + } +} + func (s *sender) closed(err error) { s.link.closed(err) + close(s.credit) } -func (s *sender) open() { - s.link.open() +func newSender(l link) *sender { + s := &sender{link: l, credit: make(chan struct{}, 1)} s.handler().addLink(s.eLink, s) + s.link.open() + return s } // SentMessage represents a previously sent message. It allows you to wait for acknowledgement. @@ -285,7 +265,7 @@ func (sm *sentMessage) Disposition() (Disposition, error) { } func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) { - if _, _, timedout := timedReceive(sm.done, timeout); timedout { + if _, err := timedReceive(sm.done, timeout); err == Timeout { return sm.disposition, Timeout } else { return sm.disposition, sm.err @@ -317,3 +297,19 @@ func (sm *sentMessage) finish() { } func (sm *sentMessage) Error() error { return sm.err } + +// IncomingSender is passed to the accept() function given to Connection.Listen() +// when there is an incoming request for a sender link. +type IncomingSender struct { + incomingLink +} + +// Link provides information about the incoming link. 
+func (i *IncomingSender) Link() Link { return i } + +func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) } + +func (i *IncomingSender) Accept() Endpoint { + i.accepted = true + return newSender(i.link) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go index 612658a..3531da6 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go @@ -24,7 +24,6 @@ import ( ) // Session is an AMQP session, it contains Senders and Receivers. -// type Session interface { Endpoint @@ -36,10 +35,6 @@ type Session interface { // Source address, or a ReceiverSettings struct containing more details // settings. Receiver(...LinkSetting) (Receiver, error) - - // SetCapacity sets the session buffer capacity in bytes. - // Only has effect if called in an accept() function, see Connection.Listen() - SetCapacity(bytes uint) } type session struct { @@ -49,13 +44,27 @@ type session struct { capacity uint } +// SessionSetting can be passed when creating a sender or receiver. +// See functions that return SessionSetting for details +type SessionSetting func(*session) + +// IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer.. 
+func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } } + // in proton goroutine -func newSession(c *connection, es proton.Session) *session { - return &session{ +func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session { + s := &session{ connection: c, eSession: es, endpoint: endpoint{str: es.String()}, } + for _, set := range setting { + set(s) + } + c.handler.sessions[s.eSession] = s + s.eSession.SetIncomingCapacity(s.capacity) + s.eSession.Open() + return s } func (s *session) Connection() Connection { return s.connection } @@ -71,8 +80,7 @@ func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) { err = s.engine().InjectWait(func() error { l, err := localLink(s, true, setting...) if err == nil { - snd = &sender{link: *l} - snd.(*sender).open() + snd = newSender(l) } return err }) @@ -83,8 +91,7 @@ func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) { err = s.engine().InjectWait(func() error { l, err := localLink(s, false, setting...) if err == nil { - rcv = &receiver{link: *l} - rcv.(*receiver).open() + rcv = newReceiver(l) } return err }) @@ -96,3 +103,23 @@ func (s *session) closed(err error) { s.err.Set(err) s.err.Set(Closed) } + +// IncomingSession is passed to the accept() function given to Connection.Listen() +// when there is an incoming session request. +type IncomingSession struct { + incoming + h *handler + pSession proton.Session + capacity uint +} + +// AcceptCapacity sets the session buffer capacity of an incoming session in bytes. 
+func (i *IncomingSession) AcceptSession(bytes uint) Session { + i.capacity = bytes + return i.Accept().(Session) +} + +func (i *IncomingSession) Accept() Endpoint { + i.accepted = true + return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/time.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go index ee61332..3407b82 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go @@ -49,24 +49,25 @@ const Forever time.Duration = -1 // timeout==0 means do a non-blocking receive attempt. timeout < 0 means block // forever. Other values mean block up to the timeout. // -func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) { +// Returns error Timeout on timeout, Closed on channel close. 
+func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) { cases := []reflect.SelectCase{ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)}, } - switch { - case timeout == 0: // Non-blocking receive + if timeout == 0 { // Non-blocking cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault}) - case timeout == Forever: // Block forever, nothing to add - default: // Block up to timeout + } else { // Block up to timeout cases = append(cases, - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))}) + reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))}) } - chosen, recv, recvOk := reflect.Select(cases) + chosen, value, ok := reflect.Select(cases) switch { + case !ok: + return nil, Closed case chosen == 0: - return recv.Interface(), recvOk, false + return value.Interface(), nil default: - return nil, false, true + return nil, Timeout } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go index e9d6d6f..51f70f8 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go @@ -23,9 +23,9 @@ You can write clients and servers using this library. This package is a port of the Proton C API into Go (see http://qpid.apache.org/proton) Go programmers may find the 'electron' package -more convenient, it provides a concurrent-safe API that allows you to do -procedural loops in goroutines rather than implementing event handlers that must -run in a single goroutine. +more convenient. 
qpid.apache.org/electron provides a concurrent-safe API that +allows you to run procedural loops in arbitrary goroutines rather than +implementing event handlers that must run in a single goroutine. Consult the C API documentation at http://qpid.apache.org/proton for more information about the types here. There is a 1-1 correspondence between C type @@ -42,21 +42,24 @@ goroutine that feeds events to a proton.MessagingHandler, which you must impleme See the Engine documentation for more. MessagingHandler defines an event handling interface that you can implement to -react to AMQP protocol events. (There is also a lower-level EventHandler, but -MessagingHandler provides a simpler set of events and automates common tasks for you.) +react to AMQP protocol events. There is also a lower-level EventHandler, but +MessagingHandler provides a simpler set of events and automates common tasks for you; +for most applications it will be more convenient. -All events generated by proton are handled in the single event-loop goroutine -associated with the Connection and Engine. You can use Engine.Inject() or -Engine.InjectWait() to inject additional functions into the event loop. Only -injected functions or handler functions can use proton types (such as Session, -Link etc.) Handlers and injected functions can set up channels to communicate -with other goroutines.. +NOTE: Methods on most types defined in this package (Sessions, Links etc.) can +*only* be called in the event handler goroutine of the relevant +Connection/Engine, either by the HandleEvent method of a handler type or in a +function injected into the goroutine via Inject() or InjectWait(). Handlers and +injected functions can set up channels to communicate with other goroutines. +Note the Injecter associated with a handler is available as part of the Event value +passed to HandleEvent. -Separate Connection and Engine instances are independent, and can run concurrently. 
+Separate Engine instances are independent, and can run concurrently. The 'electron' package is built on the proton package but instead offers a concurrent-safe API that can use simple procedural loops rather than event -handlers to express application logic. It may be easier to use w for some applications. +handlers to express application logic. It is easier to use for most +applications. */ package proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go index 5dc8727..2cebb49 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go @@ -39,15 +39,26 @@ import ( "unsafe" ) -// Injecter allows functions to be "injected" into an event-processing loop. +// Injecter allows functions to be "injected" into the event-processing loop, to +// be called in the same goroutine as event handlers. type Injecter interface { - // Inject a function into an event-loop concurrency context. + // Inject a function into the engine goroutine. // - // f() will be called in the same concurrency context as event handers, so it - // can safely use values that can used be used in that context. If f blocks it - // will block the event loop so be careful calling blocking functions in f. + // f() will be called in the same goroutine as event handlers, so it can safely + // use values belonging to event handlers without synchronization. f() should + // not block, no further events or injected functions can be processed until + // f() returns. // - // Returns a non-nil error if the function could not be injected. + // Returns a non-nil error if the function could not be injected and will + // never be called. 
Otherwise the function will eventually be called. + // + // Note that proton values (Link, Session, Connection etc.) that existed when + // Inject(f) was called may have become invalid by the time f() is executed. + // Handlers should keep track of Closed events to ensure proton values + // are not used after they become invalid. One technique is to have a map from + // proton values to application values. Check that the map has the correct + // proton/application value pair at the start of the injected function and + // delete the value from the map when handling a Closed event. Inject(f func()) error // InjectWait is like Inject but does not return till f() has completed. @@ -72,19 +83,14 @@ func (b *bufferChan) buffer() []byte { } // Engine reads from a net.Conn, decodes AMQP events and calls the appropriate -// Handler functions in a single event-loop goroutine. Actions taken by Handler -// functions (such as sending messages) are encoded and written to the -// net.Conn. Create a engine with NewEngine() -// -// The Engine runs a proton event loop in the goroutine that calls Engine.Run() -// and creates goroutines to feed data to/from a net.Conn. You can create -// multiple Engines to handle multiple connections concurrently. +// Handler functions sequentially in a single goroutine. Actions taken by +// Handler functions (such as sending messages) are encoded and written to the +// net.Conn. You can create multiple Engines to handle multiple connections +// concurrently. // -// Methods on proton values defined in this package (Sessions, Links etc.) can -// only be called in the goroutine that executes the corresponding -// Engine.Run(). You implement the EventHandler or MessagingHandler interfaces -// and provide those values to NewEngine(). Their HandleEvent method will be -// called in the Engine goroutine, in typical event-driven style. +// You implement the EventHandler and/or MessagingHandler interfaces and provide +// those values to NewEngine(). 
Their HandleEvent method will be called in the +// event-handling goroutine. // // Handlers can pass values from an event (Connections, Links, Deliveries etc.) to // other goroutines, store them, or use them as map indexes. Effectively they are @@ -161,7 +167,7 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { } C.pn_connection_collect(eng.connection.pn, eng.collector) eng.connection.Open() - connectionContexts.Put(eng.connection, connectionContext{eng, eng.String()}) + connectionContexts.Put(eng.connection, connectionContext{eng.String()}) return eng, nil } @@ -223,6 +229,7 @@ func (eng *Engine) Server() { eng.transport.SetServer() } // Close the engine's connection, returns when the engine has exited. func (eng *Engine) Close(err error) { + eng.err.Set(err) eng.Inject(func() { CloseError(eng.connection, err) }) @@ -231,9 +238,7 @@ func (eng *Engine) Close(err error) { // Disconnect the engine's connection without and AMQP close, returns when the engine has exited. func (eng *Engine) Disconnect(err error) { - if err != nil { - eng.err.Set(err) - } + eng.err.Set(err) eng.conn.Close() <-eng.running } @@ -281,8 +286,8 @@ func (eng *Engine) Run() error { }() wbuf := eng.write.buffer()[:0] -loop: - for { + + for eng.err.Get() == nil { if len(wbuf) == 0 { eng.pop(&wbuf) } @@ -309,13 +314,12 @@ loop: eng.netError(err) } eng.process() - if eng.err.Get() != nil { - break loop - } } close(eng.write.buffers) eng.conn.Close() // Make sure connection is closed wait.Wait() + close(eng.running) // Signal goroutines have exited and Error is set. + connectionContexts.Delete(eng.connection) if !eng.connection.IsNil() { eng.connection.Free() @@ -332,15 +336,10 @@ loop: C.pn_handler_free(h.pn) } } - close(eng.running) // Signal goroutines have exited and Error is set. 
return eng.err.Get() } func (eng *Engine) netError(err error) { - if err == nil { - err = internal.Errorf("unknown network error") - } - eng.conn.Close() // Make sure both sides are closed eng.err.Set(err) eng.transport.CloseHead() eng.transport.CloseTail() @@ -389,17 +388,13 @@ func (eng *Engine) handle(e Event) { h.HandleEvent(e) } if e.Type() == ETransportClosed { - eng.err.Set(e.Connection().RemoteCondition().Error()) - eng.err.Set(e.Connection().Transport().Condition().Error()) - if eng.err.Get() == nil { - eng.err.Set(io.EOF) - } + eng.err.Set(io.EOF) } } func (eng *Engine) process() { for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { - eng.handle(makeEvent(ce)) + eng.handle(makeEvent(ce, eng)) C.pn_collector_pop(eng.collector) } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go index 8a5cbf8..53b744c 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go @@ -191,7 +191,7 @@ type endpointDelegator struct { remoteOpen, remoteClose, localOpen, localClose EventType opening, opened, closing, closed, error MessagingEvent endpoint func(Event) Endpoint - delegator *MessagingDelegator + delegator *MessagingAdapter } // HandleEvent handles an open/close event for an endpoint in a generic way. @@ -240,10 +240,10 @@ func (d endpointDelegator) HandleEvent(e Event) { } } -// MessagingDelegator implments a EventHandler and delegates to a MessagingHandler. -// You can modify the exported fields before you pass the MessagingDelegator to +// MessagingAdapter implements an EventHandler and delegates to a MessagingHandler. 
+// You can modify the exported fields before you pass the MessagingAdapter to // a Engine. -type MessagingDelegator struct { +type MessagingAdapter struct { mhandler MessagingHandler connection, session, link endpointDelegator flowcontroller EventHandler @@ -261,8 +261,8 @@ type MessagingDelegator struct { PeerCloseError bool } -func NewMessagingDelegator(h MessagingHandler) *MessagingDelegator { - return &MessagingDelegator{ +func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter { + return &MessagingAdapter{ mhandler: h, flowcontroller: nil, AutoSettle: true, @@ -281,7 +281,7 @@ func handleIf(h EventHandler, e Event) { // Handle a proton event by passing the corresponding MessagingEvent(s) to // the MessagingHandler. -func (d *MessagingDelegator) HandleEvent(e Event) { +func (d *MessagingAdapter) HandleEvent(e Event) { handleIf(d.flowcontroller, e) switch e.Type() { @@ -352,7 +352,7 @@ func (d *MessagingDelegator) HandleEvent(e Event) { } } -func (d *MessagingDelegator) incoming(e Event) (err error) { +func (d *MessagingAdapter) incoming(e Event) (err error) { delivery := e.Delivery() if delivery.HasMessage() { d.mhandler.HandleMessagingEvent(MMessage, e) @@ -368,7 +368,7 @@ func (d *MessagingDelegator) incoming(e Event) (err error) { return } -func (d *MessagingDelegator) outgoing(e Event) (err error) { +func (d *MessagingAdapter) outgoing(e Event) (err error) { delivery := e.Delivery() if delivery.Updated() { switch delivery.Remote().Type() { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go index 7d40890..45a6722 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go @@ -24,20 +24,20 @@ 
package proton //#include //#include -//#include -//#include //#include -//#include //#include -//#include //#include +//#include +//#include +//#include +//#include //#include import "C" import ( "fmt" - "qpid.apache.org/internal" "qpid.apache.org/amqp" + "qpid.apache.org/internal" "reflect" "time" "unsafe" @@ -45,34 +45,75 @@ import ( // TODO aconway 2015-05-05: Documentation for generated types. +// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the +// Go type implementing this interface. For low level, at-your-own-risk use only. +type CHandle interface { + // CPtr returns the unsafe C pointer, equivalent to a C void*. + CPtr() unsafe.Pointer +} + +// Incref increases the refcount of a proton value, which prevents the +// underlying C struct being freed until you call Decref(). +// +// It can be useful to "pin" a proton value in memory while it is in use by +// goroutines other than the event loop goroutine. For example if you Incref() a +// Link, the underlying object is not freed when the link is closed, which means +// other goroutines can continue to safely use it as an index in a map or inject +// it into the event loop goroutine. There will of course be an error if you try +// to use a link after it is closed, but not a segmentation fault. +func Incref(c CHandle) { + if p := c.CPtr(); p != nil { + C.pn_incref(p) + } +} + +// Decref decreases the refcount of a proton value, freeing the underlying C +// struct if this is the last reference. Only call this if you previously +// called Incref() for this value. +func Decref(c CHandle) { + if p := c.CPtr(); p != nil { + C.pn_decref(p) + } +} + // Event is an AMQP protocol event. 
type Event struct { pn *C.pn_event_t eventType EventType connection Connection + transport Transport session Session link Link delivery Delivery + injecter Injecter } -func makeEvent(pn *C.pn_event_t) Event { +func makeEvent(pn *C.pn_event_t, injecter Injecter) Event { return Event{ pn: pn, eventType: EventType(C.pn_event_type(pn)), connection: Connection{C.pn_event_connection(pn)}, + transport: Transport{C.pn_event_transport(pn)}, session: Session{C.pn_event_session(pn)}, link: Link{C.pn_event_link(pn)}, delivery: Delivery{C.pn_event_delivery(pn)}, + injecter: injecter, } } func (e Event) IsNil() bool { return e.eventType == EventType(0) } func (e Event) Type() EventType { return e.eventType } func (e Event) Connection() Connection { return e.connection } +func (e Event) Transport() Transport { return e.transport } func (e Event) Session() Session { return e.session } func (e Event) Link() Link { return e.link } func (e Event) Delivery() Delivery { return e.delivery } func (e Event) String() string { return e.Type().String() } +// Injecter should not be used in a handler function, but it can be passed to +// other goroutines (via a channel or to a goroutine started by handler +// functions) to let them inject functions back into the handlers goroutine. +func (e Event) Injecter() Injecter { return e.injecter } + // Data holds a pointer to decoded AMQP data. // Use amqp.marshal/unmarshal to access it as Go data types. // @@ -132,12 +173,14 @@ type Endpoint interface { String() string } -// CloseError sets an error condition on an endpoint and closes the endpoint. 
+// CloseError sets an error condition on an endpoint and closes the endpoint (if not already closed) func CloseError(e Endpoint, err error) { if err != nil { e.Condition().SetError(err) } - e.Close() + if e.State().RemoteActive() { + e.Close() + } } // EndpointError returns the remote error if there is one, the local error if not @@ -204,6 +247,20 @@ func (l Link) Delivery(tag string) Delivery { return Delivery{C.pn_delivery(l.pn, pnTag(tag))} } +func (l Link) Connection() Connection { return l.Session().Connection() } + +// Human-readable link description including name, source, target and direction. +func (l Link) String() string { + switch { + case l.IsNil(): + return fmt.Sprintf("") + case l.IsSender(): + return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address()) + default: + return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address()) + } +} + func cPtr(b []byte) *C.char { if len(b) == 0 { return nil @@ -229,20 +286,11 @@ func (s Session) Receiver(name string) Link { // Context information per connection. type connectionContext struct { - injecter Injecter - str string + str string } var connectionContexts = internal.MakeSafeMap() -// Injecter for event-loop associated with this connection. -func (c Connection) Injecter() Injecter { - if cc, ok := connectionContexts.Get(c).(connectionContext); ok { - return cc.injecter - } - return nil -} - // Unique (per process) string identifier for a connection, useful for debugging. func (c Connection) String() string { if cc, ok := connectionContexts.Get(c).(connectionContext); ok { @@ -279,20 +327,6 @@ func (s Session) String() string { return fmt.Sprintf("%s/%p", s.Connection(), s.pn) } -func (l Link) Connection() Connection { return l.Session().Connection() } - -// Human-readable link description including name, source, target and direction. 
-func (l Link) String() string { - switch { - case l.IsNil(): - return fmt.Sprintf("") - case l.IsSender(): - return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address()) - default: - return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address()) - } -} - // Error returns an instance of amqp.Error or nil. func (c Condition) Error() error { if c.IsNil() || !c.IsSet() { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java index ac28b32..ef09837 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java @@ -48,7 +48,7 @@ public abstract class Terminus _dynamic = other._dynamic; if (other._dynamicNodeProperties != null) { // TODO: Do we need to copy or can we make a simple reference? - _dynamicNodeProperties = new HashMap(other._dynamicNodeProperties); // FIXME + _dynamicNodeProperties = new HashMap(other._dynamicNodeProperties); // yFIXME } if (other._capabilities != null) { _capabilities = other._capabilities.clone(); // FIXME? --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org