qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [1/2] qpid-proton git commit: PROTON-827: Simplified examples and Connection error handling.
Date Fri, 29 May 2015 15:51:36 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/go1 c9257f470 -> 6113e247c


PROTON-827: Simplified examples and Connection error handling.

- Simplified non-relevant code in examples (logging, argument handling)
- Improved error handling in the API (see Connection.Error()); other endpoints still need the same treatment.
- Added -debug flag to example_test to help debug example problems.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a301dea4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a301dea4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a301dea4

Branch: refs/heads/go1
Commit: a301dea4ef9649beec631486a0a50d4990a11c28
Parents: c9257f4
Author: Alan Conway <aconway@redhat.com>
Authored: Mon May 25 17:14:22 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Fri May 29 11:47:54 2015 -0400

----------------------------------------------------------------------
 examples/go/event_broker.go                     | 199 +++++++++----------
 examples/go/example_test.go                     |  30 +--
 examples/go/receive.go                          | 140 +++++--------
 examples/go/send.go                             | 108 ++++------
 proton-c/bindings/go/genwrap.go                 | 108 +++++-----
 .../src/qpid.apache.org/proton/go/event/pump.go |   5 +-
 .../qpid.apache.org/proton/go/internal/error.go |  25 +--
 .../proton/go/messaging/messaging.go            |  51 +++--
 8 files changed, 307 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/examples/go/event_broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go
index 0cb4bfa..2578bd5 100644
--- a/examples/go/event_broker.go
+++ b/examples/go/event_broker.go
@@ -31,97 +31,32 @@ import (
 	"container/list"
 	"flag"
 	"fmt"
-	"io"
-	"io/ioutil"
-	"log"
 	"net"
 	"os"
-	"path"
 	"qpid.apache.org/proton/go/amqp"
 	"qpid.apache.org/proton/go/event"
 	"sync"
 )
 
-// Command-line flags
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var debug = flag.Bool("debug", false, "Print detailed debug output")
 var addr = flag.String("addr", ":amqp", "Listening address")
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
 var full = flag.Bool("full", false, "Print full message not just body.")
 
 func main() {
-	flag.Usage = func() {
-		fmt.Fprintf(os.Stderr, `
-Usage: %s
-A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
-`, os.Args[0])
-		flag.PrintDefaults()
-	}
+	flag.Usage = usage
 	flag.Parse()
 	b := newBroker()
 	err := b.listen(*addr)
-	fatalIf(err)
-}
-
-// queue is a structure representing a queue.
-type queue struct {
-	name      string              // Name of queue
-	messages  *list.List          // List of event.Message
-	consumers map[event.Link]bool // Set of consumer links
-}
-
-type logLink event.Link // Wrapper to print links in format for logging
-
-func (ll logLink) String() string {
-	l := event.Link(ll)
-	return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
-}
-
-func (q *queue) subscribe(link event.Link) {
-	debug.Printf("link %s subscribed to queue %s", logLink(link), q.name)
-	q.consumers[link] = true
-}
-
-func (q *queue) unsubscribe(link event.Link) {
-	debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name)
-	delete(q.consumers, link)
-}
-
-func (q *queue) empty() bool {
-	return len(q.consumers) == 0 && q.messages.Len() == 0
-}
-
-func (q *queue) push(context *event.Pump, message amqp.Message) {
-	q.messages.PushBack(message)
-	q.pop(context)
-}
-
-func (q *queue) popTo(context *event.Pump, link event.Link) bool {
-	if q.messages.Len() != 0 && link.Credit() > 0 {
-		message := q.messages.Remove(q.messages.Front()).(amqp.Message)
-		debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message})
-		// The first return parameter is an event.Delivery.
-		// The Deliver can be used to track message status, e.g. so we can re-delver on failure.
-		// This demo broker doesn't do that.
-		linkPump := link.Session().Connection().Pump()
-		if context == linkPump {
-			if context == nil {
-				log.Fatal("pop in nil context")
-			}
-			link.Send(message) // link is in the current pump, safe to call Send() direct
-		} else {
-			linkPump.Inject <- func() { // Inject to link's pump
-				link.Send(message) // FIXME aconway 2015-05-04: error handlig
-			}
-		}
-		return true
-	}
-	return false
-}
-
-func (q *queue) pop(context *event.Pump) (popped bool) {
-	for c, _ := range q.consumers {
-		popped = popped || q.popTo(context, c)
-	}
-	return
+	exitIf(err)
 }
 
 // broker implements event.MessagingHandler and reacts to events by moving messages on or off queues.
@@ -137,7 +72,7 @@ func newBroker() *broker {
 func (b *broker) getQueue(name string) *queue {
 	q := b.queues[name]
 	if q == nil {
-		debug.Printf("Create queue %s", name)
+		debugf("Create queue %s\n", name)
 		q = &queue{name, list.New(), make(map[event.Link]bool)}
 		b.queues[name] = q
 	}
@@ -150,7 +85,7 @@ func (b *broker) unsubscribe(l event.Link) {
 		if q != nil {
 			q.unsubscribe(l)
 			if q.empty() {
-				debug.Printf("Delete queue %s", q.name)
+				debugf("Delete queue %s\n", q.name)
 				delete(b.queues, q.name)
 			}
 		}
@@ -181,9 +116,9 @@ func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event)
 
 	case event.MMessage:
 		m, err := event.DecodeMessage(e)
-		fatalIf(err)
+		exitIf(err)
 		qname := e.Link().RemoteTarget().Address()
-		debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m})
+		debugf("link %s -> queue %s: %s\n", logLink(e.Link()), qname, formatMessage(m))
 		b.getQueue(qname).push(e.Connection().Pump(), m)
 	}
 	return nil
@@ -195,61 +130,111 @@ func (b *broker) listen(addr string) (err error) {
 	if err != nil {
 		return err
 	}
-	info.Printf("Listening on %s", listener.Addr())
+	fmt.Printf("Listening on %s\n", listener.Addr())
 	defer listener.Close()
 	for {
 		conn, err := listener.Accept()
 		if err != nil {
-			info.Printf("Accept error: %s", err)
+			fmt.Fprintf(os.Stderr, "Accept error: %s\n", err)
 			continue
 		}
 		pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
-		fatalIf(err)
-		info.Printf("Accepted %s[%p]", pump, pump)
+		exitIf(err)
+		debugf("Accepted %s[%p]\n", pump, pump)
 		pump.Server()
 		go func() {
 			pump.Run()
 			if pump.Error == nil {
-				info.Printf("Closed %s", pump)
+				debugf("Closed %s\n", pump)
 			} else {
-				info.Printf("Closed %s: %s", pump, pump.Error)
+				debugf("Closed %s: %s\n", pump, pump.Error)
 			}
 		}()
 	}
 }
 
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-	if *verbose >= level {
-		return log.New(w, prefix, 0)
+// queue is a structure representing a queue.
+type queue struct {
+	name      string              // Name of queue
+	messages  *list.List          // List of event.Message
+	consumers map[event.Link]bool // Set of consumer links
+}
+
+type logLink event.Link // Wrapper to print links in useful format for logging
+
+func (ll logLink) String() string {
+	l := event.Link(ll)
+	return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
+}
+
+func (q *queue) subscribe(link event.Link) {
+	debugf("link %s subscribed to queue %s\n", logLink(link), q.name)
+	q.consumers[link] = true
+}
+
+func (q *queue) unsubscribe(link event.Link) {
+	debugf("link %s unsubscribed from queue %s\n", logLink(link), q.name)
+	delete(q.consumers, link)
+}
+
+func (q *queue) empty() bool {
+	return len(q.consumers) == 0 && q.messages.Len() == 0
+}
+
+func (q *queue) push(context *event.Pump, message amqp.Message) {
+	q.messages.PushBack(message)
+	q.pop(context)
+}
+
+func (q *queue) popTo(context *event.Pump, link event.Link) bool {
+	if q.messages.Len() != 0 && link.Credit() > 0 {
+		message := q.messages.Remove(q.messages.Front()).(amqp.Message)
+		debugf("link %s <- queue %s: %s\n", logLink(link), q.name, formatMessage(message))
+		// The first return parameter is an event.Delivery.
+		// The Deliver can be used to track message status, e.g. so we can re-delver on failure.
+		// This demo broker doesn't do that.
+		linkPump := link.Session().Connection().Pump()
+		if context == linkPump {
+			if context == nil {
+				exitIf(fmt.Errorf("pop in nil context"))
+			}
+			link.Send(message) // link is in the current pump, safe to call Send() direct
+		} else {
+			linkPump.Inject <- func() { // Inject to link's pump
+				link.Send(message) // FIXME aconway 2015-05-04: error handlig
+			}
+		}
+		return true
 	}
-	return log.New(ioutil.Discard, "", 0)
+	return false
 }
 
-var info, debug *log.Logger
+func (q *queue) pop(context *event.Pump) (popped bool) {
+	for c, _ := range q.consumers {
+		popped = popped || q.popTo(context, c)
+	}
+	return
+}
 
-func init() {
-	flag.Parse()
-	name := path.Base(os.Args[0])
-	log.SetFlags(0)
-	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
-	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
-	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+// Simple debug logging
+func debugf(format string, data ...interface{}) {
+	if *debug {
+		fmt.Fprintf(os.Stderr, format, data...)
+	}
 }
 
 // Simple error handling for demo.
-func fatalIf(err error) {
+func exitIf(err error) {
 	if err != nil {
-		log.Fatal(err)
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
 	}
 }
 
-type formatMessage struct{ m amqp.Message }
-
-func (fm formatMessage) String() string {
+func formatMessage(m amqp.Message) string {
 	if *full {
-		return fmt.Sprintf("%#v", fm.m)
+		return fmt.Sprintf("%#v", m)
 	} else {
-		return fmt.Sprintf("%#v", fm.m.Body())
+		return fmt.Sprintf("%#v", m.Body())
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index a4b4c2c..8879c38 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -78,13 +78,11 @@ func (b *broker) check() error {
 
 // Start the demo broker, wait till it is listening on *addr. No-op if already started.
 func (b *broker) start() error {
-	build("event_broker.go")
 	if b.cmd == nil { // Not already started
 		// FIXME aconway 2015-04-30: better way to pick/configure a broker port.
 		b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000)
-		b.cmd = exec.Command(exepath("event_broker"), "-addr", b.addr, "-verbose", "0")
+		b.cmd = exampleCommand("event_broker", "-addr", b.addr)
 		b.runerr = make(chan error)
-		// Change the -verbose setting above to see broker output on stdout/stderr.
 		b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout
 		go func() {
 			b.runerr <- b.cmd.Run()
@@ -111,7 +109,12 @@ func checkEqual(want interface{}, got interface{}) error {
 // runCommand returns an exec.Cmd to run an example.
 func exampleCommand(prog string, arg ...string) *exec.Cmd {
 	build(prog + ".go")
-	cmd := exec.Command(exepath(prog), arg...)
+	args := []string{}
+	if *debug {
+		args = append(args, "-debug=true")
+	}
+	args = append(args, arg...)
+	cmd := exec.Command(exepath(prog), args...)
 	cmd.Stderr = os.Stderr
 	return cmd
 }
@@ -149,16 +152,16 @@ func TestExampleSendReceive(t *testing.T) {
 	}
 	testBroker.start()
 	err := runExampleWant(
-		"send: Received all 15 acknowledgements\n",
+		"Received all 15 acknowledgements\n",
 		"send",
-		exampleArgs("-count", "5", "-verbose", "1")...)
+		exampleArgs("-count", "5")...)
 	if err != nil {
 		t.Fatal(err)
 	}
 	err = runExampleWant(
-		"receive: Listening\nreceive: Received 15 messages\n",
+		"Listening on 3 connections\nReceived 15 messages\n",
 		"receive",
-		exampleArgs("-verbose", "1", "-count", "15")...)
+		exampleArgs("-count", "15")...)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -187,7 +190,7 @@ func goReceiveWant(errchan chan<- error, want string, arg ...string) *exec.Cmd {
 			errchan <- err
 			return
 		}
-		listening := "receive: Listening\n"
+		listening := "Listening on 3 connections\n"
 		if line != listening {
 			errchan <- checkEqual(listening, line)
 			return
@@ -209,8 +212,8 @@ func TestExampleReceiveSend(t *testing.T) {
 	testBroker.start()
 	recvErr := make(chan error)
 	recvCmd := goReceiveWant(recvErr,
-		"receive: Received 15 messages\n",
-		exampleArgs("-count", "15", "-verbose", "1")...)
+		"Received 15 messages\n",
+		exampleArgs("-count", "15")...)
 	defer func() {
 		recvCmd.Process.Kill()
 		recvCmd.Wait()
@@ -219,9 +222,9 @@ func TestExampleReceiveSend(t *testing.T) {
 		t.Fatal(err)
 	}
 	err := runExampleWant(
-		"send: Received all 15 acknowledgements\n",
+		"Received all 15 acknowledgements\n",
 		"send",
-		exampleArgs("-count", "5", "-verbose", "1")...)
+		exampleArgs("-count", "5")...)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -263,6 +266,7 @@ func build(prog string) {
 }
 
 var rpath = flag.String("rpath", "", "Runtime path for test executables")
+var debug = flag.Bool("debug", false, "Debugging output from examples")
 
 func TestMain(m *testing.M) {
 	rand.Seed(time.Now().UTC().UnixNano())

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
index e31862b..b1eb309 100644
--- a/examples/go/receive.go
+++ b/examples/go/receive.go
@@ -22,87 +22,65 @@ package main
 import (
 	"flag"
 	"fmt"
-	"io"
-	"io/ioutil"
-	"log"
-	"math"
 	"net"
 	"os"
-	"path"
 	"qpid.apache.org/proton/go/amqp"
 	"qpid.apache.org/proton/go/messaging"
 	"sync"
-	"time"
 )
 
-// Command-line flags
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
-var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.")
-var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.")
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
 var full = flag.Bool("full", false, "Print full message not just body.")
 
 func main() {
-	// Parse flags and arguments, print usage message on error.
-	flag.Usage = func() {
-		fmt.Fprintf(os.Stderr, `
-Usage: %s url [url ...]
-Receive messages from all the listed URLs concurrently and print them.
-`, os.Args[0])
-		flag.PrintDefaults()
-	}
+	flag.Usage = usage
 	flag.Parse()
+
 	urls := flag.Args() // Non-flag arguments are URLs to receive from
 	if len(urls) == 0 {
-		flag.Usage()
-		fmt.Fprintf(os.Stderr, "No URL provided")
+		fmt.Fprintln(os.Stderr, "No URL provided")
+		usage()
 		os.Exit(1)
 	}
-	duration := time.Duration(*timeout) * time.Second
-	if duration == 0 {
-		duration = time.Duration(math.MaxInt64) // Not forever, but 290 years is close enough.
-	}
-	if *count == 0 {
-		*count = math.MaxInt64
-	}
-
-	// Create a goroutine for each URL that receives messages and sends them to
-	// the messages channel. main() receives and prints them.
 
 	messages := make(chan amqp.Message) // Channel for messages from goroutines to main()
 	stop := make(chan struct{})         // Closing this channel means the program is stopping.
+	var wait sync.WaitGroup             // Used by main() to wait for all goroutines to end.
+	wait.Add(len(urls))                 // Wait for one goroutine per URL.
 
-	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
-
-	wait.Add(len(urls)) // Wait for one goroutine per URL.
-
-	// Arrange to close all connections on exit
-	connections := make([]*messaging.Connection, len(urls))
-	defer func() {
-		for _, c := range connections {
-			if c != nil {
-				c.Close()
-			}
-		}
-	}()
+	connections := make([]*messaging.Connection, len(urls)) // Store connctions to close on exit
 
+	// Start a goroutine to for each URL to receive messages and send them to the messages channel.
+	// main() receives and prints them.
 	for i, urlStr := range urls {
-		debug.Printf("Connecting to %s", urlStr)
-		go func(urlStr string) {
-			defer wait.Done()                 // Notify main() that this goroutine is done.
+		debugf("debug: Connecting to %s\n", urlStr)
+		go func(urlStr string) { // Start the goroutine
+
+			defer wait.Done()                 // Notify main() when this goroutine is done.
 			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			fatalIf(err)
+			exitIf(err)
 
 			// Open a standard Go net.Conn and and AMQP connection using it.
 			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			fatalIf(err)
+			exitIf(err)
 			pc, err := messaging.Connect(conn) // This is our AMQP connection.
-			fatalIf(err)
-			connections[i] = pc // So we can close it when main() ends
+			exitIf(err)
+			connections[i] = pc // Save connection so it will be closed when main() ends
 
 			// Create a receiver using the path of the URL as the AMQP address
 			r, err := pc.Receiver(url.Path)
-			fatalIf(err)
+			exitIf(err)
 
+			// Loop receiving messages
 			for {
 				var m amqp.Message
 				select { // Receive a message or stop.
@@ -118,57 +96,43 @@ Receive messages from all the listed URLs concurrently and print them.
 			}
 		}(urlStr)
 	}
-	info.Printf("Listening")
-
-	// time.After() returns a channel that will close when the timeout is up.
-	timer := time.After(duration)
-
-	// main() prints each message and checks for count or timeout being exceeded.
-	for i := int64(0); i < *count; i++ {
-		select {
-		case m := <-messages:
-			debug.Print(formatMessage{m})
-		case <-timer: // Timeout has expired
-			i = 0
-		}
+
+	// All goroutines are started, we are receiving messages.
+	fmt.Printf("Listening on %d connections\n", len(urls))
+
+	// print each message until the count is exceeded.
+	for i := uint64(0); i < *count; i++ {
+		debugf("%s\n", formatMessage(<-messages))
 	}
-	info.Printf("Received %d messages", *count)
+	fmt.Printf("Received %d messages\n", *count)
 	close(stop) // Signal all goroutines to stop.
 	wait.Wait() // Wait for all goroutines to finish.
-}
-
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-	if *verbose >= level {
-		return log.New(w, prefix, 0)
+	close(messages)
+	for _, c := range connections { // Close all connections
+		if c != nil {
+			c.Close()
+		}
 	}
-	return log.New(ioutil.Discard, "", 0)
 }
 
-var info, debug *log.Logger
-
-func init() {
-	flag.Parse()
-	name := path.Base(os.Args[0])
-	log.SetFlags(0)                                               // Use default logger for errors.
-	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
-	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
-	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+// Simple debug logging
+func debugf(format string, data ...interface{}) {
+	if *debug {
+		fmt.Fprintf(os.Stderr, format, data...)
+	}
 }
 
 // Simple error handling for demo.
-func fatalIf(err error) {
+func exitIf(err error) {
 	if err != nil {
-		log.Fatal(err)
+		fmt.Fprintln(os.Stderr, err)
 	}
 }
 
-type formatMessage struct{ m amqp.Message }
-
-func (fm formatMessage) String() string {
+func formatMessage(m amqp.Message) string {
 	if *full {
-		return fmt.Sprintf("%#v", fm.m)
+		return fmt.Sprintf("%#v", m)
 	} else {
-		return fmt.Sprintf("%#v", fm.m.Body())
+		return fmt.Sprintf("%#v", m.Body())
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
index 4aaeb43..98acefa 100644
--- a/examples/go/send.go
+++ b/examples/go/send.go
@@ -22,21 +22,23 @@ package main
 import (
 	"flag"
 	"fmt"
-	"io"
-	"io/ioutil"
-	"log"
-	"math"
 	"net"
 	"os"
-	"path"
 	"qpid.apache.org/proton/go/amqp"
 	"qpid.apache.org/proton/go/messaging"
 	"sync"
 )
 
-// Command-line flags
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
-var count = flag.Int64("count", 1, "Send this may messages per address. 0 means unlimited.")
+// Usage and command-line flags
+func usage() {
+	fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Send messages to each URL concurrently with body "<url-path>-<n>" where n is the message number.
+`, os.Args[0])
+	flag.PrintDefaults()
+}
+
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var count = flag.Int64("count", 1, "Send this may messages per address.")
 
 // Ack associates an info string with an acknowledgement
 type Ack struct {
@@ -45,67 +47,51 @@ type Ack struct {
 }
 
 func main() {
-	// Parse flags and arguments, print usage message on error.
-	flag.Usage = func() {
-		fmt.Fprintf(os.Stderr, `
-Usage: %s url [url ...]
-Send messages to all the listed URLs concurrently.
-To each URL, send the string "path-n" where n is the message number.
-`, os.Args[0])
-		flag.PrintDefaults()
-	}
+	flag.Usage = usage
 	flag.Parse()
+
 	urls := flag.Args() // Non-flag arguments are URLs to receive from
 	if len(urls) == 0 {
+		fmt.Fprintln(os.Stderr, "No URL provided")
 		flag.Usage()
-		fmt.Fprintf(os.Stderr, "No URL provided\n")
 		os.Exit(1)
 	}
-	if *count == 0 {
-		*count = math.MaxInt64
-	}
-
-	// Create a channel to receive all the acknowledgements
-	acks := make(chan Ack)
 
-	// Create a goroutine for each URL that sends messages.
+	acks := make(chan Ack)  // Channel to receive all the acknowledgements
 	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
 	wait.Add(len(urls))     // Wait for one goroutine per URL.
 
-	// Arrange to close all connections on exit
-	connections := make([]*messaging.Connection, len(urls))
-	defer func() {
-		for _, c := range connections {
-			if c != nil {
-				c.Close()
-			}
-		}
-	}()
+	connections := make([]*messaging.Connection, len(urls)) // Store connctions to close on exit
 
+	// Start a goroutine for each URL to send messages, receive the acknowledgements and
+	// send them to the acks channel.
 	for i, urlStr := range urls {
-		debug.Printf("Connecting to %v", urlStr)
+		debugf("Connecting to %v\n", urlStr)
 		go func(urlStr string) {
+
 			defer wait.Done()                 // Notify main() that this goroutine is done.
 			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			fatalIf(err)
+			exitIf(err)
 
 			// Open a standard Go net.Conn and and AMQP connection using it.
 			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			fatalIf(err)
+			exitIf(err)
 			pc, err := messaging.Connect(conn) // This is our AMQP connection.
-			fatalIf(err)
-			connections[i] = pc // So we can close it when main() ends
+			exitIf(err)
+			connections[i] = pc // Save connection so it will be closed when main() ends
 
 			// Create a sender using the path of the URL as the AMQP address
 			s, err := pc.Sender(url.Path)
-			fatalIf(err)
+			exitIf(err)
 
+			// Loop sending messages, receiving acknowledgements and sending them to the acks channel.
 			for i := int64(0); i < *count; i++ {
 				m := amqp.NewMessage()
 				body := fmt.Sprintf("%v-%v", url.Path, i)
 				m.SetBody(body)
+				// Note Send is *asynchronous*, ack is a channel that will receive the acknowledgement.
 				ack, err := s.Send(m)
-				fatalIf(err)
+				exitIf(err)
 				acks <- Ack{ack, body} // Send the acknowledgement to main()
 			}
 		}(urlStr)
@@ -113,44 +99,38 @@ To each URL, send the string "path-n" where n is the message number.
 
 	// Wait for all the acknowledgements
 	expect := int(*count) * len(urls)
-	debug.Printf("Started senders, expect %v acknowledgements", expect)
+	debugf("Started senders, expect %v acknowledgements\n", expect)
 	for i := 0; i < expect; i++ {
 		ack, ok := <-acks
 		if !ok {
-			info.Fatalf("acks channel closed after only %d acks\n", i)
+			exitIf(fmt.Errorf("acks channel closed after only %d acks\n", i))
 		}
 		d := <-ack.ack
-		debug.Printf("acknowledgement[%v] %v", i, ack.info)
+		debugf("acknowledgement[%v] %v\n", i, ack.info)
 		if d != messaging.Accepted {
-			info.Printf("Unexpected disposition %v", d)
+			fmt.Printf("Unexpected disposition %v\n", d)
 		}
 	}
-	info.Printf("Received all %v acknowledgements", expect)
-	wait.Wait() // Wait for all goroutines to finish.
-}
+	fmt.Printf("Received all %v acknowledgements\n", expect)
 
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-	if *verbose >= level {
-		return log.New(w, prefix, 0)
+	wait.Wait()                     // Wait for all goroutines to finish.
+	for _, c := range connections { // Close all connections
+		if c != nil {
+			c.Close()
+		}
 	}
-	return log.New(ioutil.Discard, "", 0)
 }
 
-var info, debug *log.Logger
-
-func init() {
-	flag.Parse()
-	name := path.Base(os.Args[0])
-	log.SetFlags(0)                                               // Use default logger for errors.
-	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
-	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
-	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+// Simple debug logging
+func debugf(format string, data ...interface{}) {
+	if *debug {
+		fmt.Fprintf(os.Stderr, format, data...)
+	}
 }
 
 // Simple error handling for demo.
-func fatalIf(err error) {
+func exitIf(err error) {
 	if err != nil {
-		log.Fatal(err)
+		fmt.Fprintln(os.Stderr, err)
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index 094b196..27e5966 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -35,6 +35,59 @@ import (
 	"text/template"
 )
 
+var includeProton = "../../include/proton"
+var outpath = "src/qpid.apache.org/proton/go/event/wrappers_gen.go"
+
+func main() {
+	flag.Parse()
+	out, err := os.Create(outpath)
+	panicIf(err)
+	defer out.Close()
+
+	apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"}
+	fmt.Fprintln(out, copyright)
+	fmt.Fprint(out, `
+package event
+
+import (
+	"time"
+  "unsafe"
+  "qpid.apache.org/proton/go/internal"
+)
+
+// #include <proton/types.h>
+// #include <proton/event.h>
+// #include <stdlib.h>
+`)
+	for _, api := range apis {
+		fmt.Fprintf(out, "// #include <proton/%s.h>\n", api)
+	}
+	fmt.Fprintln(out, `import "C"`)
+
+	event(out)
+
+	for _, api := range apis {
+		fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api)
+		header := readHeader(api)
+		enums := findEnums(header)
+		for _, e := range enums {
+			genEnum(out, e.Name, e.Values)
+		}
+		apiWrapFns(api, header, out)
+	}
+	out.Close()
+
+	// Run gofmt.
+	cmd := exec.Command("gofmt", "-w", outpath)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	err = cmd.Run()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "gofmt: %s", err)
+		os.Exit(1)
+	}
+}
+
 func mixedCase(s string) string {
 	result := ""
 	for _, w := range strings.Split(s, "_") {
@@ -96,7 +149,7 @@ func panicIf(err error) {
 }
 
 func readHeader(name string) string {
-	file, err := os.Open(path.Join(*includeProton, name+".h"))
+	file, err := os.Open(path.Join(includeProton, name+".h"))
 	panicIf(err)
 	defer file.Close()
 	s, err := ioutil.ReadAll(file)
@@ -372,56 +425,3 @@ func apiWrapFns(api, header string, out io.Writer) {
 		fmt.Fprintf(out, "}\n")
 	}
 }
-
-var includeProton = flag.String("include", "", "path to proton include files, including /proton")
-
-func main() {
-	flag.Parse()
-	outpath := "wrappers_gen.go"
-	out, err := os.Create(outpath)
-	panicIf(err)
-	defer out.Close()
-
-	apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"}
-	fmt.Fprintln(out, copyright)
-	fmt.Fprint(out, `
-package event
-
-import (
-	"time"
-  "unsafe"
-  "qpid.apache.org/proton/go/internal"
-)
-
-// #include <proton/types.h>
-// #include <proton/event.h>
-// #include <stdlib.h>
-`)
-	for _, api := range apis {
-		fmt.Fprintf(out, "// #include <proton/%s.h>\n", api)
-	}
-	fmt.Fprintln(out, `import "C"`)
-
-	event(out)
-
-	for _, api := range apis {
-		fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api)
-		header := readHeader(api)
-		enums := findEnums(header)
-		for _, e := range enums {
-			genEnum(out, e.Name, e.Values)
-		}
-		apiWrapFns(api, header, out)
-	}
-	out.Close()
-
-	// Run gofmt.
-	cmd := exec.Command("gofmt", "-w", outpath)
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	err = cmd.Run()
-	if err != nil {
-		fmt.Fprintf(os.Stderr, "gofmt: %s", err)
-		os.Exit(1)
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
index db022ff..73db513 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go
@@ -101,7 +101,7 @@ you are doing something fairly low-level it is probably a better choice.
 */
 type Pump struct {
 	// Error is set on exit from Run() if there was an error.
-	Error error
+	Error error // FIXME aconway 2015-05-26: make it a function
 	// Channel to inject functions to be executed in the Pump's proton event loop.
 	Inject chan func()
 
@@ -212,6 +212,9 @@ func (p *Pump) Close() error {
 	}
 	delete(pumps, p.connection)
 	p.free()
+	if p.Error == io.EOF {
+		return nil
+	}
 	return p.Error
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
index 01ba890..f3f3307 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go
@@ -29,7 +29,6 @@ import (
 	"fmt"
 	"runtime"
 	"sync"
-	"sync/atomic"
 	"unsafe"
 )
 
@@ -105,21 +104,23 @@ func panicIf(condition bool, fmt string, args ...interface{}) {
 
 // FirstError is a goroutine-safe error holder that keeps the first error that is set.
 type FirstError struct {
-	err  atomic.Value
-	once sync.Once
+	err  error
+	lock sync.Mutex
 }
 
-// Set the error if not allread set.
-func (e *FirstError) Set(err error) {
-	e.once.Do(func() { e.err.Store(err) })
+// Set the error if not already set, return the error.
+func (e *FirstError) Set(err error) error {
+	e.lock.Lock()
+	defer e.lock.Unlock()
+	if e.err == nil {
+		e.err = err
+	}
+	return e.err
 }
 
 // Get the error.
 func (e *FirstError) Get() error {
-	v := e.err.Load()
-	if v != nil {
-		return v.(error)
-	} else {
-		return nil
-	}
+	e.lock.Lock()
+	defer e.lock.Unlock()
+	return e.err
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a301dea4/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
index e653de2..e4b117d 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go
@@ -23,11 +23,16 @@ package messaging
 import "C"
 
 import (
+	"io"
 	"net"
 	"qpid.apache.org/proton/go/amqp"
 	"qpid.apache.org/proton/go/event"
+	"qpid.apache.org/proton/go/internal"
 )
 
+// Closed is an alias for io.EOF. It indicates orderly closure of an endpoint.
+var Closed = io.EOF
+
 // Connection is a connection to a remote AMQP endpoint.
 //
 // You can set exported fields to configure the connection before calling
@@ -42,27 +47,27 @@ type Connection struct {
 	handler *handler
 	pump    *event.Pump
 	session Session
+	err     internal.FirstError
 }
 
+// Error returns nil if the connection is open, messaging.Closed if was closed cleanly
+// or an error value if it was closed due to an error.
+func (c *Connection) Error() error { return c.err.Get() }
+
 // Make an AMQP connection over a net.Conn connection.
-//
-// Use Connection.Close() to close the Connection, this will also close conn.
-// Using conn.Close() directly will cause an abrupt disconnect rather than an
-// orderly AMQP close.
-//
+// You must call c.Close() to close the connection and clean up its resources.
 func (c *Connection) Open(conn net.Conn) (err error) {
 	c.handler = newHandler(c)
 	c.pump, err = event.NewPump(conn,
 		event.NewMessagingDelegator(c.handler),
 	)
-	if err != nil {
-		return err
-	}
-	if c.Server {
-		c.pump.Server()
+	if err == nil {
+		if c.Server {
+			c.pump.Server()
+		}
+		go c.pump.Run()
 	}
-	go c.pump.Run()
-	return nil
+	return c.err.Set(err)
 }
 
 // Connect opens a default client connection. It is a shortcut for
@@ -71,14 +76,16 @@ func (c *Connection) Open(conn net.Conn) (err error) {
 //
 func Connect(conn net.Conn) (*Connection, error) {
 	c := &Connection{}
-	err := c.Open(conn)
-	return c, err
+	c.err.Set(c.Open(conn))
+	return c, c.Error()
 }
 
-// Close the connection.
-//
-// Connections must be closed to clean up resources and stop associated goroutines.
-func (c *Connection) Close() error { return c.pump.Close() }
+// Close cleans up resources and closes the associated net.Conn connection.
+func (c *Connection) Close() error {
+	err := c.pump.Close()   // Will be nil on close OK
+	c.err.Set(c.pump.Error) // Will be io.EOF on close OK
+	return err
+}
 
 // DefaultSession returns a default session for the connection.
 //
@@ -86,6 +93,9 @@ func (c *Connection) Close() error { return c.pump.Close() }
 // Use Session() for more control over creating sessions.
 //
 func (c *Connection) DefaultSession() (s Session, err error) {
+	if c.Error() != nil {
+		return Session{}, c.Error()
+	}
 	if c.session.e.IsNil() {
 		c.session, err = c.Session()
 	}
@@ -237,14 +247,15 @@ func (s *Sender) Send(m amqp.Message) (ack Acknowledgement, err error) {
 // Close the sender.
 func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: close/free
 
-// Receiver receives messages via the channel Receive.
 type Receiver struct {
 	Link
-	// Channel of messag
+	// Channel to receive messages. When it closes, check Receiver.Error() for an error.
 	Receive <-chan amqp.Message
 }
 
 // FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method?
 
+// FIXME aconway 2015-05-25:  Close must unblock Receive() calls.
+
 // Close the Receiver.
 func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: close/free


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message