Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 03B281826B for ; Fri, 22 May 2015 19:46:14 +0000 (UTC) Received: (qmail 84748 invoked by uid 500); 22 May 2015 19:46:13 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 84697 invoked by uid 500); 22 May 2015 19:46:13 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 84428 invoked by uid 99); 22 May 2015 19:46:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 May 2015 19:46:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9F677DFF7C; Fri, 22 May 2015 19:46:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Date: Fri, 22 May 2015 19:46:19 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/8] qpid-proton git commit: PROTON-827: Re-create go worspace, cmake support for testing. PROTON-827: Re-create go worspace, cmake support for testing. Re-created the go workspace structure in the repository so it can be set as a GOPATH element. ctest runs package tests and examples. make install go code to share/gocode config.sh sets env. vars for CGO compilation. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c9257f47 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c9257f47 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c9257f47 Branch: refs/heads/go1 Commit: c9257f470bda6ce30e0c985cc35a166141dfe0c4 Parents: 5ea911e Author: Alan Conway Authored: Wed May 20 15:30:38 2015 -0400 Committer: Alan Conway Committed: Fri May 22 15:38:27 2015 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 1 + config.sh.in | 23 +- examples/CMakeLists.txt | 3 +- examples/go/CMakeLists.txt | 29 + examples/go/README.md | 72 +- examples/go/event/broker.go | 255 ------- examples/go/event_broker.go | 255 +++++++ examples/go/example_test.go | 14 +- go | 1 + go/README.md | 138 ---- go/amqp/doc.go | 40 - go/amqp/interop | 1 - go/amqp/interop_test.go | 308 -------- go/amqp/marshal.go | 238 ------ go/amqp/message.go | 342 --------- go/amqp/message_test.go | 90 --- go/amqp/types.go | 193 ----- go/amqp/uid.go | 40 - go/amqp/unmarshal.go | 552 -------------- go/amqp/url.go | 96 --- go/amqp/url_test.go | 51 -- go/event/doc.go | 38 - go/event/genwrap.go | 427 ----------- go/event/handlers.go | 411 ----------- go/event/message.go | 75 -- go/event/pump.go | 357 --------- go/event/wrappers.go | 253 ------- go/event/wrappers_gen.go | 732 ------------------- go/internal/error.go | 125 ---- go/messaging/doc.go | 28 - go/messaging/handler.go | 70 -- go/messaging/messaging.go | 250 ------- proton-c/CMakeLists.txt | 3 - proton-c/bindings/CMakeLists.txt | 8 +- proton-c/bindings/go | 1 - proton-c/bindings/go/CMakeLists.txt | 51 ++ proton-c/bindings/go/README.md | 137 ++++ proton-c/bindings/go/genwrap.go | 427 +++++++++++ .../src/qpid.apache.org/proton/go/amqp/doc.go | 40 + .../src/qpid.apache.org/proton/go/amqp/interop | 1 + .../proton/go/amqp/interop_test.go | 308 ++++++++ .../qpid.apache.org/proton/go/amqp/marshal.go | 238 ++++++ .../qpid.apache.org/proton/go/amqp/message.go | 342 +++++++++ .../proton/go/amqp/message_test.go | 90 +++ .../src/qpid.apache.org/proton/go/amqp/types.go | 193 +++++ .../src/qpid.apache.org/proton/go/amqp/uid.go | 40 + .../qpid.apache.org/proton/go/amqp/unmarshal.go | 552 ++++++++++++++ .../src/qpid.apache.org/proton/go/amqp/url.go | 96 +++ .../qpid.apache.org/proton/go/amqp/url_test.go | 51 ++ .../src/qpid.apache.org/proton/go/event/doc.go | 38 + .../qpid.apache.org/proton/go/event/handlers.go | 411 +++++++++++ .../qpid.apache.org/proton/go/event/message.go | 75 ++ .../src/qpid.apache.org/proton/go/event/pump.go | 357 +++++++++ .../qpid.apache.org/proton/go/event/wrappers.go | 253 +++++++ .../proton/go/event/wrappers_gen.go | 732 +++++++++++++++++++ .../qpid.apache.org/proton/go/internal/error.go | 125 ++++ .../qpid.apache.org/proton/go/messaging/doc.go | 28 + .../proton/go/messaging/handler.go | 70 ++ .../proton/go/messaging/messaging.go | 250 +++++++ 59 files changed, 5268 insertions(+), 5157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 2df2dfb..8790c57 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -141,6 +141,7 @@ if (BUILD_JAVA) endif() add_subdirectory(proton-c) +add_subdirectory(examples) install (FILES LICENSE README.md TODO DESTINATION ${PROTON_SHARE}) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/config.sh.in ---------------------------------------------------------------------- diff --git a/config.sh.in b/config.sh.in index 4b60b2f..3ad8ba8 100755 --- a/config.sh.in +++ b/config.sh.in @@ -18,6 +18,16 @@ # under the License. # +merge_paths() { + # Merge paths, remove duplicates (keep first instance) + path=$(echo $* | sed 's/:/ /'g) # Split with spaces. + newpath="" + for d in $path; do # Remove duplicates + { echo $newpath | grep -q "\(:\|^\)$d\(:\|$\)"; } || newpath="$newpath:$d" + done + echo $newpath | sed 's/^://' # Remove leading : +} + PROTON_HOME=@CMAKE_SOURCE_DIR@ PROTON_BUILD=@CMAKE_BINARY_DIR@ @@ -50,10 +60,17 @@ export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HO # Perl export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib +# Go +export GOPATH="$(merge_paths $PROTON_HOME/proton-c/bindings/go $GOPATH)" +# Help Go compiler find libraries and include files. +export C_INCLUDE_PATH="$(merge_paths $PROTON_HOME/proton-c/include $PROTON_BUILD/proton-c/include $C_INCLUDE_PATH)" +export LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LIBRARY_PATH)" +export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)" + + + # test applications -export PATH="$PATH:$PROTON_BUILD/tests/tools/apps/c" -export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python" -export PATH="$PATH:$PROTON_HOME/tests/python" +export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)" # can the test harness use valgrind? if [[ -x "$(type -p valgrind)" ]] ; then http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index feac758..21878eb 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -18,6 +18,7 @@ # set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR}) - include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) + add_subdirectory(c/messenger) +add_subdirectory(go) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt new file mode 100644 index 0000000..464ed7c --- /dev/null +++ b/examples/go/CMakeLists.txt @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# FIXME aconway 2015-05-20: +# - use proton build for Go includes & libs. +# - pre-build go libraries? Respect user GOPATH? + +if(BUILD_GO) + add_test( + NAME go_example_test + COMMAND ${GO_TEST} example_test.go -rpath ${CMAKE_BINARY_DIR}/proton-c + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/README.md ---------------------------------------------------------------------- diff --git a/examples/go/README.md b/examples/go/README.md index c81e8d3..9d0d738 100644 --- a/examples/go/README.md +++ b/examples/go/README.md @@ -9,48 +9,33 @@ There are 3 go packages for proton: Most applications should use the `messaging` package. The `event` package is for applications that need low-level access to the proton engine. -## messaging examples +## Example programs -- [receive.go](receive.go) receive from many connections concurrently. -- [send.go](send.go) send to many connections concurrently. +- [receive.go](receive.go) receive from many connections concurrently using messaging package. +- [send.go](send.go) send to many connections concurrently using messaging package. +- [event_broker.go](event_broker.go) simple mini-broker using event package. -## event examples +## Using the Go packages -- [broker.go](event/broker.go) simple mini-broker, queues are created automatically. +Set your GOPATH environment variable to include `//proton-c/bindings/go` -## Installing the proton Go packages +The proton Go packages include C code so the cgo compiler needs to be able to +find the proton library and include files. There are a couple of ways to do this: -You need to install proton in a standard place such as `/usr` or `/usr/local` so go -can find the proton C headers and libraries to build the C parts of the packages. +1. Build proton in directory `$BUILD`. Source the script `$BUILD/config.sh` to set your environment. -You should create a go workspace and set GOPATH as described in https://golang.org/doc/code.html +2. Install proton to a standard prefix such as `/usr` or `/usr/local`. No need for further settings. -To get the proton packages into your workspace you can clone the proton repository like this: +3. Install proton to a non-standard prefix `$PREFIX`. Set the following: - git clone https://git.apache.org/qpid-proton.git $GOPATH/src/qpid.apache.org/proton + export LIBRARY_PATH=$PREFIX/lib:$LIBRARY_PATH + export C_INCLUDE_PATH=$PREFIX/include:$C_INCLUDE_PATH + export LD_LIBRARY_PATH=$PREFIX/lib:$LD_LIBRARY_PATH -If you prefer to keep your proton clone elsewhere you can create a symlink to it in your workspace. - -You can also use `go get` as follows: - - go get qpid.apache.org/proton/go/messaging - -Once installed you can use godoc to look at docmentation on the commane line or start a -godoc web server like this: - - godoc -http=:6060 - -And look at the docs in your browser. - -Right now the layout of the documentation is a bit messed up, showing the long -path for imports, i.e. - - qpid.apache.org/proton/proton-c/bindings/go/amqp - -In your code you should use: - - qpid.apache.org/proton/go/amqp +Once you are set up, the go tools will work as normal. You can see documentation +in your web browser at `localhost:6060` by running: + godoc -http=:6060 ## Running the examples @@ -69,15 +54,15 @@ the example source have more details. First start the broker: - go run event/broker.go + go run event_broker.go Send messages concurrently to queues "foo" and "bar", 10 messages to each queue: - go run go/send.go -count 10 localhost:/foo localhost:/bar + go run send.go -count 10 localhost:/foo localhost:/bar Receive messages concurrently from "foo" and "bar". Note -count 20 for 10 messages each on 2 queues: - go run go/receive.go -count 20 localhost:/foo localhost:/bar + go run receive.go -count 20 localhost:/foo localhost:/bar The broker and clients use the amqp port on the local host by default, to use a different address use the `-addr host:port` flag. @@ -91,3 +76,20 @@ Or use the Go broker and the python clients: python ../python/simple_send.py python ../python/simple_recv.py`. + +# Experimental `go get` support. + +BROKEN - DO NOT USE unless you are interested in helping us fix it :) + +We have `go get` meta tags set up at qpid.apache.org so you can do this: + + go get qpid.apache.org/proton/go/messaging + +This pulls the entire proton repo into your workspace. There is a "go" symlink in the +repo root so imports work correctly: `import qpid.apache.org/proton/go/messaging` + +However godoc (and I believe some other go tools) doesn't handle symlinks and is +confused by the other files in the repo. For example `godoc -http` will not show +the proper package names (because it ignores the symlink) but instead shows them +with the full `proton/proton-c/bindings/go/src...` path. It also shows bogus +empty packages corresponding to the examples/go directory. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/event/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go deleted file mode 100644 index 0cb4bfa..0000000 --- a/examples/go/event/broker.go +++ /dev/null @@ -1,255 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// This is a simple AMQP broker implemented using the event-handler interface. -// -// It maintains a set of named in-memory queues of messages. Clients can send -// messages to queues or subscribe to receive messages from them. -// -// - -package main - -import ( - "container/list" - "flag" - "fmt" - "io" - "io/ioutil" - "log" - "net" - "os" - "path" - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/event" - "sync" -) - -// Command-line flags -var addr = flag.String("addr", ":amqp", "Listening address") -var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") -var full = flag.Bool("full", false, "Print full message not just body.") - -func main() { - flag.Usage = func() { - fmt.Fprintf(os.Stderr, ` -Usage: %s -A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. -`, os.Args[0]) - flag.PrintDefaults() - } - flag.Parse() - b := newBroker() - err := b.listen(*addr) - fatalIf(err) -} - -// queue is a structure representing a queue. -type queue struct { - name string // Name of queue - messages *list.List // List of event.Message - consumers map[event.Link]bool // Set of consumer links -} - -type logLink event.Link // Wrapper to print links in format for logging - -func (ll logLink) String() string { - l := event.Link(ll) - return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) -} - -func (q *queue) subscribe(link event.Link) { - debug.Printf("link %s subscribed to queue %s", logLink(link), q.name) - q.consumers[link] = true -} - -func (q *queue) unsubscribe(link event.Link) { - debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name) - delete(q.consumers, link) -} - -func (q *queue) empty() bool { - return len(q.consumers) == 0 && q.messages.Len() == 0 -} - -func (q *queue) push(context *event.Pump, message amqp.Message) { - q.messages.PushBack(message) - q.pop(context) -} - -func (q *queue) popTo(context *event.Pump, link event.Link) bool { - if q.messages.Len() != 0 && link.Credit() > 0 { - message := q.messages.Remove(q.messages.Front()).(amqp.Message) - debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message}) - // The first return parameter is an event.Delivery. - // The Deliver can be used to track message status, e.g. so we can re-delver on failure. - // This demo broker doesn't do that. - linkPump := link.Session().Connection().Pump() - if context == linkPump { - if context == nil { - log.Fatal("pop in nil context") - } - link.Send(message) // link is in the current pump, safe to call Send() direct - } else { - linkPump.Inject <- func() { // Inject to link's pump - link.Send(message) // FIXME aconway 2015-05-04: error handlig - } - } - return true - } - return false -} - -func (q *queue) pop(context *event.Pump) (popped bool) { - for c, _ := range q.consumers { - popped = popped || q.popTo(context, c) - } - return -} - -// broker implements event.MessagingHandler and reacts to events by moving messages on or off queues. -type broker struct { - queues map[string]*queue - lock sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming... -} - -func newBroker() *broker { - return &broker{queues: make(map[string]*queue)} -} - -func (b *broker) getQueue(name string) *queue { - q := b.queues[name] - if q == nil { - debug.Printf("Create queue %s", name) - q = &queue{name, list.New(), make(map[event.Link]bool)} - b.queues[name] = q - } - return q -} - -func (b *broker) unsubscribe(l event.Link) { - if l.IsSender() { - q := b.queues[l.RemoteSource().Address()] - if q != nil { - q.unsubscribe(l) - if q.empty() { - debug.Printf("Delete queue %s", q.name) - delete(b.queues, q.name) - } - } - } -} - -func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error { - // FIXME aconway 2015-05-04: locking is un-golike, better example coming soon. - // Needed because the same handler is used for multiple connections concurrently - // and the queue data structures are not thread safe. - b.lock.Lock() - defer b.lock.Unlock() - - switch t { - - case event.MLinkOpening: - if e.Link().IsSender() { - q := b.getQueue(e.Link().RemoteSource().Address()) - q.subscribe(e.Link()) - } - - case event.MLinkDisconnected, event.MLinkClosing: - b.unsubscribe(e.Link()) - - case event.MSendable: - q := b.getQueue(e.Link().RemoteSource().Address()) - q.popTo(e.Connection().Pump(), e.Link()) - - case event.MMessage: - m, err := event.DecodeMessage(e) - fatalIf(err) - qname := e.Link().RemoteTarget().Address() - debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m}) - b.getQueue(qname).push(e.Connection().Pump(), m) - } - return nil -} - -func (b *broker) listen(addr string) (err error) { - // Use the standard Go "net" package to listen for connections. - listener, err := net.Listen("tcp", addr) - if err != nil { - return err - } - info.Printf("Listening on %s", listener.Addr()) - defer listener.Close() - for { - conn, err := listener.Accept() - if err != nil { - info.Printf("Accept error: %s", err) - continue - } - pump, err := event.NewPump(conn, event.NewMessagingDelegator(b)) - fatalIf(err) - info.Printf("Accepted %s[%p]", pump, pump) - pump.Server() - go func() { - pump.Run() - if pump.Error == nil { - info.Printf("Closed %s", pump) - } else { - info.Printf("Closed %s: %s", pump, pump.Error) - } - }() - } -} - -// Logging -func logger(prefix string, level int, w io.Writer) *log.Logger { - if *verbose >= level { - return log.New(w, prefix, 0) - } - return log.New(ioutil.Discard, "", 0) -} - -var info, debug *log.Logger - -func init() { - flag.Parse() - name := path.Base(os.Args[0]) - log.SetFlags(0) - log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. - info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. - debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. -} - -// Simple error handling for demo. -func fatalIf(err error) { - if err != nil { - log.Fatal(err) - } -} - -type formatMessage struct{ m amqp.Message } - -func (fm formatMessage) String() string { - if *full { - return fmt.Sprintf("%#v", fm.m) - } else { - return fmt.Sprintf("%#v", fm.m.Body()) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/event_broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go new file mode 100644 index 0000000..0cb4bfa --- /dev/null +++ b/examples/go/event_broker.go @@ -0,0 +1,255 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// +// This is a simple AMQP broker implemented using the event-handler interface. +// +// It maintains a set of named in-memory queues of messages. Clients can send +// messages to queues or subscribe to receive messages from them. +// +// + +package main + +import ( + "container/list" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "os" + "path" + "qpid.apache.org/proton/go/amqp" + "qpid.apache.org/proton/go/event" + "sync" +) + +// Command-line flags +var addr = flag.String("addr", ":amqp", "Listening address") +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") +var full = flag.Bool("full", false, "Print full message not just body.") + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, ` +Usage: %s +A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. +`, os.Args[0]) + flag.PrintDefaults() + } + flag.Parse() + b := newBroker() + err := b.listen(*addr) + fatalIf(err) +} + +// queue is a structure representing a queue. +type queue struct { + name string // Name of queue + messages *list.List // List of event.Message + consumers map[event.Link]bool // Set of consumer links +} + +type logLink event.Link // Wrapper to print links in format for logging + +func (ll logLink) String() string { + l := event.Link(ll) + return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) +} + +func (q *queue) subscribe(link event.Link) { + debug.Printf("link %s subscribed to queue %s", logLink(link), q.name) + q.consumers[link] = true +} + +func (q *queue) unsubscribe(link event.Link) { + debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name) + delete(q.consumers, link) +} + +func (q *queue) empty() bool { + return len(q.consumers) == 0 && q.messages.Len() == 0 +} + +func (q *queue) push(context *event.Pump, message amqp.Message) { + q.messages.PushBack(message) + q.pop(context) +} + +func (q *queue) popTo(context *event.Pump, link event.Link) bool { + if q.messages.Len() != 0 && link.Credit() > 0 { + message := q.messages.Remove(q.messages.Front()).(amqp.Message) + debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message}) + // The first return parameter is an event.Delivery. + // The Deliver can be used to track message status, e.g. so we can re-delver on failure. + // This demo broker doesn't do that. + linkPump := link.Session().Connection().Pump() + if context == linkPump { + if context == nil { + log.Fatal("pop in nil context") + } + link.Send(message) // link is in the current pump, safe to call Send() direct + } else { + linkPump.Inject <- func() { // Inject to link's pump + link.Send(message) // FIXME aconway 2015-05-04: error handlig + } + } + return true + } + return false +} + +func (q *queue) pop(context *event.Pump) (popped bool) { + for c, _ := range q.consumers { + popped = popped || q.popTo(context, c) + } + return +} + +// broker implements event.MessagingHandler and reacts to events by moving messages on or off queues. +type broker struct { + queues map[string]*queue + lock sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming... +} + +func newBroker() *broker { + return &broker{queues: make(map[string]*queue)} +} + +func (b *broker) getQueue(name string) *queue { + q := b.queues[name] + if q == nil { + debug.Printf("Create queue %s", name) + q = &queue{name, list.New(), make(map[event.Link]bool)} + b.queues[name] = q + } + return q +} + +func (b *broker) unsubscribe(l event.Link) { + if l.IsSender() { + q := b.queues[l.RemoteSource().Address()] + if q != nil { + q.unsubscribe(l) + if q.empty() { + debug.Printf("Delete queue %s", q.name) + delete(b.queues, q.name) + } + } + } +} + +func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error { + // FIXME aconway 2015-05-04: locking is un-golike, better example coming soon. + // Needed because the same handler is used for multiple connections concurrently + // and the queue data structures are not thread safe. + b.lock.Lock() + defer b.lock.Unlock() + + switch t { + + case event.MLinkOpening: + if e.Link().IsSender() { + q := b.getQueue(e.Link().RemoteSource().Address()) + q.subscribe(e.Link()) + } + + case event.MLinkDisconnected, event.MLinkClosing: + b.unsubscribe(e.Link()) + + case event.MSendable: + q := b.getQueue(e.Link().RemoteSource().Address()) + q.popTo(e.Connection().Pump(), e.Link()) + + case event.MMessage: + m, err := event.DecodeMessage(e) + fatalIf(err) + qname := e.Link().RemoteTarget().Address() + debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m}) + b.getQueue(qname).push(e.Connection().Pump(), m) + } + return nil +} + +func (b *broker) listen(addr string) (err error) { + // Use the standard Go "net" package to listen for connections. + listener, err := net.Listen("tcp", addr) + if err != nil { + return err + } + info.Printf("Listening on %s", listener.Addr()) + defer listener.Close() + for { + conn, err := listener.Accept() + if err != nil { + info.Printf("Accept error: %s", err) + continue + } + pump, err := event.NewPump(conn, event.NewMessagingDelegator(b)) + fatalIf(err) + info.Printf("Accepted %s[%p]", pump, pump) + pump.Server() + go func() { + pump.Run() + if pump.Error == nil { + info.Printf("Closed %s", pump) + } else { + info.Printf("Closed %s: %s", pump, pump.Error) + } + }() + } +} + +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) + } + return log.New(ioutil.Discard, "", 0) +} + +var info, debug *log.Logger + +func init() { + flag.Parse() + name := path.Base(os.Args[0]) + log.SetFlags(0) + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. + debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. +func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} + +type formatMessage struct{ m amqp.Message } + +func (fm formatMessage) String() string { + if *full { + return fmt.Sprintf("%#v", fm.m) + } else { + return fmt.Sprintf("%#v", fm.m.Body()) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/example_test.go ---------------------------------------------------------------------- diff --git a/examples/go/example_test.go b/examples/go/example_test.go index e059c28..a4b4c2c 100644 --- a/examples/go/example_test.go +++ b/examples/go/example_test.go @@ -25,6 +25,7 @@ package main import ( "bufio" "bytes" + "flag" "fmt" "io" "io/ioutil" @@ -77,11 +78,11 @@ func (b *broker) check() error { // Start the demo broker, wait till it is listening on *addr. No-op if already started. func (b *broker) start() error { - build("event/broker.go") + build("event_broker.go") if b.cmd == nil { // Not already started // FIXME aconway 2015-04-30: better way to pick/configure a broker port. b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000) - b.cmd = exec.Command(exepath("broker"), "-addr", b.addr, "-verbose", "0") + b.cmd = exec.Command(exepath("event_broker"), "-addr", b.addr, "-verbose", "0") b.runerr = make(chan error) // Change the -verbose setting above to see broker output on stdout/stderr. b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout @@ -246,7 +247,12 @@ func init() { func build(prog string) { if !built[prog] { - build := exec.Command("go", "build", path.Join(exampleDir, prog)) + args := []string{"build"} + if *rpath != "" { + args = append(args, "-ldflags", "-r "+*rpath) + } + args = append(args, path.Join(exampleDir, prog)) + build := exec.Command("go", args...) build.Dir = binDir out, err := build.CombinedOutput() if err != nil { @@ -256,6 +262,8 @@ func build(prog string) { } } +var rpath = flag.String("rpath", "", "Runtime path for test executables") + func TestMain(m *testing.M) { rand.Seed(time.Now().UTC().UnixNano()) var err error --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org