From: aconway@apache.org
To: commits@qpid.apache.org
Reply-To: dev@qpid.apache.org
Date: Tue, 08 Sep 2015 16:37:52 -0000
Message-Id: <7eb67b7e98914f86b6bacbdc03153878@git.apache.org>
In-Reply-To: <4391ed4e15704d9c85eeba9305b98280@git.apache.org>
References: <4391ed4e15704d9c85eeba9305b98280@git.apache.org>
Subject: [34/50] [abbrv] qpid-proton git commit: PROTON-827: Re-create go worspace, cmake support for testing.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/82befba0/proton-c/bindings/go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go b/proton-c/bindings/go deleted file mode 120000 index 6ffb7db..0000000 --- a/proton-c/bindings/go +++ /dev/null @@ -1 +0,0 @@ -../../go \ No newline at end of file diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt new file mode 100644 index 0000000..ea6238f --- /dev/null +++ b/proton-c/bindings/go/CMakeLists.txt @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# FIXME aconway 2015-05-20: install targets for go source. + +set(GO_BUILD_FLAGS "" CACHE STRING "Flags for 'go build'") +set(GO_TEST_FLAGS "-v" CACHE STRING "Flags for 'go test'") + +separate_arguments(GO_BUILD_FLAGS) +separate_arguments(GO_TEST_FLAGS) +list(APPEND GO_BUILD_FLAGS "-ldflags=-r ${CMAKE_BINARY_DIR}/proton-c") + +if (BUILD_GO) + # Build in the source tree, go tools aren't friendly otherwise. + # All build output goes in git-ignored pkg or bin subdirectories. 
+ set(qgo "qpid.apache.org/proton/go") + set(packages ${qgo}/amqp ${qgo}/event ${qgo}/messaging) + + # Following are CACHE INTERNAL so examples/CMakeLists.txt can see them. + set(GO_ENV ${env_py} -- + "GOPATH=${CMAKE_CURRENT_SOURCE_DIR}" + "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR}/proton-c/include" + "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/proton-c" + ${GO_EXE} CACHE INTERNAL "Run go with environment set" + ) + set(GO_BUILD ${GO_ENV} build ${GO_BUILD_FLAGS} CACHE INTERNAL "Run go build") + set(GO_TEST ${GO_ENV} test ${GO_BUILD_FLAGS} ${GO_TEST_FLAGS} CACHE INTERNAL "Run go test") + + add_test(NAME go_package_test COMMAND ${GO_TEST} ${packages}) + + set (GO_INSTALL_DIR ${SHARE_INSTALL_DIR}/gocode/src CACHE PATH "Installation directory for Go code") + mark_as_advanced (GO_INSTALL_DIR) + install(DIRECTORY src/qpid.apache.org DESTINATION ${GO_INSTALL_DIR} COMPONENT Go) + +endif(BUILD_GO) diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md new file mode 100644 index 0000000..2b73583 --- /dev/null +++ b/proton-c/bindings/go/README.md @@ -0,0 +1,142 @@ +# *EXPERIMENTAL* Go binding for proton + + NOTE: The go directory is in the proton repository root with a symlink at + proton-c/bindings/go to facilitate use of go tools: go get, godep and godoc. + There may be a cleaner solution to this in future. + +This is the beginning of a [Go](http://golang.org) binding for proton. + +This work is in early *experimental* stages, *everything* may change in future. +Comments and contributions are strongly encouraged, this experiment is public so +early feedback can guide development. + +- Email +- Create issues , attach patches to an issue. + +There are working [examples](../examples/go/README.md) and the examples README file +explains how to install the packages in your go workspace and read the documentation. + +## Goals + +The API should + +- be idiomatic, unsurprising, and easy to use for Go developers. +- support client and server development. 
+- make simple tasks simple. +- provide deep access to AMQP protocol when that is required. + +There are two types of developer we want to support + +1. Go developers using AMQP as a message transport: + - Straightforward conversions between Go built-in types and AMQP types. + - Easy message exchange via Go channels to support use in goroutines. + +2. AMQP-aware developers using Go as an implementation language: + - Go types to exactly represent all AMQP types and encoding details. + - Full access to detailed AMQP concepts: sessions, links, deliveries etc. + +## Status + +There are 3 go packages for proton: + +- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and from Go data types. +- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for messaging clients and servers. +- qpid.apache.org/proton/go/event: full low-level access to the proton engine. + +Most applications should use the `messaging` package. The `event` package is for +applications that need low-level access to the proton engine. + +The `event` package is fairly complete, with the exception of the proton +reactor. It's unclear if the reactor is important for go. + +The `messaging` package can run the examples but probably not much else. There +is work to do on error handling and the API may change. + +There are working [examples](../../../examples/go/README.md) of a broker using `event` and +a sender and receiver using `messaging`. + +## The event driven API + +See the package documentation for details. + +## The Go API + +The goal: A procedural API that allows any user goroutine to send and receive +AMQP messages and other information (acknowledgments, flow control instructions +etc.) using channels. There will be no user-visible locks and no need to run +user code in special goroutines, e.g. as handlers in a proton event loop. + +See the package documentation for emerging details. 
+ +Currently using a channel to receive messages, a function to send them (channels +internally) and a channel as a "future" for acknowledgements to senders. This +may change. + +## Why a separate API for Go? + +Go is a concurrent language and encourages applications to be divided into +concurrent *goroutines*. It provides traditional locking but it encourages the +use of *channels* to communicate between goroutines without explicit locks: + + "Share memory by communicating, don't communicate by sharing memory" + +The idea is that a given value is only operated on by one goroutine at a time, +but values can easily be passed from one goroutine to another. This removes much +of the need for locking. + +Go literature distinguishes between: + +- *concurrency*: "keeping track of things that could be done in parallel" +- *parallelism*: "actually doing things in parallel" + +The application expresses concurrency by starting goroutines for potentially +concurrent tasks. The Go run-times schedule the activity of goroutines onto a +small number (possibly one) of actual parallel executions. + +Even with *no* parallelism, concurrency lets the Go run-times *order* work with +respect to events like file descriptors being readable/writable, channels having +data, timers firing etc. Go automatically takes care of switching out goroutines +that block or sleep so it is normal to write code in terms of blocking calls. + +Event-driven APIs (like poll, epoll, select or the proton event API) also +channel unpredictably ordered events to actions in one or a small pool of +execution threads. However this requires a different style of programming: +"event-driven" or "reactive" programming. Go developers call it "inside-out" +programming. In an event-driven architecture blocking is a big problem as it +consumes a scarce thread of execution, so actions that take time to complete +have to be re-structured in terms of future event delivery. 
+ +The promise of Go is that you can express your application in concurrent, +procedural terms with simple blocking calls and the Go run-times will turn it +inside-out for you. Write as many goroutines as you want, and let Go interleave +and schedule them efficiently. + +For example: the Go equivalent of listening for connections is a goroutine with +a simple endless loop that calls a blocking Listen() function and starts a +goroutine for each new connection. Each connection has its own goroutine that +deals with just that connection till it closes. + +The benefit is that the variables and logic live closer together. Once you're in +a goroutine, you have everything you need in local variables, and they are +preserved across blocking calls. There's no need to store details in context +objects that you have to look up when handling a later event to figure out how +to continue where you left off. + +So a Go-like proton API does not force the user's code to run in an event-loop +goroutine. Instead user goroutines communicate with the event loop(s) via +channels. There's no need to funnel connections into one event loop; in fact it +makes no sense. Connections can be processed concurrently so they should be +processed in separate goroutines and left to Go to schedule. User goroutines can +have simple loops that block on channels till messages are available; the user can +start as many or as few such goroutines as they wish to implement concurrency as +simple or as complex as they wish. For example blocking request-response +vs. asynchronous flows of messages and acknowledgments. + +## New to Go? + +If you are new to Go then these are a good place to start: + +- [A Tour of Go](http://tour.golang.org) +- [Effective Go](http://golang.org/doc/effective_go.html) + +Then look at the tools and library docs at as you need them. 
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go new file mode 100644 index 0000000..094b196 --- /dev/null +++ b/proton-c/bindings/go/genwrap.go @@ -0,0 +1,427 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Code generator to generate a thin Go wrapper API around the C proton API. +// + +package main + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path" + "regexp" + "strings" + "text/template" +) + +func mixedCase(s string) string { + result := "" + for _, w := range strings.Split(s, "_") { + if w != "" { + result = result + strings.ToUpper(w[0:1]) + strings.ToLower(w[1:]) + } + } + return result +} + +func mixedCaseTrim(s, prefix string) string { + return mixedCase(strings.TrimPrefix(s, prefix)) +} + +var templateFuncs = template.FuncMap{"mixedCase": mixedCase, "mixedCaseTrim": mixedCaseTrim} + +func doTemplate(out io.Writer, data interface{}, tmpl string) { + panicIf(template.Must(template.New("").Funcs(templateFuncs).Parse(tmpl)).Execute(out, data)) +} + +type enumType struct { + Name string + Values []string +} + +// Find enums in a header file return map of enum name to values. 
+func findEnums(header string) (enums []enumType) { + for _, enum := range enumDefRe.FindAllStringSubmatch(header, -1) { + enums = append(enums, enumType{enum[2], enumValRe.FindAllString(enum[1], -1)}) + } + return enums +} + +func genEnum(out io.Writer, name string, values []string) { + doTemplate(out, []interface{}{name, values}, `{{$enumName := index . 0}}{{$values := index . 1}} +type {{mixedCase $enumName}} C.pn_{{$enumName}}_t +const ({{range $values}} + {{mixedCase .}} {{mixedCase $enumName}} = C.{{.}} {{end}} +) + +func (e {{mixedCase $enumName}}) String() string { + switch e { +{{range $values}} + case C.{{.}}: return "{{mixedCaseTrim . "PN_"}}" {{end}} + } + return "unknown" +} +`) +} + +var ( + reSpace = regexp.MustCompile("\\s+") +) + +func panicIf(err error) { + if err != nil { + panic(err) + } +} + +func readHeader(name string) string { + file, err := os.Open(path.Join(*includeProton, name+".h")) + panicIf(err) + defer file.Close() + s, err := ioutil.ReadAll(file) + panicIf(err) + return string(s) +} + +var copyright string = `/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// +// NOTE: This file was generated by genwrap.go, do not edit it by hand. 
+// +` + +type eventType struct { + // C, function and interface names for the event + Name, Cname, Fname, Iname string +} + +func newEventType(cName string) eventType { + var etype eventType + etype.Cname = cName + etype.Name = mixedCaseTrim(cName, "PN_") + etype.Fname = "On" + etype.Name + etype.Iname = etype.Fname + "Interface" + return etype +} + +var ( + enumDefRe = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;") + enumValRe = regexp.MustCompile("PN_[A-Z_]+") + skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER") + skipFnRe = regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport") +) + +// Generate event wrappers. +func event(out io.Writer) { + event_h := readHeader("event") + + // Event is implemented by hand in wrappers.go + + // Get all the pn_event_type_t enum values + var etypes []eventType + enums := findEnums(event_h) + for _, e := range enums[0].Values { + if skipEventRe.FindStringSubmatch(e) == nil { + etypes = append(etypes, newEventType(e)) + } + } + + doTemplate(out, etypes, ` +type EventType int +const ({{range .}} + E{{.Name}} EventType = C.{{.Cname}}{{end}} +) +`) + + doTemplate(out, etypes, ` +func (e EventType) String() string { + switch e { +{{range .}} + case C.{{.Cname}}: return "{{.Name}}"{{end}} + } + return "Unknown" +} +`) +} + +type genType struct { + Ctype, Gotype string + ToGo func(value string) string + ToC func(value string) string + Assign func(value string) string +} + +func (g genType) printBody(out io.Writer, value string) { + if g.Gotype != "" { + fmt.Fprintf(out, "return %s", g.ToGo(value)) + } else { + fmt.Fprintf(out, "%s", value) + } +} + +func (g genType) goLiteral(value string) string { + return fmt.Sprintf("%s{%s}", g.Gotype, value) +} + +func (g genType) goConvert(value string) string { + switch g.Gotype { + case "string": + return fmt.Sprintf("C.GoString(%s)", value) + case "Event": + return fmt.Sprintf("makeEvent(%s)", value) + default: + return fmt.Sprintf("%s(%s)", 
g.Gotype, value) + } +} + +var notStruct = map[string]bool{ + "EventType": true, + "SndSettleMode": true, + "RcvSettleMode": true, + "TerminusType": true, + "State": true, + "Durability": true, + "ExpiryPolicy": true, + "DistributionMode": true, +} + +func mapType(ctype string) (g genType) { + g.Ctype = "C." + strings.Trim(ctype, " \n") + + switch g.Ctype { + case "C.void": + g.Gotype = "" + case "C.size_t": + g.Gotype = "uint" + case "C.int": + g.Gotype = "int" + case "C.void *": + g.Gotype = "unsafe.Pointer" + g.Ctype = "unsafe.Pointer" + case "C.bool": + g.Gotype = "bool" + case "C.ssize_t": + g.Gotype = "int" + case "C.uint64_t": + g.Gotype = "uint64" + case "C.uint32_t": + g.Gotype = "uint32" + case "C.uint16_t": + g.Gotype = "uint16" + case "C.const char *": + fallthrough + case "C.char *": + g.Gotype = "string" + g.Ctype = "C.CString" + g.ToC = func(v string) string { return fmt.Sprintf("%sC", v) } + g.Assign = func(v string) string { + return fmt.Sprintf("%sC := C.CString(%s)\n defer C.free(unsafe.Pointer(%sC))\n", v, v, v) + } + case "C.pn_seconds_t": + g.Gotype = "time.Duration" + g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Second)", v) } + case "C.pn_error_t *": + g.Gotype = "error" + g.ToGo = func(v string) string { return fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) } + default: + pnId := regexp.MustCompile(" *pn_([a-z_]+)_t *\\*? *") + match := pnId.FindStringSubmatch(g.Ctype) + if match == nil { + panic(fmt.Errorf("unknown C type %#v", g.Ctype)) + } + g.Gotype = mixedCase(match[1]) + if !notStruct[g.Gotype] { + g.ToGo = g.goLiteral + g.ToC = func(v string) string { return v + ".pn" } + } + } + if g.ToGo == nil { + g.ToGo = g.goConvert // Use conversion by default. 
+ } + if g.ToC == nil { + g.ToC = func(v string) string { return fmt.Sprintf("%s(%s)", g.Ctype, v) } + } + return +} + +type genArg struct { + Name string + genType +} + +var typeNameRe = regexp.MustCompile("^(.*( |\\*))([^ *]+)$") + +func splitArgs(argstr string) []genArg { + argstr = strings.Trim(argstr, " \n") + if argstr == "" { + return []genArg{} + } + args := make([]genArg, 0) + for _, item := range strings.Split(argstr, ",") { + item = strings.Trim(item, " \n") + typeName := typeNameRe.FindStringSubmatch(item) + if typeName == nil { + panic(fmt.Errorf("Can't split argument type/name %#v", item)) + } + cType := strings.Trim(typeName[1], " \n") + name := strings.Trim(typeName[3], " \n") + if name == "type" { + name = "type_" + } + args = append(args, genArg{name, mapType(cType)}) + } + return args +} + +func goArgs(args []genArg) string { + l := "" + for i, arg := range args { + if i != 0 { + l += ", " + } + l += arg.Name + " " + arg.Gotype + } + return l +} + +func cArgs(args []genArg) string { + l := "" + for _, arg := range args { + l += fmt.Sprintf(", %s", arg.ToC(arg.Name)) + } + return l +} + +func cAssigns(args []genArg) string { + l := "\n" + for _, arg := range args { + if arg.Assign != nil { + l += fmt.Sprintf("%s\n", arg.Assign(arg.Name)) + } + } + return l +} + +// Return the go name of the function or "" to skip the function. +func goFnName(api, fname string) string { + // Skip class, context and attachment functions. + if skipFnRe.FindStringSubmatch(fname) != nil { + return "" + } + switch api + "." + fname { + case "link.get_drain": + return "IsDrain" + default: + return mixedCaseTrim(fname, "get_") + } +} + +func apiWrapFns(api, header string, out io.Writer) { + fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api) + fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0]) + fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) 
*pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api)) + for _, m := range fn.FindAllStringSubmatch(header, -1) { + rtype, fname, argstr := mapType(m[1]), m[2], m[3] + gname := goFnName(api, fname) + if gname == "" { // Skip + continue + } + args := splitArgs(argstr) + fmt.Fprintf(out, "func (%c %s) %s", api[0], mixedCase(api), gname) + fmt.Fprintf(out, "(%s) %s { ", goArgs(args), rtype.Gotype) + fmt.Fprint(out, cAssigns(args)) + rtype.printBody(out, fmt.Sprintf("C.pn_%s_%s(%c.pn%s)", api, fname, api[0], cArgs(args))) + fmt.Fprintf(out, "}\n") + } +} + +var includeProton = flag.String("include", "", "path to proton include files, including /proton") + +func main() { + flag.Parse() + outpath := "wrappers_gen.go" + out, err := os.Create(outpath) + panicIf(err) + defer out.Close() + + apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"} + fmt.Fprintln(out, copyright) + fmt.Fprint(out, ` +package event + +import ( + "time" + "unsafe" + "qpid.apache.org/proton/go/internal" +) + +// #include +// #include +// #include +`) + for _, api := range apis { + fmt.Fprintf(out, "// #include \n", api) + } + fmt.Fprintln(out, `import "C"`) + + event(out) + + for _, api := range apis { + fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api) + header := readHeader(api) + enums := findEnums(header) + for _, e := range enums { + genEnum(out, e.Name, e.Values) + } + apiWrapFns(api, header, out) + } + out.Close() + + // Run gofmt. 
+ cmd := exec.Command("gofmt", "-w", outpath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err = cmd.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "gofmt: %s", err) + os.Exit(1) + } +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go new file mode 100644 index 0000000..7c00aa0 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* +Package amqp encodes and decodes AMQP messages and data as Go types. + +It follows the standard 'encoding' libraries pattern. The mapping between AMQP +and Go types is described in the documentation of the Marshal and Unmarshal +functions. + +The sub-packages 'event' and 'messaging' provide two alternative ways to write +AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' +gives complete low-level control of the underlying proton C engine. + +AMQP is an open standard for inter-operable message exchange, see +*/ +package amqp + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// This file is just for the package comment. 
+ +// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop new file mode 120000 index 0000000..cc3641d --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop @@ -0,0 +1 @@ +../../../../../../../../tests/interop \ No newline at end of file diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go new file mode 100644 index 0000000..11049f7 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go @@ -0,0 +1,308 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Test that conversion of Go type to/from AMQP is compatible with other +// bindings. 
+// +package amqp + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" +) + +func assertEqual(want interface{}, got interface{}) { + if !reflect.DeepEqual(want, got) { + panic(fmt.Errorf("%#v != %#v", want, got)) + } +} + +func assertNil(err interface{}) { + if err != nil { + panic(err) + } +} + +func getReader(name string) (r io.Reader) { + r, err := os.Open("interop/" + name + ".amqp") + if err != nil { + panic(fmt.Errorf("Can't open %#v: %v", name, err)) + } + return +} + +func remaining(d *Decoder) string { + remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) + return string(remainder) +} + +// assertDecode: want is the expected value, gotPtr is a pointer to a +// instance of the same type for Decode. +func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) { + + assertNil(d.Decode(gotPtr)) + + got := reflect.ValueOf(gotPtr).Elem().Interface() + assertEqual(want, got) + + // Try round trip encoding + bytes, err := Marshal(want, nil) + assertNil(err) + n, err := Unmarshal(bytes, gotPtr) + assertNil(err) + assertEqual(n, len(bytes)) + got = reflect.ValueOf(gotPtr).Elem().Interface() + assertEqual(want, got) +} + +func TestUnmarshal(t *testing.T) { + bytes, err := ioutil.ReadAll(getReader("strings")) + if err != nil { + t.Error(err) + } + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + n, err := Unmarshal(bytes, &got) + if err != nil { + t.Error(err) + } + if want != got { + t.Errorf("%#v != %#v", want, got) + } + bytes = bytes[n:] + } +} + +func TestPrimitivesExact(t *testing.T) { + d := NewDecoder(getReader("primitives")) + // Decoding into exact types + var b bool + assertDecode(d, true, &b) + assertDecode(d, false, &b) + var u8 uint8 + assertDecode(d, uint8(42), &u8) + var u16 uint16 + assertDecode(d, uint16(42), &u16) + var i16 int16 + assertDecode(d, int16(-42), &i16) + var u32 uint32 + assertDecode(d, uint32(12345), &u32) + var i32 
int32 + assertDecode(d, int32(-12345), &i32) + var u64 uint64 + assertDecode(d, uint64(12345), &u64) + var i64 int64 + assertDecode(d, int64(-12345), &i64) + var f32 float32 + assertDecode(d, float32(0.125), &f32) + var f64 float64 + assertDecode(d, float64(0.125), &f64) +} + +func TestPrimitivesCompatible(t *testing.T) { + d := NewDecoder(getReader("primitives")) + // Decoding into compatible types + var b bool + var i int + var u uint + var f float64 + assertDecode(d, true, &b) + assertDecode(d, false, &b) + assertDecode(d, uint(42), &u) + assertDecode(d, uint(42), &u) + assertDecode(d, -42, &i) + assertDecode(d, uint(12345), &u) + assertDecode(d, -12345, &i) + assertDecode(d, uint(12345), &u) + assertDecode(d, -12345, &i) + assertDecode(d, 0.125, &f) + assertDecode(d, 0.125, &f) +} + +// assertDecodeValue: want is the expected value, decode into a reflect.Value +func assertDecodeInterface(d *Decoder, want interface{}) { + + var got, got2 interface{} + assertNil(d.Decode(&got)) + + assertEqual(want, got) + + // Try round trip encoding + bytes, err := Marshal(got, nil) + assertNil(err) + n, err := Unmarshal(bytes, &got2) + assertNil(err) + assertEqual(n, len(bytes)) + assertEqual(want, got2) +} + +func TestPrimitivesInterface(t *testing.T) { + d := NewDecoder(getReader("primitives")) + assertDecodeInterface(d, true) + assertDecodeInterface(d, false) + assertDecodeInterface(d, uint8(42)) + assertDecodeInterface(d, uint16(42)) + assertDecodeInterface(d, int16(-42)) + assertDecodeInterface(d, uint32(12345)) + assertDecodeInterface(d, int32(-12345)) + assertDecodeInterface(d, uint64(12345)) + assertDecodeInterface(d, int64(-12345)) + assertDecodeInterface(d, float32(0.125)) + assertDecodeInterface(d, float64(0.125)) +} + +func TestStrings(t *testing.T) { + d := NewDecoder(getReader("strings")) + // Test decoding as plain Go strings + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + assertDecode(d, want, &got) + } + 
remains := remaining(d) + if remains != "" { + t.Errorf("leftover: %s", remains) + } + + // Test decoding as specific string types + d = NewDecoder(getReader("strings")) + var bytes []byte + var str, sym string + assertDecode(d, []byte("abc\000defg"), &bytes) + assertDecode(d, "abcdefg", &str) + assertDecode(d, "abcdefg", &sym) + assertDecode(d, make([]byte, 0), &bytes) + assertDecode(d, "", &str) + assertDecode(d, "", &sym) + remains = remaining(d) + if remains != "" { + t.Fatalf("leftover: %s", remains) + } + + // Test some error handling + d = NewDecoder(getReader("strings")) + var s string + err := d.Decode(s) + if err == nil { + t.Fatal("Expected error") + } + if !strings.Contains(err.Error(), "not a pointer") { + t.Error(err) + } + var i int + err = d.Decode(&i) + if !strings.Contains(err.Error(), "cannot unmarshal") { + t.Error(err) + } + _, err = Unmarshal([]byte{}, nil) + if !strings.Contains(err.Error(), "not enough data") { + t.Error(err) + } + _, err = Unmarshal([]byte("foobar"), nil) + if !strings.Contains(err.Error(), "invalid-argument") { + t.Error(err) + } +} + +func TestEncodeDecode(t *testing.T) { + type data struct { + s string + i int + u8 uint8 + b bool + f float32 + v interface{} + } + + in := data{"foo", 42, 9, true, 1.234, "thing"} + + buf := bytes.Buffer{} + e := NewEncoder(&buf) + assertNil(e.Encode(in.s)) + assertNil(e.Encode(in.i)) + assertNil(e.Encode(in.u8)) + assertNil(e.Encode(in.b)) + assertNil(e.Encode(in.f)) + assertNil(e.Encode(in.v)) + + var out data + d := NewDecoder(&buf) + assertNil(d.Decode(&out.s)) + assertNil(d.Decode(&out.i)) + assertNil(d.Decode(&out.u8)) + assertNil(d.Decode(&out.b)) + assertNil(d.Decode(&out.f)) + assertNil(d.Decode(&out.v)) + + assertEqual(in, out) +} + +func TestMap(t *testing.T) { + d := NewDecoder(getReader("maps")) + + // Generic map + var m Map + assertDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m) + + // Interface as map + var i interface{} + assertDecode(d, 
Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i) + + d = NewDecoder(getReader("maps")) + // Specific typed map + var m2 map[string]int + assertDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2) + + // Round trip a nested map + m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} + bytes, err := Marshal(m, nil) + assertNil(err) + _, err = Unmarshal(bytes, &i) + assertNil(err) + assertEqual(m, i) +} + +func TestList(t *testing.T) { + d := NewDecoder(getReader("lists")) + var l List + assertDecode(d, List{int32(32), "foo", true}, &l) + assertDecode(d, List{}, &l) +} + +func FIXMETestMessage(t *testing.T) { + // FIXME aconway 2015-04-09: integrate Message encoding under marshal/unmarshal API. + bytes, err := ioutil.ReadAll(getReader("message")) + assertNil(err) + m, err := DecodeMessage(bytes) + assertNil(err) + fmt.Printf("%+v\n", m) + assertEqual(m.Body(), "hello") + + bytes2 := make([]byte, len(bytes)) + bytes2, err = m.Encode(bytes2) + assertNil(err) + assertEqual(bytes, bytes2) +} + +// FIXME aconway 2015-03-13: finish the full interop test diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go new file mode 100644 index 0000000..e5c2945 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go @@ -0,0 +1,238 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include +import "C" + +import ( + "io" + "qpid.apache.org/proton/go/internal" + "reflect" + "unsafe" +) + +func dataError(prefix string, data *C.pn_data_t) error { + err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) + if err != nil { + err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) + } + return err +} + +/* +Marshal encodes a Go value as AMQP data in buffer. +If buffer is nil, or is not large enough, a new buffer is created. + +Returns the buffer used for encoding with len() adjusted to the actual size of data. + +Go types are encoded as follows + + +-------------------------------------+--------------------------------------------+ + |Go type |AMQP type | + +-------------------------------------+--------------------------------------------+ + |bool |bool | + +-------------------------------------+--------------------------------------------+ + |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | + +-------------------------------------+--------------------------------------------+ + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | + +-------------------------------------+--------------------------------------------+ + |float32, float64 |float, double. 
| + +-------------------------------------+--------------------------------------------+ + |string |string | + +-------------------------------------+--------------------------------------------+ + |[]byte, Binary |binary | + +-------------------------------------+--------------------------------------------+ + |Symbol |symbol | + +-------------------------------------+--------------------------------------------+ + |interface{} |the contained type | + +-------------------------------------+--------------------------------------------+ + |nil |null | + +-------------------------------------+--------------------------------------------+ + |map[K]T |map with K and T converted as above | + +-------------------------------------+--------------------------------------------+ + |Map |map, may have mixed types for keys, values | + +-------------------------------------+--------------------------------------------+ + |[]T |list with T converted as above | + +-------------------------------------+--------------------------------------------+ + |List |list, may have mixed types values | + +-------------------------------------+--------------------------------------------+ + +TODO Go types: array, slice, struct + +Go types that cannot be marshaled: complex64/128, uintptr, function, interface, channel +*/ +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { + defer internal.DoRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + put(data, v) + encode := func(buf []byte) ([]byte, error) { + n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) + switch { + case n == int(C.PN_OVERFLOW): + return buf, overflow + case n < 0: + return buf, dataError("marshal error", data) + default: + return buf[:n], nil + } + } + return encodeGrow(buffer, encode) +} + +const minEncode = 256 + +// overflow is returned when an encoding function can't fit data in the buffer. 
+var overflow = internal.Errorf("buffer too small") + +// encodeFn encodes into buffer[0:len(buffer)]. +// Returns buffer with length adjusted for data encoded. +// If buffer too small, returns overflow as error. +type encodeFn func(buffer []byte) ([]byte, error) + +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. +// Returns the final buffer. +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { + if buffer == nil || len(buffer) == 0 { + buffer = make([]byte, minEncode) + } + var err error + for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { + buffer = make([]byte, 2*len(buffer)) + } + return buffer, err +} + +func put(data *C.pn_data_t, v interface{}) { + switch v := v.(type) { + case nil: + C.pn_data_put_null(data) + case bool: + C.pn_data_put_bool(data, C.bool(v)) + case int8: + C.pn_data_put_byte(data, C.int8_t(v)) + case int16: + C.pn_data_put_short(data, C.int16_t(v)) + case int32: + C.pn_data_put_int(data, C.int32_t(v)) + case int64: + C.pn_data_put_long(data, C.int64_t(v)) + case int: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_long(data, C.int64_t(v)) + } else { + C.pn_data_put_int(data, C.int32_t(v)) + } + case uint8: + C.pn_data_put_ubyte(data, C.uint8_t(v)) + case uint16: + C.pn_data_put_ushort(data, C.uint16_t(v)) + case uint32: + C.pn_data_put_uint(data, C.uint32_t(v)) + case uint64: + C.pn_data_put_ulong(data, C.uint64_t(v)) + case uint: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_ulong(data, C.uint64_t(v)) + } else { + C.pn_data_put_uint(data, C.uint32_t(v)) + } + case float32: + C.pn_data_put_float(data, C.float(v)) + case float64: + C.pn_data_put_double(data, C.double(v)) + case string: + C.pn_data_put_string(data, pnBytes([]byte(v))) + case []byte: + C.pn_data_put_binary(data, pnBytes(v)) + case Binary: + C.pn_data_put_binary(data, pnBytes([]byte(v))) + case Symbol: + C.pn_data_put_symbol(data, pnBytes([]byte(v))) + case Map: // Special map type + 
C.pn_data_put_map(data) + C.pn_data_enter(data) + for key, val := range v { + put(data, key) + put(data, val) + } + C.pn_data_exit(data) + default: + switch reflect.TypeOf(v).Kind() { + case reflect.Map: + putMap(data, v) + case reflect.Slice: + putList(data, v) + default: + panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) + } + } + err := dataError("marshal", data) + if err != nil { + panic(err) + } + return +} + +func putMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v) + C.pn_data_put_map(data) + C.pn_data_enter(data) + for _, key := range mapValue.MapKeys() { + put(data, key.Interface()) + put(data, mapValue.MapIndex(key).Interface()) + } + C.pn_data_exit(data) +} + +func putList(data *C.pn_data_t, v interface{}) { + listValue := reflect.ValueOf(v) + C.pn_data_put_list(data) + C.pn_data_enter(data) + for i := 0; i < listValue.Len(); i++ { + put(data, listValue.Index(i).Interface()) + } + C.pn_data_exit(data) +} + +// Encoder encodes AMQP values to an io.Writer +type Encoder struct { + writer io.Writer + buffer []byte +} + +// NewEncoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w, make([]byte, minEncode)} +} + +func (e *Encoder) Encode(v interface{}) (err error) { + e.buffer, err = Marshal(v, e.buffer) + if err == nil { + _, err = e.writer.Write(e.buffer) + } + return err +} + +func replace(data *C.pn_data_t, v interface{}) { + C.pn_data_clear(data) + put(data, v) +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go new file mode 100644 index 0000000..87093f5 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go @@ -0,0 +1,342 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include +// #include +// #include +import "C" + +import ( + "qpid.apache.org/proton/go/internal" + "time" + "unsafe" +) + +// FIXME aconway 2015-04-28: Do we need the interface or can we just export the struct? + +// Message is the interface to an AMQP message. +// Instances of this interface contain a pointer to the underlying struct. +type Message interface { + /** + * Inferred indicates how the message content + * is encoded into AMQP sections. If inferred is true then binary and + * list values in the body of the message will be encoded as AMQP DATA + * and AMQP SEQUENCE sections, respectively. If inferred is false, + * then all values in the body of the message will be encoded as AMQP + * VALUE sections regardless of their type. + */ + Inferred() bool + SetInferred(bool) + + /** + * Durable indicates that any parties taking responsibility + * for the message must durably store the content. + */ + Durable() bool + SetDurable(bool) + + /** + * Priority impacts ordering guarantees. Within a + * given ordered context, higher priority messages may jump ahead of + * lower priority messages. + */ + Priority() uint8 + SetPriority(uint8) + + /** + * TTL or Time To Live: a message may be dropped after this duration. + */ + TTL() time.Duration + SetTTL(time.Duration) + + /** + * FirstAcquirer indicates + * that the recipient of the message is the first recipient to acquire + * the message, i.e. 
there have been no failed delivery attempts to + * other acquirers. Note that this does not mean the message has not + * been delivered to, but not acquired, by other recipients. + */ + FirstAcquirer() bool + SetFirstAcquirer(bool) + + /** + * DeliveryCount tracks how many attempts have been made to + * deliver a message. + */ + DeliveryCount() uint32 + SetDeliveryCount(uint32) + + /** + * MessageId provides a unique identifier for a message. + * It can be a string, an unsigned long, a uuid or a + * binary value. + */ + MessageId() interface{} + SetMessageId(interface{}) + + UserId() string + SetUserId(string) + + Address() string + SetAddress(string) + + Subject() string + SetSubject(string) + + ReplyTo() string + SetReplyTo(string) + + /** + * CorrelationId is set on correlated request and response messages. It can be a string, an unsigned long, a uuid or a + * binary value. + */ + CorrelationId() interface{} + SetCorrelationId(interface{}) + + ContentType() string + SetContentType(string) + + ContentEncoding() string + SetContentEncoding(string) + + // ExpiryTime indicates an absolute time when the message may be dropped. + // A zero time (i.e. t.IsZero() == true) indicates a message never expires. + ExpiryTime() time.Time + SetExpiryTime(time.Time) + + CreationTime() time.Time + SetCreationTime(time.Time) + + GroupId() string + SetGroupId(string) + + GroupSequence() int32 + SetGroupSequence(int32) + + ReplyToGroupId() string + SetReplyToGroupId(string) + + /** + * Instructions can be used to access or modify AMQP delivery instructions. + */ + Instructions() *map[string]interface{} + + /** + * Annotations can be used to access or modify AMQP annotations. + */ + Annotations() *map[string]interface{} + + /** + * Properties can be used to access or modify the application properties of a message. + */ + Properties() *map[string]interface{} + + /** + * Body of the message can be any AMQP encodable type. 
+ */ + Body() interface{} + SetBody(interface{}) + + // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough + // the message is encoded into it, otherwise a new buffer is created. + // Returns the buffer containing the message. + Encode(buffer []byte) ([]byte, error) +} + +// NewMessage creates a new message instance. The returned interface contains a pointer. +func NewMessage() Message { + pn := C.pn_message() // Pick up default setting from C message. + defer C.pn_message_free(pn) + return goMessage(pn) +} + +// Message implementation copies all message data into Go space so it can be properly +// memory managed. +// +type message struct { + inferred, durable, firstAcquirer bool + priority uint8 + ttl time.Duration + deliveryCount uint32 + messageId interface{} + userId, address, subject, replyTo string + contentType, contentEncoding string + groupId, replyToGroupId string + creationTime, expiryTime time.Time + groupSequence int32 + correlationId interface{} + instructions, annotations, properties map[string]interface{} + body interface{} +} + +func (m *message) Inferred() bool { return m.inferred } +func (m *message) SetInferred(b bool) { m.inferred = b } +func (m *message) Durable() bool { return m.durable } +func (m *message) SetDurable(b bool) { m.durable = b } +func (m *message) Priority() uint8 { return m.priority } +func (m *message) SetPriority(b uint8) { m.priority = b } +func (m *message) TTL() time.Duration { return m.ttl } +func (m *message) SetTTL(d time.Duration) { m.ttl = d } +func (m *message) FirstAcquirer() bool { return m.firstAcquirer } +func (m *message) SetFirstAcquirer(b bool) { m.firstAcquirer = b } +func (m *message) DeliveryCount() uint32 { return m.deliveryCount } +func (m *message) SetDeliveryCount(c uint32) { m.deliveryCount = c } +func (m *message) MessageId() interface{} { return m.messageId } +func (m *message) SetMessageId(id interface{}) { m.messageId = id } +func (m *message) UserId() string { 
return m.userId } +func (m *message) SetUserId(s string) { m.userId = s } +func (m *message) Address() string { return m.address } +func (m *message) SetAddress(s string) { m.address = s } +func (m *message) Subject() string { return m.subject } +func (m *message) SetSubject(s string) { m.subject = s } +func (m *message) ReplyTo() string { return m.replyTo } +func (m *message) SetReplyTo(s string) { m.replyTo = s } +func (m *message) CorrelationId() interface{} { return m.correlationId } +func (m *message) SetCorrelationId(c interface{}) { m.correlationId = c } +func (m *message) ContentType() string { return m.contentType } +func (m *message) SetContentType(s string) { m.contentType = s } +func (m *message) ContentEncoding() string { return m.contentEncoding } +func (m *message) SetContentEncoding(s string) { m.contentEncoding = s } +func (m *message) ExpiryTime() time.Time { return m.expiryTime } +func (m *message) SetExpiryTime(t time.Time) { m.expiryTime = t } +func (m *message) CreationTime() time.Time { return m.creationTime } +func (m *message) SetCreationTime(t time.Time) { m.creationTime = t } +func (m *message) GroupId() string { return m.groupId } +func (m *message) SetGroupId(s string) { m.groupId = s } +func (m *message) GroupSequence() int32 { return m.groupSequence } +func (m *message) SetGroupSequence(s int32) { m.groupSequence = s } +func (m *message) ReplyToGroupId() string { return m.replyToGroupId } +func (m *message) SetReplyToGroupId(s string) { m.replyToGroupId = s } +func (m *message) Instructions() *map[string]interface{} { return &m.instructions } +func (m *message) Annotations() *map[string]interface{} { return &m.annotations } +func (m *message) Properties() *map[string]interface{} { return &m.properties } +func (m *message) Body() interface{} { return m.body } +func (m *message) SetBody(b interface{}) { m.body = b } + +// rewindGet rewinds and then gets the value from a data object. 
+func rewindGet(data *C.pn_data_t, v interface{}) { + if data != nil && C.pn_data_size(data) > 0 { + C.pn_data_rewind(data) + C.pn_data_next(data) + get(data, v) + } +} + +// goMessage populates a Go message from a pn_message_t +func goMessage(pn *C.pn_message_t) *message { + m := &message{ + inferred: bool(C.pn_message_is_inferred(pn)), + durable: bool(C.pn_message_is_durable(pn)), + priority: uint8(C.pn_message_get_priority(pn)), + ttl: time.Duration(C.pn_message_get_ttl(pn)) * time.Millisecond, + firstAcquirer: bool(C.pn_message_is_first_acquirer(pn)), + deliveryCount: uint32(C.pn_message_get_delivery_count(pn)), + userId: goString(C.pn_message_get_user_id(pn)), + address: C.GoString(C.pn_message_get_address(pn)), + subject: C.GoString(C.pn_message_get_subject(pn)), + replyTo: C.GoString(C.pn_message_get_reply_to(pn)), + contentType: C.GoString(C.pn_message_get_content_type(pn)), + contentEncoding: C.GoString(C.pn_message_get_content_encoding(pn)), + expiryTime: time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(pn)))), + creationTime: time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(pn))), + groupId: C.GoString(C.pn_message_get_group_id(pn)), + groupSequence: int32(C.pn_message_get_group_sequence(pn)), + replyToGroupId: C.GoString(C.pn_message_get_reply_to_group_id(pn)), + messageId: nil, + correlationId: nil, + instructions: make(map[string]interface{}), + annotations: make(map[string]interface{}), + properties: make(map[string]interface{}), + } + rewindGet(C.pn_message_id(pn), &m.messageId) + rewindGet(C.pn_message_correlation_id(pn), &m.correlationId) + rewindGet(C.pn_message_instructions(pn), &m.instructions) + rewindGet(C.pn_message_annotations(pn), &m.annotations) + rewindGet(C.pn_message_properties(pn), &m.properties) + rewindGet(C.pn_message_body(pn), &m.body) + return m +} + +// pnMessage populates a pn_message_t from a Go message. 
+func (m *message) pnMessage() *C.pn_message_t { + pn := C.pn_message() + C.pn_message_set_inferred(pn, C.bool(m.Inferred())) + C.pn_message_set_durable(pn, C.bool(m.Durable())) + C.pn_message_set_priority(pn, C.uint8_t(m.priority)) + C.pn_message_set_ttl(pn, C.pn_millis_t(m.TTL()/time.Millisecond)) + C.pn_message_set_first_acquirer(pn, C.bool(m.FirstAcquirer())) + C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount)) + replace(C.pn_message_id(pn), m.MessageId()) + C.pn_message_set_user_id(pn, pnBytes([]byte(m.UserId()))) + C.pn_message_set_address(pn, C.CString(m.Address())) + C.pn_message_set_subject(pn, C.CString(m.Subject())) + C.pn_message_set_reply_to(pn, C.CString(m.ReplyTo())) + replace(C.pn_message_correlation_id(pn), m.CorrelationId()) + C.pn_message_set_content_type(pn, C.CString(m.ContentType())) + C.pn_message_set_content_encoding(pn, C.CString(m.ContentEncoding())) + C.pn_message_set_expiry_time(pn, pnTime(m.ExpiryTime())) + C.pn_message_set_creation_time(pn, pnTime(m.CreationTime())) + C.pn_message_set_group_id(pn, C.CString(m.GroupId())) + C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.GroupSequence())) + C.pn_message_set_reply_to_group_id(pn, C.CString(m.ReplyToGroupId())) + replace(C.pn_message_instructions(pn), *m.Instructions()) + replace(C.pn_message_annotations(pn), *m.Annotations()) + replace(C.pn_message_properties(pn), *m.Properties()) + replace(C.pn_message_body(pn), m.Body()) + return pn +} + +// FIXME aconway 2015-04-08: Move message encode/decode under Marshal/Unmarshal interfaces. 
+ +// DecodeMessage decodes bytes as a message +func DecodeMessage(data []byte) (Message, error) { + pnMsg := C.pn_message() + defer C.pn_message_free(pnMsg) + if len(data) == 0 { + return nil, internal.Errorf("empty buffer for decode") + } + if C.pn_message_decode(pnMsg, cPtr(data), cLen(data)) < 0 { + return nil, internal.Errorf("decoding message: %s", + internal.PnError(unsafe.Pointer(C.pn_message_error(pnMsg)))) + } + return goMessage(pnMsg), nil +} + +// Encode the message into buffer. +// If buffer is nil or len(buffer) is not sufficient to encode the message, a larger +// buffer will be returned. +func (m *message) Encode(buffer []byte) ([]byte, error) { + pn := m.pnMessage() + defer C.pn_message_free(pn) + encode := func(buf []byte) ([]byte, error) { + len := cLen(buf) + result := C.pn_message_encode(pn, cPtr(buf), &len) + switch { + case result == C.PN_OVERFLOW: + return buf, overflow + case result < 0: + return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) + default: + return buf[:len], nil + } + } + return encodeGrow(buffer, encode) +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go new file mode 100644 index 0000000..46e26de --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go @@ -0,0 +1,90 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "reflect" + "testing" + "time" +) + +func roundTrip(t *testing.T, m Message) { + buffer, err := m.Encode(nil) + if err != nil { + t.Fatalf("Encode failed: %v", err) + } + m2, err := DecodeMessage(buffer) + if err != nil { + t.Fatalf("Decode failed: %v", err) + } + if !reflect.DeepEqual(m, m2) { + t.Errorf("Message mismatch got\n%#v\nwant\n%#v", m2, m) + } +} + +func TestDefaultMessageRoundTrip(t *testing.T) { + m := NewMessage() + // Check defaults + assertEqual(m.Inferred(), false) + assertEqual(m.Durable(), false) + assertEqual(m.Priority(), uint8(4)) + assertEqual(m.TTL(), time.Duration(0)) + assertEqual(m.UserId(), "") + assertEqual(m.Address(), "") + assertEqual(m.Subject(), "") + assertEqual(m.ReplyTo(), "") + assertEqual(m.ContentType(), "") + assertEqual(m.ContentEncoding(), "") + assertEqual(m.GroupId(), "") + assertEqual(m.GroupSequence(), int32(0)) + assertEqual(m.ReplyToGroupId(), "") + assertEqual(m.MessageId(), nil) + assertEqual(m.CorrelationId(), nil) + assertEqual(*m.Instructions(), map[string]interface{}{}) + assertEqual(*m.Annotations(), map[string]interface{}{}) + assertEqual(*m.Properties(), map[string]interface{}{}) + assertEqual(m.Body(), nil) + + roundTrip(t, m) +} + +func TestMessageRoundTrip(t *testing.T) { + m := NewMessage() + m.SetInferred(false) + m.SetDurable(true) + m.SetPriority(42) + m.SetTTL(0) + m.SetUserId("user") + m.SetAddress("address") + m.SetSubject("subject") + m.SetReplyTo("replyto") + m.SetContentType("content") + m.SetContentEncoding("encoding") + m.SetGroupId("group") + m.SetGroupSequence(42) 
+ m.SetReplyToGroupId("replytogroup") + m.SetMessageId("id") + m.SetCorrelationId("correlation") + *m.Instructions() = map[string]interface{}{"instructions": "foo"} + *m.Annotations() = map[string]interface{}{"annotations": "foo"} + *m.Properties() = map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"} + m.SetBody("hello") + roundTrip(t, m) +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go new file mode 100644 index 0000000..8713520 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/types.go @@ -0,0 +1,193 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package amqp + +// #include +// const pn_type_t PN_DATA_TYPE_ERROR = (pn_type_t) -1; +import "C" + +import ( + "bytes" + "fmt" + "reflect" + "time" + "unsafe" +) + +func pnTypeString(pt C.pn_type_t) string { + switch pt { + case C.PN_NULL: + return "null" + case C.PN_BOOL: + return "bool" + case C.PN_UBYTE: + return "ubyte" + case C.PN_BYTE: + return "byte" + case C.PN_USHORT: + return "ushort" + case C.PN_SHORT: + return "short" + case C.PN_CHAR: + return "char" + case C.PN_UINT: + return "uint" + case C.PN_INT: + return "int" + case C.PN_ULONG: + return "ulong" + case C.PN_LONG: + return "long" + case C.PN_TIMESTAMP: + return "timestamp" + case C.PN_FLOAT: + return "float" + case C.PN_DOUBLE: + return "double" + case C.PN_DECIMAL32: + return "decimal32" + case C.PN_DECIMAL64: + return "decimal64" + case C.PN_DECIMAL128: + return "decimal128" + case C.PN_UUID: + return "uuid" + case C.PN_BINARY: + return "binary" + case C.PN_STRING: + return "string" + case C.PN_SYMBOL: + return "symbol" + case C.PN_DESCRIBED: + return "described" + case C.PN_ARRAY: + return "array" + case C.PN_LIST: + return "list" + case C.PN_MAP: + return "map" + case C.PN_DATA_TYPE_ERROR: + return "no-data" + default: + return fmt.Sprintf("unknown-type(%d)", pt) + } +} + +// Go types +var ( + bytesType = reflect.TypeOf([]byte{}) + valueType = reflect.TypeOf(reflect.Value{}) +) + +// FIXME aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. + +// Map is a generic map that can have mixed key and value types and so can represent any AMQP map +type Map map[interface{}]interface{} + +// List is a generic list that can hold mixed values and can represent any AMQP list. +// +type List []interface{} + +// Symbol is a string that is encoded as an AMQP symbol +type Symbol string + +// Binary is a string that is encoded as an AMQP binary. 
+// It is a string rather than a []byte because []byte is not hashable and can't be used as +// a map key, and AMQP frequently uses binary types as map keys. It can convert to and from []byte. +type Binary string + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. +func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i != len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaranteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. 
+func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go new file mode 100644 index 0000000..944bf6f --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/uid.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Generating unique IDs for various things. 
+ +package amqp + +import ( + "strconv" + "sync/atomic" +) + +// A simple atomic counter to generate unique 64 bit IDs. +type UidCounter struct{ count uint64 } + +// NextInt gets the next uint64 value from the atomic counter. +func (uc *UidCounter) NextInt() uint64 { + return atomic.AddUint64(&uc.count, 1) +} + +// Next gets the next integer value encoded as a base32 string, safe for NUL terminated C strings. +func (uc *UidCounter) Next() string { + return strconv.FormatUint(uc.NextInt(), 32) +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go new file mode 100644 index 0000000..89ab64a --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/unmarshal.go @@ -0,0 +1,552 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include +import "C" + +import ( + "bytes" + "fmt" + "io" + "qpid.apache.org/proton/go/internal" + "reflect" + "unsafe" +) + +const minDecode = 1024 + +// Error returned if AMQP data cannot be unmarshaled as the desired Go type. +type BadUnmarshal struct { + // The name of the AMQP type. + AMQPType string + // The Go type. 
+ GoType reflect.Type +} + +func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal { + return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)} +} + +func (e BadUnmarshal) Error() string { + if e.GoType.Kind() != reflect.Ptr { + return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) + } else { + return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) + } +} + +// +// Decoding from a pn_data_t +// +// NOTE: we use panic() to signal a decoding error, which simplifies decoding logic. +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. +// + +// Decoder decodes AMQP values from an io.Reader. +// +type Decoder struct { + reader io.Reader + buffer bytes.Buffer +} + +// NewDecoder returns a new decoder that reads from r. +// +// The decoder has its own buffer and may read more data than required for the +// AMQP values requested. Use Buffered to see if there is data left in the +// buffer. +// +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{r, bytes.Buffer{}} +} + +// Buffered returns a reader of the data remaining in the Decoder's buffer. The +// reader is valid until the next call to Decode. +// +func (d *Decoder) Buffered() io.Reader { + return bytes.NewReader(d.buffer.Bytes()) +} + +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. +// +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. +// +func (d *Decoder) Decode(v interface{}) (err error) { + defer internal.DoRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + var n int + for n == 0 && err == nil { + n = unmarshal(data, d.buffer.Bytes(), v) + if n == 0 { // n == 0 means not enough data, read more + err = d.more() + if err != nil { + return + } + } + } + d.buffer.Next(n) + return +} + +/* +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. 
+Types are converted as follows: + + +---------------------------+----------------------------------------------------------------------+ + |To Go types |From AMQP types | + +===========================+======================================================================+ + |bool |bool | + +---------------------------+----------------------------------------------------------------------+ + |int, int8, int16, |Equivalent or smaller signed integer type: char, byte, short, int, | + |int32, int64 |long. | + +---------------------------+----------------------------------------------------------------------+ + |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: char, ubyte, ushort, | + |uint32, uint64 |uint, ulong. | + +---------------------------+----------------------------------------------------------------------+ + |float32, float64 |Equivalent or smaller float or double. | + +---------------------------+----------------------------------------------------------------------+ + |string, []byte |string, symbol or binary. 
| + +---------------------------+----------------------------------------------------------------------+ + |Symbol |symbol | + +---------------------------+----------------------------------------------------------------------+ + |map[K]T |map, provided all keys and values can unmarshal to types K, T | + +---------------------------+----------------------------------------------------------------------+ + |Map |map, any AMQP map | + +---------------------------+----------------------------------------------------------------------+ + |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | + | +------------------------+---------------------------------------------+ + | |AMQP Type |Go Type in interface{} | + | +========================+=============================================+ + | |bool |bool | + | +------------------------+---------------------------------------------+ + | |char |uint8 | + | +------------------------+---------------------------------------------+ + | |byte,short,int,long |int8,int16,int32,int64 | + | +------------------------+---------------------------------------------+ + | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | + | +------------------------+---------------------------------------------+ + | |float, double |float32, float64 | + | +------------------------+---------------------------------------------+ + | |string |string | + | +------------------------+---------------------------------------------+ + | |symbol |Symbol | + | +------------------------+---------------------------------------------+ + | |binary |Binary | + | +------------------------+---------------------------------------------+ + | |null |nil | + | +------------------------+---------------------------------------------+ + | |map |Map | + | +------------------------+---------------------------------------------+ + | |list |List | + +---------------------------+------------------------+---------------------------------------------+ + 
+The
following Go types cannot be unmarshaled: complex64/128, uintptr, function, interface, channel. + +TODO types + +AMQP: timestamp, decimal32/64/128, uuid, described, array. + +Go: array, struct. + +Maps: currently we cannot unmarshal AMQP maps with unhashable key types, need an alternate +representation for those. +*/ +func Unmarshal(bytes []byte, v interface{}) (n int, err error) { + defer internal.DoRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + n = unmarshal(data, bytes, v) + if n == 0 { + err = internal.Errorf("not enough data") + } + return +} + +// more reads more data when we can't parse a complete AMQP type +func (d *Decoder) more() error { + var readSize int64 = minDecode + if int64(d.buffer.Len()) > readSize { // Grow by doubling + readSize = int64(d.buffer.Len()) + } + var n int64 + n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) + if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 + err = io.EOF + } + return err +} + +// unmarshal decodes from bytes and converts into the value pointed to by v. +// Used by Unmarshal and Decode +// +// Returns the number of bytes decoded or 0 if not enough data. +// +func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) { + n = decode(data, bytes) + if n == 0 { + return 0 + } + get(data, v) + return +} + +// get value from data into value pointed at by v. 
+func get(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + + switch v := v.(type) { + case *bool: + switch pnType { + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + case *int8: + switch pnType { + case C.PN_CHAR: + *v = int8(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + case *uint8: + switch pnType { + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + case *int16: + switch pnType { + case C.PN_CHAR: + *v = int16(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int16(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + case *uint16: + switch pnType { + case C.PN_CHAR: + *v = uint16(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint16(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + case *int32: + switch pnType { + case C.PN_CHAR: + *v = int32(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int32(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int32(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + case *uint32: + switch pnType { + case C.PN_CHAR: + *v = uint32(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint32(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint32(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *int64: + switch pnType { + case C.PN_CHAR: + *v = int64(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int64(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = 
int64(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int64(C.pn_data_get_int(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *uint64: + switch pnType { + case C.PN_CHAR: + *v = uint64(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint64(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint64(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint64(C.pn_data_get_uint(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *int: + switch pnType { + case C.PN_CHAR: + *v = int(C.pn_data_get_char(data)) + case C.PN_BYTE: + *v = int(C.pn_data_get_byte(data)) + case C.PN_SHORT: + *v = int(C.pn_data_get_short(data)) + case C.PN_INT: + *v = int(C.pn_data_get_int(data)) + case C.PN_LONG: + if unsafe.Sizeof(0) == 8 { + *v = int(C.pn_data_get_long(data)) + } else { + panic(newBadUnmarshal(pnType, v)) + } + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *uint: + switch pnType { + case C.PN_CHAR: + *v = uint(C.pn_data_get_char(data)) + case C.PN_UBYTE: + *v = uint(C.pn_data_get_ubyte(data)) + case C.PN_USHORT: + *v = uint(C.pn_data_get_ushort(data)) + case C.PN_UINT: + *v = uint(C.pn_data_get_uint(data)) + case C.PN_ULONG: + if unsafe.Sizeof(0) == 8 { + *v = uint(C.pn_data_get_ulong(data)) + } else { + panic(newBadUnmarshal(pnType, v)) + } + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *float32: + switch pnType { + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *float64: + switch pnType { + case C.PN_FLOAT: + *v = float64(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *string: + switch pnType { + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = 
goString(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = 
goString(C.pn_data_get_binary(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *[]byte: + switch pnType { + case C.PN_STRING: + *v = goBytes(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = goBytes(C.pn_data_get_symbol(data)) + case C.PN_BINARY: + *v = goBytes(C.pn_data_get_binary(data)) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *Binary: + switch pnType { + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *Symbol: + switch pnType { + case C.PN_SYMBOL: + *v = Symbol(goBytes(C.pn_data_get_symbol(data))) + default: + panic(newBadUnmarshal(pnType, v)) + } + + case *interface{}: + getInterface(data, v) + + default: + if reflect.TypeOf(v).Kind() != reflect.Ptr { + panic(newBadUnmarshal(pnType, v)) + } + switch reflect.TypeOf(v).Elem().Kind() { + case reflect.Map: + getMap(data, v) + case reflect.Slice: + getList(data, v) + default: + panic(newBadUnmarshal(pnType, v)) + } + } + err := dataError("unmarshaling", data) + if err != nil { + panic(err) + } + return +} + +// Getting into an interface is driven completely by the AMQP type, since the interface{} +// target is type-neutral. 
+func getInterface(data *C.pn_data_t, v *interface{}) { + pnType := C.pn_data_type(data) + switch pnType { + case C.PN_NULL: + *v = nil + case C.PN_BOOL: + *v = bool(C.pn_data_get_bool(data)) + case C.PN_UBYTE: + *v = uint8(C.pn_data_get_ubyte(data)) + case C.PN_BYTE: + *v = int8(C.pn_data_get_byte(data)) + case C.PN_USHORT: + *v = uint16(C.pn_data_get_ushort(data)) + case C.PN_SHORT: + *v = int16(C.pn_data_get_short(data)) + case C.PN_UINT: + *v = uint32(C.pn_data_get_uint(data)) + case C.PN_INT: + *v = int32(C.pn_data_get_int(data)) + case C.PN_CHAR: + *v = uint8(C.pn_data_get_char(data)) + case C.PN_ULONG: + *v = uint64(C.pn_data_get_ulong(data)) + case C.PN_LONG: + *v = int64(C.pn_data_get_long(data)) + case C.PN_FLOAT: + *v = float32(C.pn_data_get_float(data)) + case C.PN_DOUBLE: + *v = float64(C.pn_data_get_double(data)) + case C.PN_BINARY: + *v = Binary(goBytes(C.pn_data_get_binary(data))) + case C.PN_STRING: + *v = goString(C.pn_data_get_string(data)) + case C.PN_SYMBOL: + *v = Symbol(goString(C.pn_data_get_symbol(data))) + case C.PN_MAP: + m := make(Map) + get(data, &m) + *v = m // FIXME aconway 2015-03-13: avoid the copy? + case C.PN_LIST: + l := make(List, 0) + get(data, &l) + *v = l // FIXME aconway 2015-03-13: avoid the copy? 
+ default: + panic(newBadUnmarshal(pnType, v)) + } +} + +// get into map pointed at by v +func getMap(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + if pnType != C.PN_MAP { + panic(newBadUnmarshal(pnType, v)) + } + mapValue := reflect.ValueOf(v).Elem() + mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map + count := int(C.pn_data_get_map(data)) + if bool(C.pn_data_enter(data)) { + for i := 0; i < count/2; i++ { + if bool(C.pn_data_next(data)) { + key := reflect.New(mapValue.Type().Key()) + get(data, key.Interface()) + if bool(C.pn_data_next(data)) { + val := reflect.New(mapValue.Type().Elem()) + get(data, val.Interface()) + mapValue.SetMapIndex(key.Elem(), val.Elem()) + } + } + } + C.pn_data_exit(data) + } +} + +func getList(data *C.pn_data_t, v interface{}) { + pnType := C.pn_data_type(data) + if pnType != C.PN_LIST { + panic(newBadUnmarshal(pnType, v)) + } + count := int(C.pn_data_get_list(data)) + listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) + if bool(C.pn_data_enter(data)) { + for i := 0; i < count; i++ { + if bool(C.pn_data_next(data)) { + val := reflect.New(listValue.Type().Elem()) + get(data, val.Interface()) + listValue.Index(i).Set(val.Elem()) + } + } + C.pn_data_exit(data) + } + // FIXME aconway 2015-04-09: avoid the copy? + reflect.ValueOf(v).Elem().Set(listValue) +} + +// decode from bytes. +// Return bytes decoded or 0 if we could not decode a complete object. 
+// +func decode(data *C.pn_data_t, bytes []byte) int { + if len(bytes) == 0 { + return 0 + } + n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) + if n == int(C.PN_UNDERFLOW) { + C.pn_error_clear(C.pn_data_error(data)) + return 0 + } else if n <= 0 { + panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) + } + return n +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go new file mode 100644 index 0000000..58711c7 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url.go @@ -0,0 +1,96 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +/* +#include +#include +#include + +// Helper function for setting URL fields. +typedef void (*setter_fn)(pn_url_t* url, const char* value); +inline void set(pn_url_t *url, setter_fn s, const char* value) { + s(url, value); +} +*/ +import "C" + +import ( + "net" + "net/url" + "qpid.apache.org/proton/go/internal" + "unsafe" +) + +const ( + amqp string = "amqp" + amqps = "amqps" +) + +// ParseURL parses an AMQP URL string and returns a net/url.URL. 
+// +// It is more forgiving than net/url.Parse and allows most of the parts of the +// URL to be missing, assuming AMQP defaults. +// +func ParseURL(s string) (u *url.URL, err error) { + cstr := C.CString(s) + defer C.free(unsafe.Pointer(cstr)) + pnUrl := C.pn_url_parse(cstr) + if pnUrl == nil { + return nil, internal.Errorf("bad URL %#v", s) + } + defer C.pn_url_free(pnUrl) + + scheme := C.GoString(C.pn_url_get_scheme(pnUrl)) + username := C.GoString(C.pn_url_get_username(pnUrl)) + password := C.GoString(C.pn_url_get_password(pnUrl)) + host := C.GoString(C.pn_url_get_host(pnUrl)) + port := C.GoString(C.pn_url_get_port(pnUrl)) + path := C.GoString(C.pn_url_get_path(pnUrl)) + + if err != nil { + return nil, internal.Errorf("bad URL %#v: %s", s, err) + } + if scheme == "" { + scheme = amqp + } + if port == "" { + if scheme == amqps { + port = amqps + } else { + port = amqp + } + } + var user *url.Userinfo + if password != "" { + user = url.UserPassword(username, password) + } else if username != "" { + user = url.User(username) + } + + u = &url.URL{ + Scheme: scheme, + User: user, + Host: net.JoinHostPort(host, port), + Path: path, + } + + return u, nil +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go new file mode 100644 index 0000000..f80f1c4 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/url_test.go @@ -0,0 +1,51 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "fmt" +) + +func ExampleParseURL() { + for _, s := range []string{ + "amqp://username:password@host:1234/path", + "host:1234", + "host", + ":1234", + "host/path", + "amqps://host", + "", + } { + u, err := ParseURL(s) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(u) + } + } + // Output: + // amqp://username:password@host:1234/path + // amqp://host:1234 + // amqp://host:amqp + // amqp://:1234 + // amqp://host:amqp/path + // amqps://host:amqps + // proton: bad URL "" +} diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go new file mode 100644 index 0000000..7a9ec12 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/doc.go @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +/* +Package event provides a low-level API to the proton AMQP engine. + +For most tasks, consider instead package qpid.apache.org/proton/go/messaging. +It provides a higher-level, concurrent API that is easier to use. + +The API is event based. There are two alternative styles of handler. EventHandler +provides the core proton events. MessagingHandler provides a slightly simplified +view of the event stream and automates some common tasks. + +See type Pump documentation for more details of the interaction between proton +events and goroutines. +*/ +package event + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// This file is just for the package comment. diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go new file mode 100644 index 0000000..d76fac9 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/handlers.go @@ -0,0 +1,411 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express o --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org