From: aconway@apache.org
To: commits@qpid.apache.org
Reply-To: dev@qpid.apache.org
Date: Tue, 08 Sep 2015 16:38:02 -0000
Subject: [44/50] [abbrv] qpid-proton git commit: PROTON-827: Switch go directory and symlink locations, problems with some go tools.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a8ff0bb1/proton-c/bindings/go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go b/proton-c/bindings/go new file mode 120000 index 0000000..6ffb7db --- /dev/null +++ b/proton-c/bindings/go @@ -0,0 +1 @@ +../../go \ No newline at end of file diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md deleted file mode 100644 index 397016c..0000000 --- a/proton-c/bindings/go/README.md +++ /dev/null @@ -1,139 +0,0 @@ -# *EXPERIMENTAL* Go binding for proton - -This is the beginning of a [Go](http://golang.org) binding for proton. - -This work is in early *experimental* stages, *everything* may change in future. -Comments and contributions are strongly encouraged, this experiment is public so -early feedback can guide development. - -- Email -- Create issues , attach patches to an issue. - -There are working [examples](../../../examples/go/README.md) and the examples README file -explains how to install the packages in your go workspace and read the documentation. - -## Goals - -The API should - -- be idiomatic, unsurprising, and easy to use for Go developers. -- support client and server development. -- make simple tasks simple. -- provide deep access to AMQP protocol when that is required. - -There are two types of developer we want to support - -1. Go developers using AMQP as a message transport: - - Straightforward conversions between Go built-in types and AMQP types. - - Easy message exchange via Go channels to support use in goroutines. - -2. AMQP-aware developers using Go as an implementation language: - - Go types to exactly represent all AMQP types and encoding details. - - Full access to detailed AMQP concepts: sessions, links, deliveries etc. - -## Status - -There are 3 go packages for proton: - -- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and from Go data types. 
-- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for messaging clients and servers. -- qpid.apache.org/proton/go/event: full low-level access to the proton engine. - -Most applications should use the `messaging` package. The `event` package is for -applications that need low-level access to the proton engine. - -The `event` package is fairly complete, with the exception of the proton -reactor. It's unclear if the reactor is important for go. - -The `messaging` package can run the examples but probably not much else. There -is work to do on error handling and the API may change. - -There are working [examples](../../../examples/go/README.md) of a broker using `event` and -a sender and receiver using `messaging`. - -## The event driven API - -See the package documentation for details. - -## The Go API - -The goal: A procedural API that allows any user goroutine to send and receive -AMQP messages and other information (acknowledgments, flow control instructions -etc.) using channels. There will be no user-visible locks and no need to run -user code in special goroutines, e.g. as handlers in a proton event loop. - -See the package documentation for emerging details. - -Currently using a channel to receive messages, a function to send them (channels -internally) and a channel as a "future" for acknowledgements to senders. This -may change. - -## Why a separate API for Go? - -Go is a concurrent language and encourages applications to be divided into -concurrent *goroutines*. It provides traditional locking but it encourages the -use *channels* to communicate between goroutines without explicit locks: - - "Share memory by communicating, don't communicate by sharing memory" - -The idea is that a given value is only operated on by one goroutine at a time, -but values can easily be passed from one goroutine to another. This removes much -of the need for locking. 
- -Go literature distinguishes between: - -- *concurrency*: "keeping track of things that could be done in parallel" -- *parallelism*: "actually doing things in parallel" - -The application expresses concurrency by starting goroutines for potentially -concurrent tasks. The Go run-times schedule the activity of goroutines onto a -small number (possibly one) of actual parallel executions. - -Even with *no* parallelism, concurrency lets the Go run-times *order* work with -respect to events like file descriptors being readable/writable, channels having -data, timers firing etc. Go automatically takes care of switching out goroutines -that block or sleep so it is normal to write code in terms of blocking calls. - -Event-driven API (like poll, epoll, select or the proton event API) also -channel unpredictably ordered events to actions in one or a small pool of -execution threads. However this requires a different style of programming: -"event-driven" or "reactive" programming. Go developers call it "inside-out" -programming. In an event-driven architecture blocking is a big problem as it -consumes a scarce thread of execution, so actions that take time to complete -have to be re-structured in terms of future event delivery. - -The promise of Go is that you can express your application in concurrent, -procedural terms with simple blocking calls and the Go run-times will turn it -inside-out for you. Write as many goroutines as you want, and let Go interleave -and schedule them efficiently. - -For example: the Go equivalent of listening for connections is a goroutine with -a simple endless loop that calls a blocking Listen() function and starts a -goroutine for each new connection. Each connection has its own goroutine that -deals with just that connection till it closes. - -The benefit is that the variables and logic live closer together. Once you're in -a goroutine, you have everything you need in local variables, and they are -preserved across blocking calls. 
There's no need to store details in context -objects that you have to look up when handling a later event to figure out how -to continue where you left off. - -So a Go-like proton API does not force the users code to run in an event-loop -goroutine. Instead user goroutines communicate with the event loop(s) via -channels. There's no need to funnel connections into one event loop, in fact it -makes no sense. Connections can be processed concurrently so they should be -processed in separate goroutines and left to Go to schedule. User goroutines can -have simple loops that block channels till messages are available, the user can -start as many or as few such goroutines as they wish to implement concurrency as -simple or as complex as they wish. For example blocking request-response -vs. asynchronous flows of messages and acknowledgments. - -## New to Go? - -If you are new to Go then these are a good place to start: - -- [A Tour of Go](http://tour.golang.org) -- [Effective Go](http://golang.org/doc/effective_go.html) - -Then look at the tools and library docs at as you need them. - diff --git a/proton-c/bindings/go/amqp/doc.go b/proton-c/bindings/go/amqp/doc.go deleted file mode 100644 index 7c00aa0..0000000 --- a/proton-c/bindings/go/amqp/doc.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package amqp encodes and decodes AMQP messages and data as Go types. - -It follows the standard 'encoding' libraries pattern. The mapping between AMQP -and Go types is described in the documentation of the Marshal and Unmarshal -functions. - -The sub-packages 'event' and 'messaging' provide two alternative ways to write -AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' -gives complete low-level control of the underlying proton C engine. - -AMQP is an open standard for inter-operable message exchange, see -*/ -package amqp - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. - -// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. diff --git a/proton-c/bindings/go/amqp/interop b/proton-c/bindings/go/amqp/interop deleted file mode 120000 index 8f50d0e..0000000 --- a/proton-c/bindings/go/amqp/interop +++ /dev/null @@ -1 +0,0 @@ -../../../../tests/interop \ No newline at end of file diff --git a/proton-c/bindings/go/amqp/interop_test.go b/proton-c/bindings/go/amqp/interop_test.go deleted file mode 100644 index 11049f7..0000000 --- a/proton-c/bindings/go/amqp/interop_test.go +++ /dev/null @@ -1,308 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Test that conversion of Go type to/from AMQP is compatible with other -// bindings. -// -package amqp - -import ( - "bytes" - "fmt" - "io" - "io/ioutil" - "os" - "reflect" - "strings" - "testing" -) - -func assertEqual(want interface{}, got interface{}) { - if !reflect.DeepEqual(want, got) { - panic(fmt.Errorf("%#v != %#v", want, got)) - } -} - -func assertNil(err interface{}) { - if err != nil { - panic(err) - } -} - -func getReader(name string) (r io.Reader) { - r, err := os.Open("interop/" + name + ".amqp") - if err != nil { - panic(fmt.Errorf("Can't open %#v: %v", name, err)) - } - return -} - -func remaining(d *Decoder) string { - remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) - return string(remainder) -} - -// assertDecode: want is the expected value, gotPtr is a pointer to a -// instance of the same type for Decode. 
-func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) { - - assertNil(d.Decode(gotPtr)) - - got := reflect.ValueOf(gotPtr).Elem().Interface() - assertEqual(want, got) - - // Try round trip encoding - bytes, err := Marshal(want, nil) - assertNil(err) - n, err := Unmarshal(bytes, gotPtr) - assertNil(err) - assertEqual(n, len(bytes)) - got = reflect.ValueOf(gotPtr).Elem().Interface() - assertEqual(want, got) -} - -func TestUnmarshal(t *testing.T) { - bytes, err := ioutil.ReadAll(getReader("strings")) - if err != nil { - t.Error(err) - } - for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { - var got string - n, err := Unmarshal(bytes, &got) - if err != nil { - t.Error(err) - } - if want != got { - t.Errorf("%#v != %#v", want, got) - } - bytes = bytes[n:] - } -} - -func TestPrimitivesExact(t *testing.T) { - d := NewDecoder(getReader("primitives")) - // Decoding into exact types - var b bool - assertDecode(d, true, &b) - assertDecode(d, false, &b) - var u8 uint8 - assertDecode(d, uint8(42), &u8) - var u16 uint16 - assertDecode(d, uint16(42), &u16) - var i16 int16 - assertDecode(d, int16(-42), &i16) - var u32 uint32 - assertDecode(d, uint32(12345), &u32) - var i32 int32 - assertDecode(d, int32(-12345), &i32) - var u64 uint64 - assertDecode(d, uint64(12345), &u64) - var i64 int64 - assertDecode(d, int64(-12345), &i64) - var f32 float32 - assertDecode(d, float32(0.125), &f32) - var f64 float64 - assertDecode(d, float64(0.125), &f64) -} - -func TestPrimitivesCompatible(t *testing.T) { - d := NewDecoder(getReader("primitives")) - // Decoding into compatible types - var b bool - var i int - var u uint - var f float64 - assertDecode(d, true, &b) - assertDecode(d, false, &b) - assertDecode(d, uint(42), &u) - assertDecode(d, uint(42), &u) - assertDecode(d, -42, &i) - assertDecode(d, uint(12345), &u) - assertDecode(d, -12345, &i) - assertDecode(d, uint(12345), &u) - assertDecode(d, -12345, &i) - assertDecode(d, 0.125, &f) - 
assertDecode(d, 0.125, &f) -} - -// assertDecodeValue: want is the expected value, decode into a reflect.Value -func assertDecodeInterface(d *Decoder, want interface{}) { - - var got, got2 interface{} - assertNil(d.Decode(&got)) - - assertEqual(want, got) - - // Try round trip encoding - bytes, err := Marshal(got, nil) - assertNil(err) - n, err := Unmarshal(bytes, &got2) - assertNil(err) - assertEqual(n, len(bytes)) - assertEqual(want, got2) -} - -func TestPrimitivesInterface(t *testing.T) { - d := NewDecoder(getReader("primitives")) - assertDecodeInterface(d, true) - assertDecodeInterface(d, false) - assertDecodeInterface(d, uint8(42)) - assertDecodeInterface(d, uint16(42)) - assertDecodeInterface(d, int16(-42)) - assertDecodeInterface(d, uint32(12345)) - assertDecodeInterface(d, int32(-12345)) - assertDecodeInterface(d, uint64(12345)) - assertDecodeInterface(d, int64(-12345)) - assertDecodeInterface(d, float32(0.125)) - assertDecodeInterface(d, float64(0.125)) -} - -func TestStrings(t *testing.T) { - d := NewDecoder(getReader("strings")) - // Test decoding as plain Go strings - for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { - var got string - assertDecode(d, want, &got) - } - remains := remaining(d) - if remains != "" { - t.Errorf("leftover: %s", remains) - } - - // Test decoding as specific string types - d = NewDecoder(getReader("strings")) - var bytes []byte - var str, sym string - assertDecode(d, []byte("abc\000defg"), &bytes) - assertDecode(d, "abcdefg", &str) - assertDecode(d, "abcdefg", &sym) - assertDecode(d, make([]byte, 0), &bytes) - assertDecode(d, "", &str) - assertDecode(d, "", &sym) - remains = remaining(d) - if remains != "" { - t.Fatalf("leftover: %s", remains) - } - - // Test some error handling - d = NewDecoder(getReader("strings")) - var s string - err := d.Decode(s) - if err == nil { - t.Fatal("Expected error") - } - if !strings.Contains(err.Error(), "not a pointer") { - t.Error(err) - } - var i int - err = 
d.Decode(&i) - if !strings.Contains(err.Error(), "cannot unmarshal") { - t.Error(err) - } - _, err = Unmarshal([]byte{}, nil) - if !strings.Contains(err.Error(), "not enough data") { - t.Error(err) - } - _, err = Unmarshal([]byte("foobar"), nil) - if !strings.Contains(err.Error(), "invalid-argument") { - t.Error(err) - } -} - -func TestEncodeDecode(t *testing.T) { - type data struct { - s string - i int - u8 uint8 - b bool - f float32 - v interface{} - } - - in := data{"foo", 42, 9, true, 1.234, "thing"} - - buf := bytes.Buffer{} - e := NewEncoder(&buf) - assertNil(e.Encode(in.s)) - assertNil(e.Encode(in.i)) - assertNil(e.Encode(in.u8)) - assertNil(e.Encode(in.b)) - assertNil(e.Encode(in.f)) - assertNil(e.Encode(in.v)) - - var out data - d := NewDecoder(&buf) - assertNil(d.Decode(&out.s)) - assertNil(d.Decode(&out.i)) - assertNil(d.Decode(&out.u8)) - assertNil(d.Decode(&out.b)) - assertNil(d.Decode(&out.f)) - assertNil(d.Decode(&out.v)) - - assertEqual(in, out) -} - -func TestMap(t *testing.T) { - d := NewDecoder(getReader("maps")) - - // Generic map - var m Map - assertDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m) - - // Interface as map - var i interface{} - assertDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i) - - d = NewDecoder(getReader("maps")) - // Specific typed map - var m2 map[string]int - assertDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2) - - // Round trip a nested map - m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} - bytes, err := Marshal(m, nil) - assertNil(err) - _, err = Unmarshal(bytes, &i) - assertNil(err) - assertEqual(m, i) -} - -func TestList(t *testing.T) { - d := NewDecoder(getReader("lists")) - var l List - assertDecode(d, List{int32(32), "foo", true}, &l) - assertDecode(d, List{}, &l) -} - -func FIXMETestMessage(t *testing.T) { - // FIXME aconway 2015-04-09: integrate Message encoding under marshal/unmarshal API. 
- bytes, err := ioutil.ReadAll(getReader("message")) - assertNil(err) - m, err := DecodeMessage(bytes) - assertNil(err) - fmt.Printf("%+v\n", m) - assertEqual(m.Body(), "hello") - - bytes2 := make([]byte, len(bytes)) - bytes2, err = m.Encode(bytes2) - assertNil(err) - assertEqual(bytes, bytes2) -} - -// FIXME aconway 2015-03-13: finish the full interop test diff --git a/proton-c/bindings/go/amqp/marshal.go b/proton-c/bindings/go/amqp/marshal.go deleted file mode 100644 index e5c2945..0000000 --- a/proton-c/bindings/go/amqp/marshal.go +++ /dev/null @@ -1,238 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include -import "C" - -import ( - "io" - "qpid.apache.org/proton/go/internal" - "reflect" - "unsafe" -) - -func dataError(prefix string, data *C.pn_data_t) error { - err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) - if err != nil { - err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) - } - return err -} - -/* -Marshal encodes a Go value as AMQP data in buffer. -If buffer is nil, or is not large enough, a new buffer is created. - -Returns the buffer used for encoding with len() adjusted to the actual size of data. 
- -Go types are encoded as follows - - +-------------------------------------+--------------------------------------------+ - |Go type |AMQP type | - +-------------------------------------+--------------------------------------------+ - |bool |bool | - +-------------------------------------+--------------------------------------------+ - |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | - +-------------------------------------+--------------------------------------------+ - |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | - +-------------------------------------+--------------------------------------------+ - |float32, float64 |float, double. | - +-------------------------------------+--------------------------------------------+ - |string |string | - +-------------------------------------+--------------------------------------------+ - |[]byte, Binary |binary | - +-------------------------------------+--------------------------------------------+ - |Symbol |symbol | - +-------------------------------------+--------------------------------------------+ - |interface{} |the contained type | - +-------------------------------------+--------------------------------------------+ - |nil |null | - +-------------------------------------+--------------------------------------------+ - |map[K]T |map with K and T converted as above | - +-------------------------------------+--------------------------------------------+ - |Map |map, may have mixed types for keys, values | - +-------------------------------------+--------------------------------------------+ - |[]T |list with T converted as above | - +-------------------------------------+--------------------------------------------+ - |List |list, may have mixed types values | - +-------------------------------------+--------------------------------------------+ - -TODO Go types: array, slice, struct - -Go types that cannot be marshaled: complex64/128, uintptr, 
function, interface, channel -*/ -func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { - defer internal.DoRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - put(data, v) - encode := func(buf []byte) ([]byte, error) { - n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) - switch { - case n == int(C.PN_OVERFLOW): - return buf, overflow - case n < 0: - return buf, dataError("marshal error", data) - default: - return buf[:n], nil - } - } - return encodeGrow(buffer, encode) -} - -const minEncode = 256 - -// overflow is returned when an encoding function can't fit data in the buffer. -var overflow = internal.Errorf("buffer too small") - -// encodeFn encodes into buffer[0:len(buffer)]. -// Returns buffer with length adjusted for data encoded. -// If buffer too small, returns overflow as error. -type encodeFn func(buffer []byte) ([]byte, error) - -// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. -// Returns the final buffer. 
-func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { - if buffer == nil || len(buffer) == 0 { - buffer = make([]byte, minEncode) - } - var err error - for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { - buffer = make([]byte, 2*len(buffer)) - } - return buffer, err -} - -func put(data *C.pn_data_t, v interface{}) { - switch v := v.(type) { - case nil: - C.pn_data_put_null(data) - case bool: - C.pn_data_put_bool(data, C.bool(v)) - case int8: - C.pn_data_put_byte(data, C.int8_t(v)) - case int16: - C.pn_data_put_short(data, C.int16_t(v)) - case int32: - C.pn_data_put_int(data, C.int32_t(v)) - case int64: - C.pn_data_put_long(data, C.int64_t(v)) - case int: - if unsafe.Sizeof(0) == 8 { - C.pn_data_put_long(data, C.int64_t(v)) - } else { - C.pn_data_put_int(data, C.int32_t(v)) - } - case uint8: - C.pn_data_put_ubyte(data, C.uint8_t(v)) - case uint16: - C.pn_data_put_ushort(data, C.uint16_t(v)) - case uint32: - C.pn_data_put_uint(data, C.uint32_t(v)) - case uint64: - C.pn_data_put_ulong(data, C.uint64_t(v)) - case uint: - if unsafe.Sizeof(0) == 8 { - C.pn_data_put_ulong(data, C.uint64_t(v)) - } else { - C.pn_data_put_uint(data, C.uint32_t(v)) - } - case float32: - C.pn_data_put_float(data, C.float(v)) - case float64: - C.pn_data_put_double(data, C.double(v)) - case string: - C.pn_data_put_string(data, pnBytes([]byte(v))) - case []byte: - C.pn_data_put_binary(data, pnBytes(v)) - case Binary: - C.pn_data_put_binary(data, pnBytes([]byte(v))) - case Symbol: - C.pn_data_put_symbol(data, pnBytes([]byte(v))) - case Map: // Special map type - C.pn_data_put_map(data) - C.pn_data_enter(data) - for key, val := range v { - put(data, key) - put(data, val) - } - C.pn_data_exit(data) - default: - switch reflect.TypeOf(v).Kind() { - case reflect.Map: - putMap(data, v) - case reflect.Slice: - putList(data, v) - default: - panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) - } - } - err := dataError("marshal", data) - if err 
!= nil { - panic(err) - } - return -} - -func putMap(data *C.pn_data_t, v interface{}) { - mapValue := reflect.ValueOf(v) - C.pn_data_put_map(data) - C.pn_data_enter(data) - for _, key := range mapValue.MapKeys() { - put(data, key.Interface()) - put(data, mapValue.MapIndex(key).Interface()) - } - C.pn_data_exit(data) -} - -func putList(data *C.pn_data_t, v interface{}) { - listValue := reflect.ValueOf(v) - C.pn_data_put_list(data) - C.pn_data_enter(data) - for i := 0; i < listValue.Len(); i++ { - put(data, listValue.Index(i).Interface()) - } - C.pn_data_exit(data) -} - -// Encoder encodes AMQP values to an io.Writer -type Encoder struct { - writer io.Writer - buffer []byte -} - -// New encoder returns a new encoder that writes to w. -func NewEncoder(w io.Writer) *Encoder { - return &Encoder{w, make([]byte, minEncode)} -} - -func (e *Encoder) Encode(v interface{}) (err error) { - e.buffer, err = Marshal(v, e.buffer) - if err == nil { - e.writer.Write(e.buffer) - } - return err -} - -func replace(data *C.pn_data_t, v interface{}) { - C.pn_data_clear(data) - put(data, v) -} diff --git a/proton-c/bindings/go/amqp/message.go b/proton-c/bindings/go/amqp/message.go deleted file mode 100644 index 87093f5..0000000 --- a/proton-c/bindings/go/amqp/message.go +++ /dev/null @@ -1,342 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include -// #include -// #include -import "C" - -import ( - "qpid.apache.org/proton/go/internal" - "time" - "unsafe" -) - -// FIXME aconway 2015-04-28: Do we need the interface or can we just export the struct? - -// Message is the interface to an AMQP message. -// Instances of this interface contain a pointer to the underlying struct. -type Message interface { - /** - * Inferred indicates how the message content - * is encoded into AMQP sections. If inferred is true then binary and - * list values in the body of the message will be encoded as AMQP DATA - * and AMQP SEQUENCE sections, respectively. If inferred is false, - * then all values in the body of the message will be encoded as AMQP - * VALUE sections regardless of their type. - */ - Inferred() bool - SetInferred(bool) - - /** - * Durable indicates that any parties taking responsibility - * for the message must durably store the content. - */ - Durable() bool - SetDurable(bool) - - /** - * Priority impacts ordering guarantees. Within a - * given ordered context, higher priority messages may jump ahead of - * lower priority messages. - */ - Priority() uint8 - SetPriority(uint8) - - /** - * TTL or Time To Live, a message it may be dropped after this duration - */ - TTL() time.Duration - SetTTL(time.Duration) - - /** - * FirstAcquirer indicates - * that the recipient of the message is the first recipient to acquire - * the message, i.e. there have been no failed delivery attempts to - * other acquirers. Note that this does not mean the message has not - * been delivered to, but not acquired, by other recipients. - */ - FirstAcquirer() bool - SetFirstAcquirer(bool) - - /** - * DeliveryCount tracks how many attempts have been made to - * delivery a message. - */ - DeliveryCount() uint32 - SetDeliveryCount(uint32) - - /** - * MessageId provides a unique identifier for a message. 
- * it can be an a string, an unsigned long, a uuid or a - * binary value. - */ - MessageId() interface{} - SetMessageId(interface{}) - - UserId() string - SetUserId(string) - - Address() string - SetAddress(string) - - Subject() string - SetSubject(string) - - ReplyTo() string - SetReplyTo(string) - - /** - * CorrelationId is set on correlated request and response messages. It can be an a string, an unsigned long, a uuid or a - * binary value. - */ - CorrelationId() interface{} - SetCorrelationId(interface{}) - - ContentType() string - SetContentType(string) - - ContentEncoding() string - SetContentEncoding(string) - - // ExpiryTime indicates an absoulte time when the message may be dropped. - // A Zero time (i.e. t.isZero() == true) indicates a message never expires. - ExpiryTime() time.Time - SetExpiryTime(time.Time) - - CreationTime() time.Time - SetCreationTime(time.Time) - - GroupId() string - SetGroupId(string) - - GroupSequence() int32 - SetGroupSequence(int32) - - ReplyToGroupId() string - SetReplyToGroupId(string) - - /** - * Instructions can be used to access or modify AMQP delivery instructions. - */ - Instructions() *map[string]interface{} - - /** - * Annotations can be used to access or modify AMQP annotations. - */ - Annotations() *map[string]interface{} - - /** - * Properties can be used to access or modify the application properties of a message. - */ - Properties() *map[string]interface{} - - /** - * Body of the message can be any AMQP encodable type. - */ - Body() interface{} - SetBody(interface{}) - - // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough - // the message is encoded into it, otherwise a new buffer is created. - // Returns the buffer containing the message. - Encode(buffer []byte) ([]byte, error) -} - -// NewMessage creates a new message instance. The returned interface contains a pointer. -func NewMessage() Message { - pn := C.pn_message() // Pick up default setting from C message. 
- defer C.pn_message_free(pn) - return goMessage(pn) -} - -// Message implementation copies all message data into Go space so it can be proprely -// memory managed. -// -type message struct { - inferred, durable, firstAcquirer bool - priority uint8 - ttl time.Duration - deliveryCount uint32 - messageId interface{} - userId, address, subject, replyTo string - contentType, contentEncoding string - groupId, replyToGroupId string - creationTime, expiryTime time.Time - groupSequence int32 - correlationId interface{} - instructions, annotations, properties map[string]interface{} - body interface{} -} - -func (m *message) Inferred() bool { return m.inferred } -func (m *message) SetInferred(b bool) { m.inferred = b } -func (m *message) Durable() bool { return m.durable } -func (m *message) SetDurable(b bool) { m.durable = b } -func (m *message) Priority() uint8 { return m.priority } -func (m *message) SetPriority(b uint8) { m.priority = b } -func (m *message) TTL() time.Duration { return m.ttl } -func (m *message) SetTTL(d time.Duration) { m.ttl = d } -func (m *message) FirstAcquirer() bool { return m.firstAcquirer } -func (m *message) SetFirstAcquirer(b bool) { m.firstAcquirer = b } -func (m *message) DeliveryCount() uint32 { return m.deliveryCount } -func (m *message) SetDeliveryCount(c uint32) { m.deliveryCount = c } -func (m *message) MessageId() interface{} { return m.messageId } -func (m *message) SetMessageId(id interface{}) { m.messageId = id } -func (m *message) UserId() string { return m.userId } -func (m *message) SetUserId(s string) { m.userId = s } -func (m *message) Address() string { return m.address } -func (m *message) SetAddress(s string) { m.address = s } -func (m *message) Subject() string { return m.subject } -func (m *message) SetSubject(s string) { m.subject = s } -func (m *message) ReplyTo() string { return m.replyTo } -func (m *message) SetReplyTo(s string) { m.replyTo = s } -func (m *message) CorrelationId() interface{} { return m.correlationId } 
-func (m *message) SetCorrelationId(c interface{}) { m.correlationId = c } -func (m *message) ContentType() string { return m.contentType } -func (m *message) SetContentType(s string) { m.contentType = s } -func (m *message) ContentEncoding() string { return m.contentEncoding } -func (m *message) SetContentEncoding(s string) { m.contentEncoding = s } -func (m *message) ExpiryTime() time.Time { return m.expiryTime } -func (m *message) SetExpiryTime(t time.Time) { m.expiryTime = t } -func (m *message) CreationTime() time.Time { return m.creationTime } -func (m *message) SetCreationTime(t time.Time) { m.creationTime = t } -func (m *message) GroupId() string { return m.groupId } -func (m *message) SetGroupId(s string) { m.groupId = s } -func (m *message) GroupSequence() int32 { return m.groupSequence } -func (m *message) SetGroupSequence(s int32) { m.groupSequence = s } -func (m *message) ReplyToGroupId() string { return m.replyToGroupId } -func (m *message) SetReplyToGroupId(s string) { m.replyToGroupId = s } -func (m *message) Instructions() *map[string]interface{} { return &m.instructions } -func (m *message) Annotations() *map[string]interface{} { return &m.annotations } -func (m *message) Properties() *map[string]interface{} { return &m.properties } -func (m *message) Body() interface{} { return m.body } -func (m *message) SetBody(b interface{}) { m.body = b } - -// rewindGet rewinds and then gets the value from a data object. 
-func rewindGet(data *C.pn_data_t, v interface{}) { - if data != nil && C.pn_data_size(data) > 0 { - C.pn_data_rewind(data) - C.pn_data_next(data) - get(data, v) - } -} - -// goMessage populates a Go message from a pn_message_t -func goMessage(pn *C.pn_message_t) *message { - m := &message{ - inferred: bool(C.pn_message_is_inferred(pn)), - durable: bool(C.pn_message_is_durable(pn)), - priority: uint8(C.pn_message_get_priority(pn)), - ttl: time.Duration(C.pn_message_get_ttl(pn)) * time.Millisecond, - firstAcquirer: bool(C.pn_message_is_first_acquirer(pn)), - deliveryCount: uint32(C.pn_message_get_delivery_count(pn)), - userId: goString(C.pn_message_get_user_id(pn)), - address: C.GoString(C.pn_message_get_address(pn)), - subject: C.GoString(C.pn_message_get_subject(pn)), - replyTo: C.GoString(C.pn_message_get_reply_to(pn)), - contentType: C.GoString(C.pn_message_get_content_type(pn)), - contentEncoding: C.GoString(C.pn_message_get_content_encoding(pn)), - expiryTime: time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(pn)))), - creationTime: time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(pn))), - groupId: C.GoString(C.pn_message_get_group_id(pn)), - groupSequence: int32(C.pn_message_get_group_sequence(pn)), - replyToGroupId: C.GoString(C.pn_message_get_reply_to_group_id(pn)), - messageId: nil, - correlationId: nil, - instructions: make(map[string]interface{}), - annotations: make(map[string]interface{}), - properties: make(map[string]interface{}), - } - rewindGet(C.pn_message_id(pn), &m.messageId) - rewindGet(C.pn_message_correlation_id(pn), &m.correlationId) - rewindGet(C.pn_message_instructions(pn), &m.instructions) - rewindGet(C.pn_message_annotations(pn), &m.annotations) - rewindGet(C.pn_message_properties(pn), &m.properties) - rewindGet(C.pn_message_body(pn), &m.body) - return m -} - -// pnMessage populates a pn_message_t from a Go message. 
-func (m *message) pnMessage() *C.pn_message_t { - pn := C.pn_message() - C.pn_message_set_inferred(pn, C.bool(m.Inferred())) - C.pn_message_set_durable(pn, C.bool(m.Durable())) - C.pn_message_set_priority(pn, C.uint8_t(m.priority)) - C.pn_message_set_ttl(pn, C.pn_millis_t(m.TTL()/time.Millisecond)) - C.pn_message_set_first_acquirer(pn, C.bool(m.FirstAcquirer())) - C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount)) - replace(C.pn_message_id(pn), m.MessageId()) - C.pn_message_set_user_id(pn, pnBytes([]byte(m.UserId()))) - C.pn_message_set_address(pn, C.CString(m.Address())) - C.pn_message_set_subject(pn, C.CString(m.Subject())) - C.pn_message_set_reply_to(pn, C.CString(m.ReplyTo())) - replace(C.pn_message_correlation_id(pn), m.CorrelationId()) - C.pn_message_set_content_type(pn, C.CString(m.ContentType())) - C.pn_message_set_content_encoding(pn, C.CString(m.ContentEncoding())) - C.pn_message_set_expiry_time(pn, pnTime(m.ExpiryTime())) - C.pn_message_set_creation_time(pn, pnTime(m.CreationTime())) - C.pn_message_set_group_id(pn, C.CString(m.GroupId())) - C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.GroupSequence())) - C.pn_message_set_reply_to_group_id(pn, C.CString(m.ReplyToGroupId())) - replace(C.pn_message_instructions(pn), *m.Instructions()) - replace(C.pn_message_annotations(pn), *m.Annotations()) - replace(C.pn_message_properties(pn), *m.Properties()) - replace(C.pn_message_body(pn), m.Body()) - return pn -} - -// FIXME aconway 2015-04-08: Move message encode/decode under Marshal/Unmarshal interfaces. 
- -// DecodeMessage decodes bytes as a message -func DecodeMessage(data []byte) (Message, error) { - pnMsg := C.pn_message() - defer C.pn_message_free(pnMsg) - if len(data) == 0 { - return nil, internal.Errorf("empty buffer for decode") - } - if C.pn_message_decode(pnMsg, cPtr(data), cLen(data)) < 0 { - return nil, internal.Errorf("decoding message: %s", - internal.PnError(unsafe.Pointer(C.pn_message_error(pnMsg)))) - } - return goMessage(pnMsg), nil -} - -// Encode the message into buffer. -// If buffer is nil or len(buffer) is not sufficient to encode the message, a larger -// buffer will be returned. -func (m *message) Encode(buffer []byte) ([]byte, error) { - pn := m.pnMessage() - defer C.pn_message_free(pn) - encode := func(buf []byte) ([]byte, error) { - len := cLen(buf) - result := C.pn_message_encode(pn, cPtr(buf), &len) - switch { - case result == C.PN_OVERFLOW: - return buf, overflow - case result < 0: - return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) - default: - return buf[:len], nil - } - } - return encodeGrow(buffer, encode) -} diff --git a/proton-c/bindings/go/amqp/message_test.go b/proton-c/bindings/go/amqp/message_test.go deleted file mode 100644 index 46e26de..0000000 --- a/proton-c/bindings/go/amqp/message_test.go +++ /dev/null @@ -1,90 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "reflect" - "testing" - "time" -) - -func roundTrip(t *testing.T, m Message) { - buffer, err := m.Encode(nil) - if err != nil { - t.Fatalf("Encode failed: %v", err) - } - m2, err := DecodeMessage(buffer) - if err != nil { - t.Fatalf("Decode failed: %v", err) - } - if !reflect.DeepEqual(m, m2) { - t.Errorf("Message mismatch got\n%#v\nwant\n%#v", m2, m) - } -} - -func TestDefaultMessageRoundTrip(t *testing.T) { - m := NewMessage() - // Check defaults - assertEqual(m.Inferred(), false) - assertEqual(m.Durable(), false) - assertEqual(m.Priority(), uint8(4)) - assertEqual(m.TTL(), time.Duration(0)) - assertEqual(m.UserId(), "") - assertEqual(m.Address(), "") - assertEqual(m.Subject(), "") - assertEqual(m.ReplyTo(), "") - assertEqual(m.ContentType(), "") - assertEqual(m.ContentEncoding(), "") - assertEqual(m.GroupId(), "") - assertEqual(m.GroupSequence(), int32(0)) - assertEqual(m.ReplyToGroupId(), "") - assertEqual(m.MessageId(), nil) - assertEqual(m.CorrelationId(), nil) - assertEqual(*m.Instructions(), map[string]interface{}{}) - assertEqual(*m.Annotations(), map[string]interface{}{}) - assertEqual(*m.Properties(), map[string]interface{}{}) - assertEqual(m.Body(), nil) - - roundTrip(t, m) -} - -func TestMessageRoundTrip(t *testing.T) { - m := NewMessage() - m.SetInferred(false) - m.SetDurable(true) - m.SetPriority(42) - m.SetTTL(0) - m.SetUserId("user") - m.SetAddress("address") - m.SetSubject("subject") - m.SetReplyTo("replyto") - m.SetContentType("content") - m.SetContentEncoding("encoding") - m.SetGroupId("group") - m.SetGroupSequence(42) - m.SetReplyToGroupId("replytogroup") - m.SetMessageId("id") - m.SetCorrelationId("correlation") - *m.Instructions() = map[string]interface{}{"instructions": "foo"} - *m.Annotations() = map[string]interface{}{"annotations": "foo"} - *m.Properties() = map[string]interface{}{"int": int32(32), "bool": 
true, "string": "foo"} - m.SetBody("hello") - roundTrip(t, m) -} diff --git a/proton-c/bindings/go/amqp/types.go b/proton-c/bindings/go/amqp/types.go deleted file mode 100644 index 8713520..0000000 --- a/proton-c/bindings/go/amqp/types.go +++ /dev/null @@ -1,193 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package amqp - -// #include -// const pn_type_t PN_DATA_TYPE_ERROR = (pn_type_t) -1; -import "C" - -import ( - "bytes" - "fmt" - "reflect" - "time" - "unsafe" -) - -func pnTypeString(pt C.pn_type_t) string { - switch pt { - case C.PN_NULL: - return "null" - case C.PN_BOOL: - return "bool" - case C.PN_UBYTE: - return "ubyte" - case C.PN_BYTE: - return "byte" - case C.PN_USHORT: - return "ushort" - case C.PN_SHORT: - return "short" - case C.PN_CHAR: - return "char" - case C.PN_UINT: - return "uint" - case C.PN_INT: - return "int" - case C.PN_ULONG: - return "ulong" - case C.PN_LONG: - return "long" - case C.PN_TIMESTAMP: - return "timestamp" - case C.PN_FLOAT: - return "float" - case C.PN_DOUBLE: - return "double" - case C.PN_DECIMAL32: - return "decimal32" - case C.PN_DECIMAL64: - return "decimal64" - case C.PN_DECIMAL128: - return "decimal128" - case C.PN_UUID: - return "uuid" - case C.PN_BINARY: - return "binary" - case C.PN_STRING: - return "string" - case C.PN_SYMBOL: - return "symbol" - case C.PN_DESCRIBED: - return "described" - case C.PN_ARRAY: - return "array" - case C.PN_LIST: - return "list" - case C.PN_MAP: - return "map" - case C.PN_DATA_TYPE_ERROR: - return "no-data" - default: - return fmt.Sprintf("unknown-type(%d)", pt) - } -} - -// Go types -var ( - bytesType = reflect.TypeOf([]byte{}) - valueType = reflect.TypeOf(reflect.Value{}) -) - -// FIXME aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys. - -// Map is a generic map that can have mixed key and value types and so can represent any AMQP map -type Map map[interface{}]interface{} - -// List is a generic list that can hold mixed values and can represent any AMQP list. -// -type List []interface{} - -// Symbol is a string that is encoded as an AMQP symbol -type Symbol string - -// Binary is a string that is encoded as an AMQP binary. 
-// It is a string rather than a []byte because []byte is not hashable and can't be used as -// a map key, and AMQP frequently uses binary types as map keys. It can convert to and from []byte -type Binary string - -// GoString for Map prints values with their types, useful for debugging. -func (m Map) GoString() string { - out := &bytes.Buffer{} - fmt.Fprintf(out, "%T{", m) - i := len(m) - for k, v := range m { - fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) - i-- - if i > 0 { - fmt.Fprint(out, ", ") - } - } - fmt.Fprint(out, "}") - return out.String() -} - -// GoString for List prints values with their types, useful for debugging. -func (l List) GoString() string { - out := &bytes.Buffer{} - fmt.Fprintf(out, "%T{", l) - for i := 0; i < len(l); i++ { - fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) - if i != len(l)-1 { - fmt.Fprint(out, ", ") - } - } - fmt.Fprint(out, "}") - return out.String() -} - -// pnTime converts Go time.Time to Proton millisecond Unix time. -func pnTime(t time.Time) C.pn_timestamp_t { - secs := t.Unix() - // Note: sub-second accuracy is not guaranteed if the Unix time in - // nanoseconds cannot be represented by an int64 (sometime around year 2260) - msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) - return C.pn_timestamp_t(secs*1000 + msecs) -} - -// goTime converts a pn_timestamp_t to a Go time.Time. 
-func goTime(t C.pn_timestamp_t) time.Time { - secs := int64(t) / 1000 - nsecs := (int64(t) % 1000) * int64(time.Millisecond) - return time.Unix(secs, nsecs) -} - -func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { - if cBytes.start != nil { - bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) - } - return -} - -func goString(cBytes C.pn_bytes_t) (str string) { - if cBytes.start != nil { - str = C.GoStringN(cBytes.start, C.int(cBytes.size)) - } - return -} - -func pnBytes(b []byte) C.pn_bytes_t { - if len(b) == 0 { - return C.pn_bytes_t{0, nil} - } else { - return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} - } -} - -func cPtr(b []byte) *C.char { - if len(b) == 0 { - return nil - } - return (*C.char)(unsafe.Pointer(&b[0])) -} - -func cLen(b []byte) C.size_t { - return C.size_t(len(b)) -} diff --git a/proton-c/bindings/go/amqp/uid.go b/proton-c/bindings/go/amqp/uid.go deleted file mode 100644 index 944bf6f..0000000 --- a/proton-c/bindings/go/amqp/uid.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Generating unique IDs for various things. - -package amqp - -import ( - "strconv" - "sync/atomic" -) - -// A simple atomic counter to generate unique 64 bit IDs. 
-type UidCounter struct{ count uint64 } - -// NextInt gets the next uint64 value from the atomic counter. -func (uc *UidCounter) NextInt() uint64 { - return atomic.AddUint64(&uc.count, 1) -} - -// Next gets the next integer value encoded as a base32 string, safe for NUL terminated C strings. -func (uc *UidCounter) Next() string { - return strconv.FormatUint(uc.NextInt(), 32) -} diff --git a/proton-c/bindings/go/amqp/unmarshal.go b/proton-c/bindings/go/amqp/unmarshal.go deleted file mode 100644 index 89ab64a..0000000 --- a/proton-c/bindings/go/amqp/unmarshal.go +++ /dev/null @@ -1,552 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -// #include -import "C" - -import ( - "bytes" - "fmt" - "io" - "qpid.apache.org/proton/go/internal" - "reflect" - "unsafe" -) - -const minDecode = 1024 - -// Error returned if AMQP data cannot be unmarshaled as the desired Go type. -type BadUnmarshal struct { - // The name of the AMQP type. - AMQPType string - // The Go type. 
- GoType reflect.Type -} - -func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal { - return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)} -} - -func (e BadUnmarshal) Error() string { - if e.GoType.Kind() != reflect.Ptr { - return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) - } else { - return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) - } -} - -// -// Decoding from a pn_data_t -// -// NOTE: we use panic() to signal a decoding error, simplifies decoding logic. -// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode. -// - -// Decoder decodes AMQP values from an io.Reader. -// -type Decoder struct { - reader io.Reader - buffer bytes.Buffer -} - -// NewDecoder returns a new decoder that reads from r. -// -// The decoder has its own buffer and may read more data than required for the -// AMQP values requested. Use Buffered to see if there is data left in the -// buffer. -// -func NewDecoder(r io.Reader) *Decoder { - return &Decoder{r, bytes.Buffer{}} -} - -// Buffered returns a reader of the data remaining in the Decoder's buffer. The -// reader is valid until the next call to Decode. -// -func (d *Decoder) Buffered() io.Reader { - return bytes.NewReader(d.buffer.Bytes()) -} - -// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v. -// -// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value. -// -func (d *Decoder) Decode(v interface{}) (err error) { - defer internal.DoRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - var n int - for n == 0 && err == nil { - n = unmarshal(data, d.buffer.Bytes(), v) - if n == 0 { // n == 0 means not enough data, read more - err = d.more() - if err != nil { - return - } - } - } - d.buffer.Next(n) - return -} - -/* -Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v. 
-Types are converted as follows: - - +---------------------------+----------------------------------------------------------------------+ - |To Go types |From AMQP types | - +===========================+======================================================================+ - |bool |bool | - +---------------------------+----------------------------------------------------------------------+ - |int, int8, int16, |Equivalent or smaller signed integer type: char, byte, short, int, | - |int32, int64 |long. | - +---------------------------+----------------------------------------------------------------------+ - |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: char, ubyte, ushort, | - |uint32, uint64 types |uint, ulong | - +---------------------------+----------------------------------------------------------------------+ - |float32, float64 |Equivalent or smaller float or double. | - +---------------------------+----------------------------------------------------------------------+ - |string, []byte |string, symbol or binary. 
| - +---------------------------+----------------------------------------------------------------------+ - |Symbol |symbol | - +---------------------------+----------------------------------------------------------------------+ - |map[K]T |map, provided all keys and values can unmarshal to types K, T | - +---------------------------+----------------------------------------------------------------------+ - |Map |map, any AMQP map | - +---------------------------+----------------------------------------------------------------------+ - |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: | - | +------------------------+---------------------------------------------+ - | |AMQP Type |Go Type in interface{} | - | +========================+=============================================+ - | |bool |bool | - | +------------------------+---------------------------------------------+ - | |char |uint8 | - | +------------------------+---------------------------------------------+ - | |byte,short,int,long |int8,int16,int32,int64 | - | +------------------------+---------------------------------------------+ - | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 | - | +------------------------+---------------------------------------------+ - | |float, double |float32, float64 | - | +------------------------+---------------------------------------------+ - | |string |string | - | +------------------------+---------------------------------------------+ - | |symbol |Symbol | - | +------------------------+---------------------------------------------+ - | |binary |Binary | - | +------------------------+---------------------------------------------+ - | |null |nil | - | +------------------------+---------------------------------------------+ - | |map |Map | - | +------------------------+---------------------------------------------+ - | |list |List | - 
+---------------------------+------------------------+---------------------------------------------+ - -The
following Go types cannot be unmarshaled: complex64/128, uintptr, function, interface, channel. - -TODO types - -AMQP: timestamp, decimal32/64/128, uuid, described, array. - -Go: array, struct. - -Maps: currently we cannot unmarshal AMQP maps with unhashable key types, need an alternate -representation for those. -*/ -func Unmarshal(bytes []byte, v interface{}) (n int, err error) { - defer internal.DoRecover(&err) - data := C.pn_data(0) - defer C.pn_data_free(data) - n = unmarshal(data, bytes, v) - if n == 0 { - err = internal.Errorf("not enough data") - } - return -} - -// more reads more data when we can't parse a complete AMQP type -func (d *Decoder) more() error { - var readSize int64 = minDecode - if int64(d.buffer.Len()) > readSize { // Grow by doubling - readSize = int64(d.buffer.Len()) - } - var n int64 - n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize)) - if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0 - err = io.EOF - } - return err -} - -// unmarshal decodes from bytes and converts into the value pointed to by v. -// Used by Unmarshal and Decode -// -// Returns the number of bytes decoded or 0 if not enough data. -// -func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) { - n = decode(data, bytes) - if n == 0 { - return 0 - } - get(data, v) - return -} - -// get value from data into value pointed at by v. 
-func get(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - - switch v := v.(type) { - case *bool: - switch pnType { - case C.PN_BOOL: - *v = bool(C.pn_data_get_bool(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *int8: - switch pnType { - case C.PN_CHAR: - *v = int8(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int8(C.pn_data_get_byte(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *uint8: - switch pnType { - case C.PN_CHAR: - *v = uint8(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint8(C.pn_data_get_ubyte(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *int16: - switch pnType { - case C.PN_CHAR: - *v = int16(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int16(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int16(C.pn_data_get_short(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *uint16: - switch pnType { - case C.PN_CHAR: - *v = uint16(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint16(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint16(C.pn_data_get_ushort(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *int32: - switch pnType { - case C.PN_CHAR: - *v = int32(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int32(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int32(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int32(C.pn_data_get_int(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - case *uint32: - switch pnType { - case C.PN_CHAR: - *v = uint32(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint32(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint32(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint32(C.pn_data_get_uint(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *int64: - switch pnType { - case C.PN_CHAR: - *v = int64(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int64(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = 
int64(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int64(C.pn_data_get_int(data)) - case C.PN_LONG: - *v = int64(C.pn_data_get_long(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *uint64: - switch pnType { - case C.PN_CHAR: - *v = uint64(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint64(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint64(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint64(C.pn_data_get_uint(data)) - case C.PN_ULONG: - *v = uint64(C.pn_data_get_ulong(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *int: - switch pnType { - case C.PN_CHAR: - *v = int(C.pn_data_get_char(data)) - case C.PN_BYTE: - *v = int(C.pn_data_get_byte(data)) - case C.PN_SHORT: - *v = int(C.pn_data_get_short(data)) - case C.PN_INT: - *v = int(C.pn_data_get_int(data)) - case C.PN_LONG: - if unsafe.Sizeof(0) == 8 { - *v = int(C.pn_data_get_long(data)) - } else { - panic(newBadUnmarshal(pnType, v)) - } - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *uint: - switch pnType { - case C.PN_CHAR: - *v = uint(C.pn_data_get_char(data)) - case C.PN_UBYTE: - *v = uint(C.pn_data_get_ubyte(data)) - case C.PN_USHORT: - *v = uint(C.pn_data_get_ushort(data)) - case C.PN_UINT: - *v = uint(C.pn_data_get_uint(data)) - case C.PN_ULONG: - if unsafe.Sizeof(0) == 8 { - *v = uint(C.pn_data_get_ulong(data)) - } else { - panic(newBadUnmarshal(pnType, v)) - } - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *float32: - switch pnType { - case C.PN_FLOAT: - *v = float32(C.pn_data_get_float(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *float64: - switch pnType { - case C.PN_FLOAT: - *v = float64(C.pn_data_get_float(data)) - case C.PN_DOUBLE: - *v = float64(C.pn_data_get_double(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *string: - switch pnType { - case C.PN_STRING: - *v = goString(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = 
goString(C.pn_data_get_symbol(data)) - case C.PN_BINARY: - *v = 
goString(C.pn_data_get_binary(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *[]byte: - switch pnType { - case C.PN_STRING: - *v = goBytes(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = goBytes(C.pn_data_get_symbol(data)) - case C.PN_BINARY: - *v = goBytes(C.pn_data_get_binary(data)) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *Binary: - switch pnType { - case C.PN_BINARY: - *v = Binary(goBytes(C.pn_data_get_binary(data))) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *Symbol: - switch pnType { - case C.PN_SYMBOL: - *v = Symbol(goBytes(C.pn_data_get_symbol(data))) - default: - panic(newBadUnmarshal(pnType, v)) - } - - case *interface{}: - getInterface(data, v) - - default: - if reflect.TypeOf(v).Kind() != reflect.Ptr { - panic(newBadUnmarshal(pnType, v)) - } - switch reflect.TypeOf(v).Elem().Kind() { - case reflect.Map: - getMap(data, v) - case reflect.Slice: - getList(data, v) - default: - panic(newBadUnmarshal(pnType, v)) - } - } - err := dataError("unmarshaling", data) - if err != nil { - panic(err) - } - return -} - -// Getting into an interface is driven completely by the AMQP type, since the interface{} -// target is type-neutral. 
-func getInterface(data *C.pn_data_t, v *interface{}) { - pnType := C.pn_data_type(data) - switch pnType { - case C.PN_NULL: - *v = nil - case C.PN_BOOL: - *v = bool(C.pn_data_get_bool(data)) - case C.PN_UBYTE: - *v = uint8(C.pn_data_get_ubyte(data)) - case C.PN_BYTE: - *v = int8(C.pn_data_get_byte(data)) - case C.PN_USHORT: - *v = uint16(C.pn_data_get_ushort(data)) - case C.PN_SHORT: - *v = int16(C.pn_data_get_short(data)) - case C.PN_UINT: - *v = uint32(C.pn_data_get_uint(data)) - case C.PN_INT: - *v = int32(C.pn_data_get_int(data)) - case C.PN_CHAR: - *v = uint8(C.pn_data_get_char(data)) - case C.PN_ULONG: - *v = uint64(C.pn_data_get_ulong(data)) - case C.PN_LONG: - *v = int64(C.pn_data_get_long(data)) - case C.PN_FLOAT: - *v = float32(C.pn_data_get_float(data)) - case C.PN_DOUBLE: - *v = float64(C.pn_data_get_double(data)) - case C.PN_BINARY: - *v = Binary(goBytes(C.pn_data_get_binary(data))) - case C.PN_STRING: - *v = goString(C.pn_data_get_string(data)) - case C.PN_SYMBOL: - *v = Symbol(goString(C.pn_data_get_symbol(data))) - case C.PN_MAP: - m := make(Map) - get(data, &m) - *v = m // FIXME aconway 2015-03-13: avoid the copy? - case C.PN_LIST: - l := make(List, 0) - get(data, &l) - *v = l // FIXME aconway 2015-03-13: avoid the copy? 
- default: - panic(newBadUnmarshal(pnType, v)) - } -} - -// get into map pointed at by v -func getMap(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - if pnType != C.PN_MAP { - panic(newBadUnmarshal(pnType, v)) - } - mapValue := reflect.ValueOf(v).Elem() - mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map - count := int(C.pn_data_get_map(data)) - if bool(C.pn_data_enter(data)) { - for i := 0; i < count/2; i++ { - if bool(C.pn_data_next(data)) { - key := reflect.New(mapValue.Type().Key()) - get(data, key.Interface()) - if bool(C.pn_data_next(data)) { - val := reflect.New(mapValue.Type().Elem()) - get(data, val.Interface()) - mapValue.SetMapIndex(key.Elem(), val.Elem()) - } - } - } - C.pn_data_exit(data) - } -} - -func getList(data *C.pn_data_t, v interface{}) { - pnType := C.pn_data_type(data) - if pnType != C.PN_LIST { - panic(newBadUnmarshal(pnType, v)) - } - count := int(C.pn_data_get_list(data)) - listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count) - if bool(C.pn_data_enter(data)) { - for i := 0; i < count; i++ { - if bool(C.pn_data_next(data)) { - val := reflect.New(listValue.Type().Elem()) - get(data, val.Interface()) - listValue.Index(i).Set(val.Elem()) - } - } - C.pn_data_exit(data) - } - // FIXME aconway 2015-04-09: avoid the copy? - reflect.ValueOf(v).Elem().Set(listValue) -} - -// decode from bytes. -// Return bytes decoded or 0 if we could not decode a complete object. 
-// -func decode(data *C.pn_data_t, bytes []byte) int { - if len(bytes) == 0 { - return 0 - } - n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes))) - if n == int(C.PN_UNDERFLOW) { - C.pn_error_clear(C.pn_data_error(data)) - return 0 - } else if n <= 0 { - panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n))) - } - return n -} diff --git a/proton-c/bindings/go/amqp/url.go b/proton-c/bindings/go/amqp/url.go deleted file mode 100644 index 58711c7..0000000 --- a/proton-c/bindings/go/amqp/url.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -/* -#include -#include -#include - -// Helper function for setting URL fields. -typedef void (*setter_fn)(pn_url_t* url, const char* value); -inline void set(pn_url_t *url, setter_fn s, const char* value) { - s(url, value); -} -*/ -import "C" - -import ( - "net" - "net/url" - "qpid.apache.org/proton/go/internal" - "unsafe" -) - -const ( - amqp string = "amqp" - amqps = "amqps" -) - -// ParseURL parses an AMQP URL string and returns a net/url.URL. -// -// It is more forgiving than net/url.Parse and allows most of the parts of the -// URL to be missing, assuming AMQP defaults. 
-// -func ParseURL(s string) (u *url.URL, err error) { - cstr := C.CString(s) - defer C.free(unsafe.Pointer(cstr)) - pnUrl := C.pn_url_parse(cstr) - if pnUrl == nil { - return nil, internal.Errorf("bad URL %#v", s) - } - defer C.pn_url_free(pnUrl) - - scheme := C.GoString(C.pn_url_get_scheme(pnUrl)) - username := C.GoString(C.pn_url_get_username(pnUrl)) - password := C.GoString(C.pn_url_get_password(pnUrl)) - host := C.GoString(C.pn_url_get_host(pnUrl)) - port := C.GoString(C.pn_url_get_port(pnUrl)) - path := C.GoString(C.pn_url_get_path(pnUrl)) - - if err != nil { - return nil, internal.Errorf("bad URL %#v: %s", s, err) - } - if scheme == "" { - scheme = amqp - } - if port == "" { - if scheme == amqps { - port = amqps - } else { - port = amqp - } - } - var user *url.Userinfo - if password != "" { - user = url.UserPassword(username, password) - } else if username != "" { - user = url.User(username) - } - - u = &url.URL{ - Scheme: scheme, - User: user, - Host: net.JoinHostPort(host, port), - Path: path, - } - - return u, nil -} diff --git a/proton-c/bindings/go/amqp/url_test.go b/proton-c/bindings/go/amqp/url_test.go deleted file mode 100644 index f80f1c4..0000000 --- a/proton-c/bindings/go/amqp/url_test.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package amqp - -import ( - "fmt" -) - -func ExampleParseURL() { - for _, s := range []string{ - "amqp://username:password@host:1234/path", - "host:1234", - "host", - ":1234", - "host/path", - "amqps://host", - "", - } { - u, err := ParseURL(s) - if err != nil { - fmt.Println(err) - } else { - fmt.Println(u) - } - } - // Output: - // amqp://username:password@host:1234/path - // amqp://host:1234 - // amqp://host:amqp - // amqp://:1234 - // amqp://host:amqp/path - // amqps://host:amqps - // proton: bad URL "" -} diff --git a/proton-c/bindings/go/event/doc.go b/proton-c/bindings/go/event/doc.go deleted file mode 100644 index 7a9ec12..0000000 --- a/proton-c/bindings/go/event/doc.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package event provides a low-level API to the proton AMQP engine. - -For most tasks, consider instead package qpid.apache.org/proton/go/messaging. -It provides a higher-level, concurrent API that is easier to use. - -The API is event based. There are two alternative styles of handler. EventHandler -provides the core proton events. 
MessagingHandler provides a slightly simplified -view of the event stream and automates some common tasks. - -See type Pump documentation for more details of the interaction between proton -events and goroutines. -*/ -package event - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. diff --git a/proton-c/bindings/go/event/handlers.go b/proton-c/bindings/go/event/handlers.go deleted file mode 100644 index d76fac9..0000000 --- a/proton-c/bindings/go/event/handlers.go +++ /dev/null @@ -1,411 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include -// #include -import "C" - -import ( - "qpid.apache.org/proton/go/internal" -) - -// EventHandler handles core proton events. -type EventHandler interface { - // HandleEvent is called with an event. - // Typically HandleEvent() is implemented as a switch on e.Type() - HandleEvent(e Event) error -} - -// cHandler wraps a C pn_handler_t -type cHandler struct { - pn *C.pn_handler_t -} - -func (h cHandler) HandleEvent(e Event) error { - C.pn_handler_dispatch(h.pn, e.pn, C.pn_event_type(e.pn)) - return nil // FIXME aconway 2015-03-31: error handling -} - -// MessagingHandler provides an alternative interface to EventHandler.
-// It is easier to use for most applications that send and receive messages. -// -// Implement this interface and then wrap your value with a MessagingDelegator. -// MessagingDelegator implements EventHandler and can be registered with a Pump. -// -type MessagingHandler interface { - HandleMessagingEvent(MessagingEventType, Event) error -} - -// MessagingEventType provides a set of events that are easier to work with than the -// core events defined by EventType -// -// There are 3 types of "endpoint": Connection, Session and Link. -// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error. -// The meaning of these events is as follows: -// -// Opening: The remote end opened, the local end will open automatically. -// -// Opened: Both ends are open, regardless of which end opened first. -// -// Closing: The remote end closed without error, the local end will close automatically. -// -// Error: The remote end closed with an error, the local end will close automatically. -// -// Closed: Both ends are closed, regardless of which end closed first or if there was an error. -// -type MessagingEventType int - -const ( - // The event loop starts. - MStart MessagingEventType = iota - // The peer closes the connection with an error condition. - MConnectionError - // The peer closes the session with an error condition. - MSessionError - // The peer closes the link with an error condition. - MLinkError - // The peer initiates the opening of the connection. - MConnectionOpening - // The peer initiates the opening of the session. - MSessionOpening - // The peer initiates the opening of the link. - MLinkOpening - // The connection is opened. - MConnectionOpened - // The session is opened. - MSessionOpened - // The link is opened. - MLinkOpened - // The peer initiates the closing of the connection. - MConnectionClosing - // The peer initiates the closing of the session. - MSessionClosing - // The peer initiates the closing of the link.
- MLinkClosing - // Both ends of the connection are closed. - MConnectionClosed - // Both ends of the session are closed. - MSessionClosed - // Both ends of the link are closed. - MLinkClosed - // The connection is disconnected. - MConnectionDisconnected - // The session's connection was disconnected - MSessionDisconnected - // The link's connection was disconnected - MLinkDisconnected - // The sender link has credit and messages can - // therefore be transferred. - MSendable - // The remote peer accepts an outgoing message. - MAccepted - // The remote peer rejects an outgoing message. - MRejected - // The peer releases an outgoing message. Note that this may be in response to - // either the RELEASE or MODIFIED state as defined by the AMQP specification. - MReleased - // The peer has settled the outgoing message. This is the point at which it - // should never be retransmitted. - MSettled - // A message is received. Call DecodeMessage() to decode as an amqp.Message. - // To manage the outcome of this message (e.g. to accept or reject the message) - // use Event.Delivery(). - MMessage - // The event loop terminates, there are no more events to process.
- MFinal -) - -func (t MessagingEventType) String() string { - switch t { - case MStart: - return "Start" - case MConnectionError: - return "ConnectionError" - case MSessionError: - return "SessionError" - case MLinkError: - return "LinkError" - case MConnectionOpening: - return "ConnectionOpening" - case MSessionOpening: - return "SessionOpening" - case MLinkOpening: - return "LinkOpening" - case MConnectionOpened: - return "ConnectionOpened" - case MSessionOpened: - return "SessionOpened" - case MLinkOpened: - return "LinkOpened" - case MConnectionClosing: - return "ConnectionClosing" - case MSessionClosing: - return "SessionClosing" - case MLinkClosing: - return "LinkClosing" - case MConnectionClosed: - return "ConnectionClosed" - case MSessionClosed: - return "SessionClosed" - case MLinkClosed: - return "LinkClosed" - case MConnectionDisconnected: - return "ConnectionDisconnected" - case MSessionDisconnected: - return "MSessionDisconnected" - case MLinkDisconnected: - return "MLinkDisconnected" - case MSendable: - return "Sendable" - case MAccepted: - return "Accepted" - case MRejected: - return "Rejected" - case MReleased: - return "Released" - case MSettled: - return "Settled" - case MMessage: - return "Message" - default: - return "Unknown" - } -} - -// ResourceHandler provides a simple way to track the creation and deletion of -// various proton objects. -// endpointDelegator captures common patterns for endpoints opening/closing -type endpointDelegator struct { - remoteOpen, remoteClose, localOpen, localClose EventType - opening, opened, closing, closed, error MessagingEventType - endpoint func(Event) Endpoint - delegate MessagingHandler -} - -// HandleEvent handles an open/close event for an endpoint in a generic way. 
-func (d endpointDelegator) HandleEvent(e Event) (err error) { - endpoint := d.endpoint(e) - state := endpoint.State() - - switch e.Type() { - - case d.localOpen: - if state.Is(SRemoteActive) { - err = d.delegate.HandleMessagingEvent(d.opened, e) - } - - case d.remoteOpen: - switch { - case state.Is(SLocalActive): - err = d.delegate.HandleMessagingEvent(d.opened, e) - case state.Is(SLocalUninit): - err = d.delegate.HandleMessagingEvent(d.opening, e) - if err == nil { - endpoint.Open() - } - } - - case d.remoteClose: - var err1 error - if endpoint.RemoteCondition().IsSet() { // Closed with error - err1 = d.delegate.HandleMessagingEvent(d.error, e) - if err1 == nil { // Don't overwrite an application error. - err1 = endpoint.RemoteCondition().Error() - } - } else { - err1 = d.delegate.HandleMessagingEvent(d.closing, e) - } - if state.Is(SLocalClosed) { - err = d.delegate.HandleMessagingEvent(d.closed, e) - } else if state.Is(SLocalActive) { - endpoint.Close() - } - if err1 != nil { // Keep the first error. - err = err1 - } - - case d.localClose: - if state.Is(SRemoteClosed) { - err = d.delegate.HandleMessagingEvent(d.closed, e) - } - - default: - // We shouldn't be called with any other event type. - panic(internal.Errorf("internal error, not an open/close event: %s", e)) - } - - return err -} - -// MessagingDelegator implements an EventHandler and delegates to a MessagingHandler. -// You can modify the exported fields before you pass the MessagingDelegator to -// a Pump. -type MessagingDelegator struct { - delegate MessagingHandler - connection, session, link endpointDelegator - handshaker, flowcontroller EventHandler - - // AutoSettle (default true) automatically pre-settle outgoing messages. - AutoSettle bool - // AutoAccept (default true) automatically accept and settle incoming messages - // if they are not settled by the delegate. - AutoAccept bool - // Prefetch (default 10) initial credit to issue for incoming links.
- Prefetch int - // PeerCloseError (default false) if true a close by the peer will be treated as an error. - PeerCloseError bool -} - -func NewMessagingDelegator(h MessagingHandler) EventHandler { - return &MessagingDelegator{ - delegate: h, - connection: endpointDelegator{ - EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose, - MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed, - MConnectionError, - func(e Event) Endpoint { return e.Connection() }, - h, - }, - session: endpointDelegator{ - ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose, - MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed, - MSessionError, - func(e Event) Endpoint { return e.Session() }, - h, - }, - link: endpointDelegator{ - ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose, - MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed, - MLinkError, - func(e Event) Endpoint { return e.Link() }, - h, - }, - flowcontroller: nil, - AutoSettle: true, - AutoAccept: true, - Prefetch: 10, - PeerCloseError: false, - } -} - -func handleIf(h EventHandler, e Event) error { - if h != nil { - return h.HandleEvent(e) - } - return nil -} - -// Handle a proton event by passing the corresponding MessagingEvent(s) to -// the MessagingHandler. -func (d *MessagingDelegator) HandleEvent(e Event) error { - handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling.
- - switch e.Type() { - - case EConnectionInit: - d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))} - d.delegate.HandleMessagingEvent(MStart, e) - - case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose: - return d.connection.HandleEvent(e) - - case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose: - return d.session.HandleEvent(e) - - case ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose: - return d.link.HandleEvent(e) - - case ELinkFlow: - if e.Link().IsSender() && e.Link().Credit() > 0 { - return d.delegate.HandleMessagingEvent(MSendable, e) - } - - case EDelivery: - if e.Delivery().Link().IsReceiver() { - d.incoming(e) - } else { - d.outgoing(e) - } - - case ETransportTailClosed: - c := e.Connection() - for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) { - e2 := e - e2.link = l - e2.session = l.Session() - d.delegate.HandleMessagingEvent(MLinkDisconnected, e2) - } - for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) { - e2 := e - e2.session = s - d.delegate.HandleMessagingEvent(MSessionDisconnected, e2) - } - d.delegate.HandleMessagingEvent(MConnectionDisconnected, e) - d.delegate.HandleMessagingEvent(MFinal, e) - } - return nil -} - -func (d *MessagingDelegator) incoming(e Event) (err error) { - delivery := e.Delivery() - if delivery.Readable() && !delivery.Partial() { - if e.Link().State().Is(SLocalClosed) { - e.Link().Advance() - if d.AutoAccept { - delivery.Release(false) - } - } else { - err = d.delegate.HandleMessagingEvent(MMessage, e) - e.Link().Advance() - if d.AutoAccept && !delivery.Settled() { - if err == nil { - delivery.Accept() - } else { - delivery.Reject() - } - } - } - } else if delivery.Updated() && delivery.Settled() { - err = d.delegate.HandleMessagingEvent(MSettled, e) - } - return -} - -func (d *MessagingDelegator) outgoing(e Event) (err error) { - delivery := e.Delivery() - if 
delivery.Updated() { - switch delivery.Remote().Type() { - case Accepted: - err = d.delegate.HandleMessagingEvent(MAccepted, e) - case Rejected: - err = d.delegate.HandleMessagingEvent(MRejected, e) - case Released, Modified: - err = d.delegate.HandleMessagingEvent(MReleased, e) - } - if err == nil && delivery.Settled() { - err = d.delegate.HandleMessagingEvent(MSettled, e) - } - if err == nil && d.AutoSettle { - delivery.Settle() - } - } - return -} diff --git a/proton-c/bindings/go/event/message.go b/proton-c/bindings/go/event/message.go deleted file mode 100644 index d900744..0000000 --- a/proton-c/bindings/go/event/message.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event - -// #include -// #include -// #include -import "C" - -import ( - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/internal" -) - -// DecodeMessage decodes the message contained in a delivery event.
-func DecodeMessage(e Event) (m amqp.Message, err error) { - defer internal.DoRecover(&err) - delivery := e.Delivery() - if !delivery.Readable() || delivery.Partial() { - return nil, internal.Errorf("attempting to get incomplete message") - } - data := make([]byte, delivery.Pending()) - result := delivery.Link().Recv(data) - if result != len(data) { - return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result)) - } - return amqp.DecodeMessage(data) -} - -// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link. -var tags amqp.UidCounter - -// Send sends an amqp.Message over a Link. -// Returns a Delivery that can be used to determine the outcome of the message. -func (link Link) Send(m amqp.Message) (Delivery, error) { - if !link.IsSender() { - return Delivery{}, internal.Errorf("attempt to send message on receiving link") - } - // FIXME aconway 2015-04-08: buffering, error handling - delivery := link.Delivery(tags.Next()) - bytes, err := m.Encode(nil) - if err != nil { - return Delivery{}, internal.Errorf("cannot send message %s", err) - } - result := link.SendBytes(bytes) - link.Advance() - if result != len(bytes) { - if result < 0 { - return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result)) - } else { - return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes)) - } - } - if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names - delivery.Settle() - } - return delivery, nil -} diff --git a/proton-c/bindings/go/event/pump.go b/proton-c/bindings/go/event/pump.go deleted file mode 100644 index db022ff..0000000 --- a/proton-c/bindings/go/event/pump.go +++ /dev/null @@ -1,357 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file