qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-proton git commit: PROTON-827: go binding: examples for the concurrent Go API.
Date Fri, 17 Apr 2015 20:44:28 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 8744409e2 -> 828713eab


PROTON-827: go binding: examples for the concurrent Go API.

Examples of what a concurrent Go API for proton would look like:

- receive.go receives concurrently from many addresses and prints messages as they are received.
- send.go sends messages concurrently.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/828713ea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/828713ea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/828713ea

Branch: refs/heads/master
Commit: 828713eaba72d411ea121e58232c739219c37752
Parents: 8744409
Author: Alan Conway <aconway@redhat.com>
Authored: Thu Apr 9 17:40:11 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Fri Apr 17 16:43:47 2015 -0400

----------------------------------------------------------------------
 examples/go/receive.go                          | 129 +++++++++++++++++++
 examples/go/send.go                             |  97 ++++++++++++++
 proton-c/bindings/go/README.md                  |  24 ++--
 .../go/src/qpid.apache.org/proton/dummy.go      |  67 +++++++---
 4 files changed, 292 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828713ea/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
new file mode 100644
index 0000000..231e0ce
--- /dev/null
+++ b/examples/go/receive.go
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"math"
+	"net"
+	"os"
+	"qpid.apache.org/proton"
+	"sync"
+	"time"
+)
+
+// Simplistic error handling for demo. Not recommended.
+func panicIf(err error) {
+	if err != nil {
+		panic(err)
+	}
+}
+
+// Command-line flags
+var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.")
+var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.")
+var short = flag.Bool("short", false, "Short format of message: body only")
+
+func main() {
+	// Parse flags and arguments, print usage message on error.
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `
+Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+	urls := flag.Args() // Non-flag arguments are URLs to receive from
+	if len(urls) == 0 {
+		flag.Usage()
+		fmt.Fprintf(os.Stderr, "No URL provided")
+		os.Exit(1)
+	}
+	duration := time.Duration(*timeout) * time.Second
+	if duration == 0 {
+		duration = time.Duration(math.MaxInt64) // Not forever, but 290 years is close enough.
+	}
+	if *count == 0 {
+		*count = math.MaxInt64
+	}
+
+	// Create a goroutine for each URL that receives messages and sends them to
+	// the messages channel. main() receives and prints them.
+
+	messages := make(chan proton.Message) // Channel for messages from goroutines to main()
+	stop := make(chan struct{})           // Closing this channel means the program is stopping.
+
+	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+
+	wait.Add(len(urls)) // Wait for one goroutine per URL.
+
+	for _, urlStr := range urls {
+		// Start a goroutine to receive from urlStr
+		go func(urlStr string) {
+			defer wait.Done()                   // Notify main() that this goroutine is done.
+			url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+			panicIf(err)
+
+			// Open a standard Go net.Conn for the AMQP connection
+			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			panicIf(err)
+			defer conn.Close() // Close conn on goroutine exit.
+
+			pc, err := proton.Connect(conn) // This is our AMQP connection.
+			panicIf(err)
+			// We could 'defer pc.Close()' but conn.Close() will automatically close the proton connection.
+
+			// For convenience a proton.Connection provides a DefaultSession()
+			// pc.Receiver() is equivalent to pc.DefaultSession().Receiver()
+			r, err := pc.Receiver(url.Path)
+			panicIf(err)
+
+			for m := range r.Receive { // r.Receive is a channel to receive messages.
+				select {
+				case messages <- m: // Send m to main()
+				case <-stop: // The program is stopping.
+					return
+				}
+			}
+		}(urlStr)
+	}
+
+	// time.After() returns a channel that delivers a value once the timeout is up.
+	timer := time.After(duration)
+
+	// main() prints each message and checks for count or timeout being exceeded.
+	for i := *count; i > 0; i-- {
+		select {
+		case m := <-messages:
+			if *short {
+				fmt.Println(m.Body())
+			} else {
+				fmt.Printf("%#v\n\n", m)
+			}
+		case <-timer: // Timeout has expired
+			i = 0
+		}
+	}
+
+	close(stop) // Signal all goroutines to stop.
+	wait.Wait() // Wait for all goroutines to finish.
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828713ea/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
new file mode 100644
index 0000000..4a7f947
--- /dev/null
+++ b/examples/go/send.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+	"flag"
+	"fmt"
+	"math"
+	"net"
+	"os"
+	"qpid.apache.org/proton"
+	"sync"
+)
+
+// Simplistic error handling for demo. Not recommended.
+func panicIf(err error) {
+	if err != nil {
+		panic(err)
+	}
+}
+
+// Command-line flags
+var count = flag.Int64("count", 0, "Send this may messages per address. 0 means unlimited.")
+
+func main() {
+	// Parse flags and arguments, print usage message on error.
+	flag.Usage = func() {
+		fmt.Fprintf(os.Stderr, `
+Usage: %s url [url ...]
+Send messages to all the listed URLs concurrently.
+To each URL, send the string "path-n" where n is the message number.
+`, os.Args[0])
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+	urls := flag.Args() // Non-flag arguments are URLs to send to
+	if len(urls) == 0 {
+		flag.Usage()
+		fmt.Fprintf(os.Stderr, "No URL provided")
+		os.Exit(1)
+	}
+	if *count == 0 {
+		*count = math.MaxInt64
+	}
+
+	// Create a goroutine for each URL that sends messages.
+	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+	wait.Add(len(urls))     // Wait for one goroutine per URL.
+
+	for _, urlStr := range urls {
+		// Start a goroutine to send to urlStr
+		go func(urlStr string) {
+			defer wait.Done()                   // Notify main() that this goroutine is done.
+			url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+			panicIf(err)
+
+			// Open a standard Go net.Conn for the AMQP connection
+			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			panicIf(err)
+			defer conn.Close() // Close conn on goroutine exit.
+
+			pc, err := proton.Connect(conn) // This is our AMQP connection.
+			panicIf(err)
+			// We could 'defer pc.Close()' but conn.Close() will automatically close the proton connection.
+
+			// For convenience a proton.Connection provides a DefaultSession()
+			// pc.Sender() is equivalent to pc.DefaultSession().Sender()
+			s, err := pc.Sender(url.Path)
+			panicIf(err)
+
+			for i := int64(0); i < *count; i++ {
+				m := proton.NewMessage()
+				m.SetBody(fmt.Sprintf("%v-%v", url.Path, i))
+				err := s.Send(m)
+				panicIf(err)
+			}
+		}(urlStr)
+	}
+	wait.Wait() // Wait for all goroutines to finish.
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828713ea/proton-c/bindings/go/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
index 41948ac..fc04d79 100644
--- a/proton-c/bindings/go/README.md
+++ b/proton-c/bindings/go/README.md
@@ -26,25 +26,29 @@ There are two types of developer we want to support
 
 2. AMQP-aware developers using Go as an implementation language:
    - Go types to exactly represent all AMQP types and encoding details.
-   - Full access to AMQP concepts like connections, sessions and links.
-
-We will follow conventions of the C and python API where possible to help
-cross-language developers but idiomatic Go is the overriding consideration.
+   - Full access to detailed AMQP concepts: sessions, links, deliveries etc.
 
 ## Status
 
-The current code is a foundation, not an implementation of the target API.
-
 There are two Go modules so far. See the documentation using
 
     godoc apache.org/proton
     godoc apache.org/proton/event
 
-The event module contains a straightforward mapping of the proton event API and
-the simplified MessagingHandler python API.
+The proton module maps between AMQP and Go types and has a Go representation of
+an AMQP message. It is the beginning of the "real" Go API. For examples of what
+this API will look like see:
+
+- [receive.go](../../../examples/go/receive.go) uses channels and goroutines to receive concurrently.
+- [send.go](../../../examples/go/send.go) less interesting but there for symmetry.
+
+The event module is a port of the proton C and python MessagingHandler APIs. It
+provides low-level, goroutine-unsafe but (mostly) complete access to proton. It
+is the foundation for building the Go API and may be useful for advanced AMQP
+projects or cross-language proton development in future.
 
-The proton module contains the mapping between AMQP types and messages and Go
-types.
+The event API is functional but not yet complete. The Go API doesn't
+exist yet; there is some dummy code so the examples will compile and run.
 
 ## The event driven API
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/828713ea/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go b/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
index 823fb7f..2f83760 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go
@@ -19,27 +19,64 @@ under the License.
 
 package proton
 
-type Connection struct{}
+import (
+	"fmt"
+	"net"
+)
 
-func (c Connection) Open(string) {}
-func (c Connection) Close()      {}
-func (c Connection) Receiver(addr string) *Receiver {
-	r := &Receiver{make(chan Message)}
-	// FIXME aconway 2015-04-10: dummy implementation to test initial example, will be removed.
+// Placeholder definitions to allow examples to compile.
+
+type Connection struct {
+	Server bool // Server connection does protocol negotiation
+	// FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc.
+}
+
+// Map an AMQP connection using conn
+func (c Connection) Connect(conn net.Conn) error { return nil }
+func (c Connection) Close() error                { return nil }
+
+func (c Connection) Receiver(addr string) (*Receiver, error) {
+	// FIXME aconway 2015-04-10: dummy implementation to test examples, returns endless messages.
+	r := &Receiver{make(chan Message), make(chan struct{})}
 	go func() {
-		m := NewMessage()
-		m.SetBody(addr)
-		r.Receive <- m
-		m = NewMessage()
-		m.SetBody(addr)
-		m.SetSubject("stop")
-		r.Receive <- m
+		for i := 0; ; i++ {
+			m := NewMessage()
+			m.SetBody(fmt.Sprintf("%v-%v", addr, i))
+			select {
+			case r.Receive <- m:
+			case <-r.closed:
+				return
+			}
+		}
 	}()
-	return r
+	return r, nil
+}
+
+func (c Connection) Sender(addr string) (*Sender, error) {
+	return &Sender{}, nil
 }
 
 type Receiver struct {
 	Receive chan Message
+	closed  chan struct{}
 }
 
-func (r Receiver) Close() {}
+func (r Receiver) Close() error { return nil }
+
+type Sender struct{}
+
+func (s Sender) Send(m Message) error { fmt.Println(m.Body()); return nil }
+func (s Sender) Close() error         { return nil }
+
+// Connect makes a default client connection using conn.
+//
+// For more control do:
+//     c := Connection{}
+//     // set parameters on c
+//     c.Connect(conn)
+//
+func Connect(conn net.Conn) (Connection, error) {
+	c := Connection{}
+	c.Connect(conn)
+	return c, nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message