qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [38/50] [abbrv] qpid-proton git commit: PROTON-827: Re-create go workspace, cmake support for testing.
Date Mon, 28 Sep 2015 18:10:04 GMT
PROTON-827: Re-create go workspace, cmake support for testing.

Re-created the go workspace structure in the repository so it can be set as a GOPATH element.
ctest runs package tests and examples.
`make install` installs the Go code to share/gocode.
config.sh sets env. vars for CGO compilation.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/377ff9b1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/377ff9b1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/377ff9b1

Branch: refs/heads/proton-go
Commit: 377ff9b1ad8f76108d5926c427be34b013bdf79e
Parents: 867f607
Author: Alan Conway <aconway@redhat.com>
Authored: Wed May 20 15:30:38 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Mon Sep 28 14:08:23 2015 -0400

----------------------------------------------------------------------
 config.sh.in                                    |  23 +-
 examples/CMakeLists.txt                         |   3 +-
 examples/go/CMakeLists.txt                      |  29 +
 examples/go/README.md                           |  55 +-
 examples/go/event/broker.go                     | 255 -------
 examples/go/event_broker.go                     | 255 +++++++
 examples/go/example_test.go                     |  14 +-
 go                                              |   1 +
 go/README.md                                    | 142 ----
 go/amqp/doc.go                                  |  40 -
 go/amqp/interop                                 |   1 -
 go/amqp/interop_test.go                         | 308 --------
 go/amqp/marshal.go                              | 238 ------
 go/amqp/message.go                              | 342 ---------
 go/amqp/message_test.go                         |  90 ---
 go/amqp/types.go                                | 193 -----
 go/amqp/uid.go                                  |  40 -
 go/amqp/unmarshal.go                            | 552 --------------
 go/amqp/url.go                                  |  96 ---
 go/amqp/url_test.go                             |  51 --
 go/event/doc.go                                 |  38 -
 go/event/genwrap.go                             | 427 -----------
 go/event/handlers.go                            | 411 -----------
 go/event/message.go                             |  75 --
 go/event/pump.go                                | 357 ---------
 go/event/wrappers.go                            | 253 -------
 go/event/wrappers_gen.go                        | 732 -------------------
 go/internal/error.go                            | 125 ----
 go/messaging/doc.go                             |  28 -
 go/messaging/handler.go                         |  70 --
 go/messaging/messaging.go                       | 250 -------
 proton-c/bindings/CMakeLists.txt                |   9 +-
 proton-c/bindings/go/CMakeLists.txt             |  51 ++
 proton-c/bindings/go/README.md                  | 142 ++++
 proton-c/bindings/go/genwrap.go                 | 427 +++++++++++
 .../src/qpid.apache.org/proton/go/amqp/doc.go   |  40 +
 .../src/qpid.apache.org/proton/go/amqp/interop  |   1 +
 .../proton/go/amqp/interop_test.go              | 308 ++++++++
 .../qpid.apache.org/proton/go/amqp/marshal.go   | 238 ++++++
 .../qpid.apache.org/proton/go/amqp/message.go   | 342 +++++++++
 .../proton/go/amqp/message_test.go              |  90 +++
 .../src/qpid.apache.org/proton/go/amqp/types.go | 193 +++++
 .../src/qpid.apache.org/proton/go/amqp/uid.go   |  40 +
 .../qpid.apache.org/proton/go/amqp/unmarshal.go | 552 ++++++++++++++
 .../src/qpid.apache.org/proton/go/amqp/url.go   |  96 +++
 .../qpid.apache.org/proton/go/amqp/url_test.go  |  51 ++
 .../src/qpid.apache.org/proton/go/event/doc.go  |  38 +
 .../qpid.apache.org/proton/go/event/handlers.go | 411 +++++++++++
 .../qpid.apache.org/proton/go/event/message.go  |  75 ++
 .../src/qpid.apache.org/proton/go/event/pump.go | 357 +++++++++
 .../qpid.apache.org/proton/go/event/wrappers.go | 253 +++++++
 .../proton/go/event/wrappers_gen.go             | 732 +++++++++++++++++++
 .../qpid.apache.org/proton/go/internal/error.go | 125 ++++
 .../qpid.apache.org/proton/go/messaging/doc.go  |  28 +
 .../proton/go/messaging/handler.go              |  70 ++
 .../proton/go/messaging/messaging.go            | 250 +++++++
 56 files changed, 5256 insertions(+), 5157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 25d08b1..744ddb3 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -18,6 +18,16 @@
 # under the License.
 #
 
+merge_paths() {
+    # Merge paths, remove duplicates (keep first instance)
+    path=$(echo $* | sed 's/:/ /'g) # Split with spaces.
+    newpath=""
+    for d in $path; do		# Remove duplicates
+	{ echo $newpath | grep -q "\(:\|^\)$d\(:\|$\)"; } || newpath="$newpath:$d"
+    done
+    echo $newpath | sed 's/^://' # Remove leading :
+}
+
 PROTON_HOME=@CMAKE_SOURCE_DIR@
 PROTON_BUILD=@CMAKE_BINARY_DIR@
 
@@ -50,10 +60,17 @@ export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HO
 # Perl
 export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib
 
+# Go
+export GOPATH="$(merge_paths $PROTON_HOME/proton-c/bindings/go $GOPATH)"
+# Help Go compiler find libraries and include files.
+export C_INCLUDE_PATH="$(merge_paths $PROTON_HOME/proton-c/include $PROTON_BUILD/proton-c/include $C_INCLUDE_PATH)"
+export LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LIBRARY_PATH)"
+export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)"
+
+
+
 # test applications
-export PATH="$PATH:$PROTON_BUILD/tests/tools/apps/c"
-export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python"
-export PATH="$PATH:$PROTON_HOME/tests/python"
+export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c $PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)"
 
 # can the test harness use valgrind?
 if [[ -x "$(type -p valgrind)" ]] ; then

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 5724e59..35896a5 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -18,9 +18,10 @@
 #
 
 set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR})
-
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+
 add_subdirectory(c/messenger)
 if (BUILD_CPP)
   add_subdirectory(cpp)
 endif()
+add_subdirectory(go)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
new file mode 100644
index 0000000..464ed7c
--- /dev/null
+++ b/examples/go/CMakeLists.txt
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# FIXME aconway 2015-05-20:
+# - use proton build for Go includes & libs.
+# - pre-build go libraries? Respect user GOPATH?
+
+if(BUILD_GO)
+  add_test(
+    NAME go_example_test
+    COMMAND ${GO_TEST} example_test.go -rpath ${CMAKE_BINARY_DIR}/proton-c
+    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index 8acef36..52bd8e4 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -9,48 +9,33 @@ There are 3 go packages for proton:
 Most applications should use the `messaging` package. The `event` package is for
 applications that need low-level access to the proton engine.
 
-## messaging examples
+## Example programs
 
-- [receive.go](receive.go) receive from many connections concurrently.
-- [send.go](send.go) send to many connections concurrently.
+- [receive.go](receive.go) receive from many connections concurrently using messaging package.
+- [send.go](send.go) send to many connections concurrently using messaging package.
+- [event_broker.go](event_broker.go) simple mini-broker using event package.
 
-## event examples
+## Using the Go packages
 
-- [broker.go](event/broker.go) simple mini-broker, queues are created automatically.
+Set your GOPATH environment variable to include `/<path-to-proton>/proton-c/bindings/go`
 
-## Installing the proton Go packages
+The proton Go packages include C code so the cgo compiler needs to be able to
+find the proton library and include files.  There are a couple of ways to do this:
 
-You need to install proton in a standard place such as `/usr` or `/usr/local` so go
-can find the proton C headers and libraries to build the C parts of the packages.
+1. Build proton in directory `$BUILD`. Source the script `$BUILD/config.sh` to set your environment.
 
-You should create a go workspace and set GOPATH as described in https://golang.org/doc/code.html
+2. Install proton to a standard prefix such as `/usr` or `/usr/local`. No need for further settings.
 
-To get the proton packages into your workspace you can clone the proton repository like this:
+3. Install proton to a non-standard prefix `$PREFIX`. Set the following:
 
-    git clone https://git.apache.org/qpid-proton.git $GOPATH/src/qpid.apache.org/proton
+        export LIBRARY_PATH=$PREFIX/lib:$LIBRARY_PATH
+        export C_INCLUDE_PATH=$PREFIX/include:$C_INCLUDE_PATH
+        export LD_LIBRARY_PATH=$PREFIX/lib:$LD_LIBRARY_PATH
 
-If you prefer to keep your proton clone elsewhere you can create a symlink to it in your workspace.
-
-You can also use `go get` as follows:
-
-    go get qpid.apache.org/proton/go/messaging
-
-Once installed you can use godoc to look at docmentation on the commane line or start a
-godoc web server like this:
-
-	godoc -http=:6060
-
-And look at the docs in your browser.
-
-Right now the layout of the documentation is a bit messed up, showing the long
-path for imports, i.e.
-
-    qpid.apache.org/proton/proton-c/bindings/go/amqp
-
-In your code you should use:
-
-    qpid.apache.org/proton/go/amqp
+Once you are set up, the go tools will work as normal. You can see documentation
+in your web browser at `localhost:6060` by running:
 
+    godoc -http=:6060
 
 ## Running the examples
 
@@ -69,15 +54,15 @@ the example source have more details.
 
 First start the broker:
 
-    go run event/broker.go
+    go run event_broker.go
 
 Send messages concurrently to queues "foo" and "bar", 10 messages to each queue:
 
-    go run go/send.go -count 10 localhost:/foo localhost:/bar
+    go run send.go -count 10 localhost:/foo localhost:/bar
 
 Receive messages concurrently from "foo" and "bar". Note -count 20 for 10 messages each on 2 queues:
 
-    go run go/receive.go -count 20 localhost:/foo localhost:/bar
+    go run receive.go -count 20 localhost:/foo localhost:/bar
 
 The broker and clients use the amqp port on the local host by default, to use a
 different address use the `-addr host:port` flag.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/examples/go/event/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go
deleted file mode 100644
index 0cb4bfa..0000000
--- a/examples/go/event/broker.go
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-//
-// This is a simple AMQP broker implemented using the event-handler interface.
-//
-// It maintains a set of named in-memory queues of messages. Clients can send
-// messages to queues or subscribe to receive messages from them.
-//
-//
-
-package main
-
-import (
-	"container/list"
-	"flag"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"log"
-	"net"
-	"os"
-	"path"
-	"qpid.apache.org/proton/go/amqp"
-	"qpid.apache.org/proton/go/event"
-	"sync"
-)
-
-// Command-line flags
-var addr = flag.String("addr", ":amqp", "Listening address")
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
-var full = flag.Bool("full", false, "Print full message not just body.")
-
-func main() {
-	flag.Usage = func() {
-		fmt.Fprintf(os.Stderr, `
-Usage: %s
-A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
-`, os.Args[0])
-		flag.PrintDefaults()
-	}
-	flag.Parse()
-	b := newBroker()
-	err := b.listen(*addr)
-	fatalIf(err)
-}
-
-// queue is a structure representing a queue.
-type queue struct {
-	name      string              // Name of queue
-	messages  *list.List          // List of event.Message
-	consumers map[event.Link]bool // Set of consumer links
-}
-
-type logLink event.Link // Wrapper to print links in format for logging
-
-func (ll logLink) String() string {
-	l := event.Link(ll)
-	return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
-}
-
-func (q *queue) subscribe(link event.Link) {
-	debug.Printf("link %s subscribed to queue %s", logLink(link), q.name)
-	q.consumers[link] = true
-}
-
-func (q *queue) unsubscribe(link event.Link) {
-	debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name)
-	delete(q.consumers, link)
-}
-
-func (q *queue) empty() bool {
-	return len(q.consumers) == 0 && q.messages.Len() == 0
-}
-
-func (q *queue) push(context *event.Pump, message amqp.Message) {
-	q.messages.PushBack(message)
-	q.pop(context)
-}
-
-func (q *queue) popTo(context *event.Pump, link event.Link) bool {
-	if q.messages.Len() != 0 && link.Credit() > 0 {
-		message := q.messages.Remove(q.messages.Front()).(amqp.Message)
-		debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message})
-		// The first return parameter is an event.Delivery.
-		// The Deliver can be used to track message status, e.g. so we can re-delver on failure.
-		// This demo broker doesn't do that.
-		linkPump := link.Session().Connection().Pump()
-		if context == linkPump {
-			if context == nil {
-				log.Fatal("pop in nil context")
-			}
-			link.Send(message) // link is in the current pump, safe to call Send() direct
-		} else {
-			linkPump.Inject <- func() { // Inject to link's pump
-				link.Send(message) // FIXME aconway 2015-05-04: error handlig
-			}
-		}
-		return true
-	}
-	return false
-}
-
-func (q *queue) pop(context *event.Pump) (popped bool) {
-	for c, _ := range q.consumers {
-		popped = popped || q.popTo(context, c)
-	}
-	return
-}
-
-// broker implements event.MessagingHandler and reacts to events by moving messages on or off queues.
-type broker struct {
-	queues map[string]*queue
-	lock   sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming...
-}
-
-func newBroker() *broker {
-	return &broker{queues: make(map[string]*queue)}
-}
-
-func (b *broker) getQueue(name string) *queue {
-	q := b.queues[name]
-	if q == nil {
-		debug.Printf("Create queue %s", name)
-		q = &queue{name, list.New(), make(map[event.Link]bool)}
-		b.queues[name] = q
-	}
-	return q
-}
-
-func (b *broker) unsubscribe(l event.Link) {
-	if l.IsSender() {
-		q := b.queues[l.RemoteSource().Address()]
-		if q != nil {
-			q.unsubscribe(l)
-			if q.empty() {
-				debug.Printf("Delete queue %s", q.name)
-				delete(b.queues, q.name)
-			}
-		}
-	}
-}
-
-func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error {
-	// FIXME aconway 2015-05-04: locking is un-golike, better example coming soon.
-	// Needed because the same handler is used for multiple connections concurrently
-	// and the queue data structures are not thread safe.
-	b.lock.Lock()
-	defer b.lock.Unlock()
-
-	switch t {
-
-	case event.MLinkOpening:
-		if e.Link().IsSender() {
-			q := b.getQueue(e.Link().RemoteSource().Address())
-			q.subscribe(e.Link())
-		}
-
-	case event.MLinkDisconnected, event.MLinkClosing:
-		b.unsubscribe(e.Link())
-
-	case event.MSendable:
-		q := b.getQueue(e.Link().RemoteSource().Address())
-		q.popTo(e.Connection().Pump(), e.Link())
-
-	case event.MMessage:
-		m, err := event.DecodeMessage(e)
-		fatalIf(err)
-		qname := e.Link().RemoteTarget().Address()
-		debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m})
-		b.getQueue(qname).push(e.Connection().Pump(), m)
-	}
-	return nil
-}
-
-func (b *broker) listen(addr string) (err error) {
-	// Use the standard Go "net" package to listen for connections.
-	listener, err := net.Listen("tcp", addr)
-	if err != nil {
-		return err
-	}
-	info.Printf("Listening on %s", listener.Addr())
-	defer listener.Close()
-	for {
-		conn, err := listener.Accept()
-		if err != nil {
-			info.Printf("Accept error: %s", err)
-			continue
-		}
-		pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
-		fatalIf(err)
-		info.Printf("Accepted %s[%p]", pump, pump)
-		pump.Server()
-		go func() {
-			pump.Run()
-			if pump.Error == nil {
-				info.Printf("Closed %s", pump)
-			} else {
-				info.Printf("Closed %s: %s", pump, pump.Error)
-			}
-		}()
-	}
-}
-
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-	if *verbose >= level {
-		return log.New(w, prefix, 0)
-	}
-	return log.New(ioutil.Discard, "", 0)
-}
-
-var info, debug *log.Logger
-
-func init() {
-	flag.Parse()
-	name := path.Base(os.Args[0])
-	log.SetFlags(0)
-	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
-	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
-	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
-}
-
-// Simple error handling for demo.
-func fatalIf(err error) {
-	if err != nil {
-		log.Fatal(err)
-	}
-}
-
-type formatMessage struct{ m amqp.Message }
-
-func (fm formatMessage) String() string {
-	if *full {
-		return fmt.Sprintf("%#v", fm.m)
-	} else {
-		return fmt.Sprintf("%#v", fm.m.Body())
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/examples/go/event_broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go
new file mode 100644
index 0000000..0cb4bfa
--- /dev/null
+++ b/examples/go/event_broker.go
@@ -0,0 +1,255 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the event-handler interface.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+//
+
+package main
+
+import (
+	"container/list"
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"net"
+	"os"
+	"path"
+	"qpid.apache.org/proton/go/amqp"
+	"qpid.apache.org/proton/go/event"
+	"sync"
+)
+
+// Command-line flags
+var addr = flag.String("addr", ":amqp", "Listening address")
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
+var full = flag.Bool("full", false, "Print full message not just body.")
+
+func main() {
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+	b := newBroker()
+	err := b.listen(*addr)
+	fatalIf(err)
+}
+
+// queue is a structure representing a queue.
+type queue struct {
+	name      string              // Name of queue
+	messages  *list.List          // List of event.Message
+	consumers map[event.Link]bool // Set of consumer links
+}
+
+type logLink event.Link // Wrapper to print links in format for logging
+
+func (ll logLink) String() string {
+	l := event.Link(ll)
+	return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
+}
+
+func (q *queue) subscribe(link event.Link) {
+	debug.Printf("link %s subscribed to queue %s", logLink(link), q.name)
+	q.consumers[link] = true
+}
+
+func (q *queue) unsubscribe(link event.Link) {
+	debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name)
+	delete(q.consumers, link)
+}
+
+func (q *queue) empty() bool {
+	return len(q.consumers) == 0 && q.messages.Len() == 0
+}
+
+func (q *queue) push(context *event.Pump, message amqp.Message) {
+	q.messages.PushBack(message)
+	q.pop(context)
+}
+
+func (q *queue) popTo(context *event.Pump, link event.Link) bool {
+	if q.messages.Len() != 0 && link.Credit() > 0 {
+		message := q.messages.Remove(q.messages.Front()).(amqp.Message)
+		debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message})
+		// The first return parameter is an event.Delivery.
+		// The Deliver can be used to track message status, e.g. so we can re-delver on failure.
+		// This demo broker doesn't do that.
+		linkPump := link.Session().Connection().Pump()
+		if context == linkPump {
+			if context == nil {
+				log.Fatal("pop in nil context")
+			}
+			link.Send(message) // link is in the current pump, safe to call Send() direct
+		} else {
+			linkPump.Inject <- func() { // Inject to link's pump
+				link.Send(message) // FIXME aconway 2015-05-04: error handlig
+			}
+		}
+		return true
+	}
+	return false
+}
+
+func (q *queue) pop(context *event.Pump) (popped bool) {
+	for c, _ := range q.consumers {
+		popped = popped || q.popTo(context, c)
+	}
+	return
+}
+
+// broker implements event.MessagingHandler and reacts to events by moving messages on or off queues.
+type broker struct {
+	queues map[string]*queue
+	lock   sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming...
+}
+
+func newBroker() *broker {
+	return &broker{queues: make(map[string]*queue)}
+}
+
+func (b *broker) getQueue(name string) *queue {
+	q := b.queues[name]
+	if q == nil {
+		debug.Printf("Create queue %s", name)
+		q = &queue{name, list.New(), make(map[event.Link]bool)}
+		b.queues[name] = q
+	}
+	return q
+}
+
+func (b *broker) unsubscribe(l event.Link) {
+	if l.IsSender() {
+		q := b.queues[l.RemoteSource().Address()]
+		if q != nil {
+			q.unsubscribe(l)
+			if q.empty() {
+				debug.Printf("Delete queue %s", q.name)
+				delete(b.queues, q.name)
+			}
+		}
+	}
+}
+
+func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error {
+	// FIXME aconway 2015-05-04: locking is un-golike, better example coming soon.
+	// Needed because the same handler is used for multiple connections concurrently
+	// and the queue data structures are not thread safe.
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	switch t {
+
+	case event.MLinkOpening:
+		if e.Link().IsSender() {
+			q := b.getQueue(e.Link().RemoteSource().Address())
+			q.subscribe(e.Link())
+		}
+
+	case event.MLinkDisconnected, event.MLinkClosing:
+		b.unsubscribe(e.Link())
+
+	case event.MSendable:
+		q := b.getQueue(e.Link().RemoteSource().Address())
+		q.popTo(e.Connection().Pump(), e.Link())
+
+	case event.MMessage:
+		m, err := event.DecodeMessage(e)
+		fatalIf(err)
+		qname := e.Link().RemoteTarget().Address()
+		debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m})
+		b.getQueue(qname).push(e.Connection().Pump(), m)
+	}
+	return nil
+}
+
+func (b *broker) listen(addr string) (err error) {
+	// Use the standard Go "net" package to listen for connections.
+	listener, err := net.Listen("tcp", addr)
+	if err != nil {
+		return err
+	}
+	info.Printf("Listening on %s", listener.Addr())
+	defer listener.Close()
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			info.Printf("Accept error: %s", err)
+			continue
+		}
+		pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
+		fatalIf(err)
+		info.Printf("Accepted %s[%p]", pump, pump)
+		pump.Server()
+		go func() {
+			pump.Run()
+			if pump.Error == nil {
+				info.Printf("Closed %s", pump)
+			} else {
+				info.Printf("Closed %s: %s", pump, pump.Error)
+			}
+		}()
+	}
+}
+
+// Logging
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+	if *verbose >= level {
+		return log.New(w, prefix, 0)
+	}
+	return log.New(ioutil.Discard, "", 0)
+}
+
+var info, debug *log.Logger
+
+func init() {
+	flag.Parse()
+	name := path.Base(os.Args[0])
+	log.SetFlags(0)
+	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
+	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
+	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+}
+
+// Simple error handling for demo.
+func fatalIf(err error) {
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+type formatMessage struct{ m amqp.Message }
+
+func (fm formatMessage) String() string {
+	if *full {
+		return fmt.Sprintf("%#v", fm.m)
+	} else {
+		return fmt.Sprintf("%#v", fm.m.Body())
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/377ff9b1/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index e059c28..a4b4c2c 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -25,6 +25,7 @@ package main
 import (
 	"bufio"
 	"bytes"
+	"flag"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -77,11 +78,11 @@ func (b *broker) check() error {
 
 // Start the demo broker, wait till it is listening on *addr. No-op if already started.
 func (b *broker) start() error {
-	build("event/broker.go")
+	build("event_broker.go")
 	if b.cmd == nil { // Not already started
 		// FIXME aconway 2015-04-30: better way to pick/configure a broker port.
 		b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000)
-		b.cmd = exec.Command(exepath("broker"), "-addr", b.addr, "-verbose", "0")
+		b.cmd = exec.Command(exepath("event_broker"), "-addr", b.addr, "-verbose", "0")
 		b.runerr = make(chan error)
 		// Change the -verbose setting above to see broker output on stdout/stderr.
 		b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout
@@ -246,7 +247,12 @@ func init() {
 
 func build(prog string) {
 	if !built[prog] {
-		build := exec.Command("go", "build", path.Join(exampleDir, prog))
+		args := []string{"build"}
+		if *rpath != "" {
+			args = append(args, "-ldflags", "-r "+*rpath)
+		}
+		args = append(args, path.Join(exampleDir, prog))
+		build := exec.Command("go", args...)
 		build.Dir = binDir
 		out, err := build.CombinedOutput()
 		if err != nil {
@@ -256,6 +262,8 @@ func build(prog string) {
 	}
 }
 
+var rpath = flag.String("rpath", "", "Runtime path for test executables")
+
 func TestMain(m *testing.M) {
 	rand.Seed(time.Now().UTC().UnixNano())
 	var err error


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message