qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [30/50] [abbrv] qpid-proton git commit: PROTON-827: go binding: minor example cleanup.
Date Mon, 28 Sep 2015 21:22:49 GMT
PROTON-827: go binding: minor example cleanup.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d8129acb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d8129acb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d8129acb

Branch: refs/heads/proton-go
Commit: d8129acb0a7677eceb2bf88b8835f7b9dddfb466
Parents: 5598d50
Author: Alan Conway <aconway@redhat.com>
Authored: Mon May 11 17:34:13 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Mon Sep 28 17:14:37 2015 -0400

----------------------------------------------------------------------
 examples/go/receive.go | 174 ++++++++++++++++++++++++++++++++++++++++++++
 examples/go/send.go    | 156 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d8129acb/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
new file mode 100644
index 0000000..2545eab
--- /dev/null
+++ b/examples/go/receive.go
@@ -0,0 +1,174 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"math"
+	"net"
+	"os"
+	"path"
+	"qpid.apache.org/proton"
+	"qpid.apache.org/proton/messaging"
+	"sync"
+	"time"
+)
+
+// Command-line flags. Each variable is a pointer filled in by flag.Parse(); dereference it to read the value.
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
+var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.")
+var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.") // Note: the command-line name is -time, not -timeout.
+var full = flag.Bool("full", false, "Print full message not just body.")
+
+// main receives messages concurrently from every URL given on the command
+// line and logs each one through the debug logger. It stops once -count
+// messages have arrived or -time seconds have elapsed, whichever comes first.
+func main() {
+	// Parse flags and arguments, print usage message on error.
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `
+Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+	urls := flag.Args() // Non-flag arguments are URLs to receive from
+	if len(urls) == 0 {
+		flag.Usage()
+		fmt.Fprintf(os.Stderr, "No URL provided\n")
+		os.Exit(1)
+	}
+	duration := time.Duration(*timeout) * time.Second
+	if duration == 0 {
+		duration = time.Duration(math.MaxInt64) // Not forever, but 290 years is close enough.
+	}
+	if *count == 0 {
+		*count = math.MaxInt64
+	}
+
+	// Create a goroutine for each URL that receives messages and sends them to
+	// the messages channel. main() receives and prints them.
+
+	messages := make(chan proton.Message) // Channel for messages from goroutines to main()
+	stop := make(chan struct{})           // Closing this channel means the program is stopping.
+
+	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+
+	wait.Add(len(urls)) // Wait for one goroutine per URL.
+
+	// Arrange to close all connections on exit
+	connections := make([]*messaging.Connection, len(urls))
+	defer func() {
+		for _, c := range connections {
+			if c != nil {
+				c.Close()
+			}
+		}
+	}()
+
+	for i, urlStr := range urls {
+		debug.Printf("Connecting to %s", urlStr)
+		// Pass i and urlStr as arguments so that every goroutine works on its
+		// own copies instead of sharing the loop variables; otherwise
+		// connections[i] could be written at the wrong index.
+		go func(i int, urlStr string) {
+			defer wait.Done()                   // Notify main() that this goroutine is done.
+			url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+			fatalIf(err)
+
+			// Open a standard Go net.Conn and an AMQP connection using it.
+			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			fatalIf(err)
+			pc, err := messaging.Connect(conn) // This is our AMQP connection.
+			fatalIf(err)
+			connections[i] = pc // So we can close it when main() ends
+
+			// Create a receiver using the path of the URL as the AMQP address
+			r, err := pc.Receiver(url.Path)
+			fatalIf(err)
+
+			for {
+				var m proton.Message
+				select { // Receive a message or stop.
+				case m = <-r.Receive:
+				case <-stop: // The program is stopping.
+					return
+				}
+				select { // Send m to main() or stop
+				case messages <- m: // Send m to main()
+				case <-stop: // The program is stopping.
+					return
+				}
+			}
+		}(i, urlStr)
+	}
+	info.Printf("Listening")
+
+	// time.After() returns a channel that delivers a single value once the
+	// timeout is up.
+	timer := time.After(duration)
+
+	// main() prints each message and checks for count or timeout being exceeded.
+	received := int64(0) // Number of messages actually received so far.
+receiving:
+	for received < *count {
+		select {
+		case m := <-messages:
+			debug.Print(formatMessage{m})
+			received++
+		case <-timer: // Timeout has expired, stop receiving.
+			break receiving
+		}
+	}
+	info.Printf("Received %d messages", received)
+	close(stop) // Signal all goroutines to stop.
+	wait.Wait() // Wait for all goroutines to finish.
+}
+
+// logger returns a *log.Logger that writes to w with the given prefix if the -verbose flag is at least level; otherwise it returns a logger whose output is discarded.
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+	if *verbose >= level {
+		return log.New(w, prefix, 0) // Flags 0: no date/time header, only the prefix.
+	}
+	return log.New(ioutil.Discard, "", 0)
+}
+
+// info and debug are the program's leveled loggers; they are created in init().
+var info, debug *log.Logger
+
+// init parses the command line and sets up logging. The flags are parsed here,
+// before main() runs, because logger() reads *verbose to decide whether a
+// logger produces output. main() installs its usage message and parses again.
+func init() {
+	flag.Parse()
+	name := path.Base(os.Args[0])
+	log.SetFlags(0)                                               // Use default logger for errors.
+	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
+	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
+	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+}
+
+// fatalIf logs err with the default logger and exits the program if err is non-nil; it does nothing when err is nil. Simple error handling for demo.
+func fatalIf(err error) {
+	if err != nil {
+		log.Fatal(err) // log.Fatal exits via os.Exit(1), so deferred functions do not run.
+	}
+}
+
+// formatMessage adapts a proton.Message for printing; see String().
+type formatMessage struct{ m proton.Message }
+
+// String renders the whole message in Go syntax when the -full flag is set,
+// otherwise only the message body.
+func (fm formatMessage) String() string {
+	if *full {
+		return fmt.Sprintf("%#v", fm.m)
+	}
+	return fmt.Sprintf("%#v", fm.m.Body())
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d8129acb/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
new file mode 100644
index 0000000..c4db7cd
--- /dev/null
+++ b/examples/go/send.go
@@ -0,0 +1,156 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"math"
+	"net"
+	"os"
+	"path"
+	"qpid.apache.org/proton"
+	"qpid.apache.org/proton/messaging"
+	"sync"
+)
+
+// Command-line flags. Each variable is a pointer filled in by flag.Parse();
+// dereference it to read the value.
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
+var count = flag.Int64("count", 1, "Send this many messages per address. 0 means unlimited.")
+
+// Ack associates an info string with an acknowledgement.
+type Ack struct {
+	ack  messaging.Acknowledgement // main() receives from this (<-ack) to obtain the message's disposition.
+	info string                    // Text identifying the message; the senders store the message body here.
+}
+
+// main sends -count messages to every URL given on the command line, using one
+// goroutine per URL, and then waits for the acknowledgement of every message.
+func main() {
+	// Parse flags and arguments, print usage message on error.
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `
+Usage: %s url [url ...]
+Send messages to all the listed URLs concurrently.
+To each URL, send the string "path-n" where n is the message number.
+`, os.Args[0])
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+	urls := flag.Args() // Non-flag arguments are URLs to send to
+	if len(urls) == 0 {
+		flag.Usage()
+		fmt.Fprintf(os.Stderr, "No URL provided\n")
+		os.Exit(1)
+	}
+	if *count == 0 {
+		*count = math.MaxInt64
+	}
+
+	// Create a channel to receive all the acknowledgements
+	acks := make(chan Ack)
+
+	// Create a goroutine for each URL that sends messages.
+	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+	wait.Add(len(urls))     // Wait for one goroutine per URL.
+
+	// Arrange to close all connections on exit
+	connections := make([]*messaging.Connection, len(urls))
+	defer func() {
+		for _, c := range connections {
+			if c != nil {
+				c.Close()
+			}
+		}
+	}()
+
+	for i, urlStr := range urls {
+		debug.Printf("Connecting to %v", urlStr)
+		// Pass i and urlStr as arguments so that every goroutine works on its
+		// own copies instead of sharing the loop variables; otherwise
+		// connections[i] could be written at the wrong index.
+		go func(i int, urlStr string) {
+			defer wait.Done()                   // Notify main() that this goroutine is done.
+			url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+			fatalIf(err)
+
+			// Open a standard Go net.Conn and an AMQP connection using it.
+			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			fatalIf(err)
+			pc, err := messaging.Connect(conn) // This is our AMQP connection.
+			fatalIf(err)
+			connections[i] = pc // So we can close it when main() ends
+
+			// Create a sender using the path of the URL as the AMQP address
+			s, err := pc.Sender(url.Path)
+			fatalIf(err)
+
+			for n := int64(0); n < *count; n++ {
+				m := proton.NewMessage()
+				body := fmt.Sprintf("%v-%v", url.Path, n)
+				m.SetBody(body)
+				ack, err := s.Send(m)
+				fatalIf(err)
+				acks <- Ack{ack, body} // Send the acknowledgement to main()
+			}
+		}(i, urlStr)
+	}
+
+	// Wait for all the acknowledgements. The expected total is computed in
+	// int64 and saturates at MaxInt64, because "unlimited" is represented as
+	// MaxInt64 per URL and multiplying that by the number of URLs would overflow.
+	expect := int64(math.MaxInt64)
+	if n := int64(len(urls)); *count <= expect/n {
+		expect = *count * n
+	}
+	debug.Printf("Started senders, expect %v acknowledgements", expect)
+	for i := int64(0); i < expect; i++ {
+		ack, ok := <-acks
+		if !ok {
+			info.Fatalf("acks channel closed after only %d acks\n", i)
+		}
+		d := <-ack.ack // Wait for the disposition of this message.
+		debug.Printf("acknowledgement[%v] %v", i, ack.info)
+		if d != messaging.Accepted {
+			info.Printf("Unexpected disposition %v", d)
+		}
+	}
+	info.Printf("Received all %v acknowledgements", expect)
+	wait.Wait() // Wait for all goroutines to finish.
+}
+
+// logger returns a *log.Logger that writes to w with the given prefix if the -verbose flag is at least level; otherwise it returns a logger whose output is discarded.
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+	if *verbose >= level {
+		return log.New(w, prefix, 0) // Flags 0: no date/time header, only the prefix.
+	}
+	return log.New(ioutil.Discard, "", 0)
+}
+
+// info and debug are the program's leveled loggers; they are created in init().
+var info, debug *log.Logger
+
+// init parses the command line and sets up logging. The flags are parsed here,
+// before main() runs, because logger() reads *verbose to decide whether a
+// logger produces output. main() installs its usage message and parses again.
+func init() {
+	flag.Parse()
+	name := path.Base(os.Args[0])
+	log.SetFlags(0)                                               // Use default logger for errors.
+	log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log errors on stderr.
+	info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log info on stdout.
+	debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr.
+}
+
+// fatalIf logs err with the default logger and exits the program if err is non-nil; it does nothing when err is nil. Simple error handling for demo.
+func fatalIf(err error) {
+	if err != nil {
+		log.Fatal(err) // log.Fatal exits via os.Exit(1), so deferred functions do not run.
+	}
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message