From: aconway@apache.org
To: commits@qpid.apache.org
Reply-To: dev@qpid.apache.org
Date: Wed, 13 May 2015 19:18:29 -0000
In-Reply-To: <6e15a64b2e3d465f8b8166a9b3698ea7@git.apache.org>
References: <6e15a64b2e3d465f8b8166a9b3698ea7@git.apache.org>
Subject: [3/5] qpid-proton git commit: WIP Re org for `go get` and proper use of go workspace.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/event/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/event/handlers.go b/proton-c/bindings/go/event/handlers.go new file mode 100644 index 0000000..1835ff0 --- /dev/null +++ b/proton-c/bindings/go/event/handlers.go @@ -0,0 +1,411 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package event + +// #include +// #include +import "C" + +import ( + "qpid.apache.org/go/internal" +) + +// EventHandler handles core proton events. +type EventHandler interface { + // HandleEvent is called with an event. + // Typically HandleEvent() is implemented as a switch on e.Type() + HandleEvent(e Event) error +} + +// cHandler wraps a C pn_handler_t +type cHandler struct { + pn *C.pn_handler_t +} + +func (h cHandler) HandleEvent(e Event) error { + C.pn_handler_dispatch(h.pn, e.pn, C.pn_event_type(e.pn)) + return nil // FIXME aconway 2015-03-31: error handling +} + +// MessagingHandler provides an alternative interface to EventHandler. +// It is easier to use for most applications that send and receive messages. +// +// Implement this interface and then wrap your value with a MessagingDelegator (see NewMessagingDelegator). 
+// MessagingDelegator implements EventHandler and can be registered with a Pump. +// +type MessagingHandler interface { + HandleMessagingEvent(MessagingEventType, Event) error +} + +// MessagingEventType provides a set of events that are easier to work with than the +// core events defined by EventType +// +// There are 3 types of "endpoint": Connection, Session and Link. +// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed and Error. +// The meaning of these events is as follows: +// +// Opening: The remote end opened, the local end will open automatically. +// +// Opened: Both ends are open, regardless of which end opened first. +// +// Closing: The remote end closed without error, the local end will close automatically. +// +// Error: The remote end closed with an error, the local end will close automatically. +// +// Closed: Both ends are closed, regardless of which end closed first or if there was an error. +// +type MessagingEventType int + +const ( + // The event loop starts. + MStart MessagingEventType = iota + // The peer closes the connection with an error condition. + MConnectionError + // The peer closes the session with an error condition. + MSessionError + // The peer closes the link with an error condition. + MLinkError + // The peer initiates the opening of the connection. + MConnectionOpening + // The peer initiates the opening of the session. + MSessionOpening + // The peer initiates the opening of the link. + MLinkOpening + // The connection is opened. + MConnectionOpened + // The session is opened. + MSessionOpened + // The link is opened. + MLinkOpened + // The peer initiates the closing of the connection. + MConnectionClosing + // The peer initiates the closing of the session. + MSessionClosing + // The peer initiates the closing of the link. + MLinkClosing + // Both ends of the connection are closed. + MConnectionClosed + // Both ends of the session are closed. 
+ MSessionClosed + // Both ends of the link are closed. + MLinkClosed + // The connection is disconnected. + MConnectionDisconnected + // The session's connection was disconnected. + MSessionDisconnected + // The link's connection was disconnected. + MLinkDisconnected + // The sender link has credit and messages can + // therefore be transferred. + MSendable + // The remote peer accepts an outgoing message. + MAccepted + // The remote peer rejects an outgoing message. + MRejected + // The peer releases an outgoing message. Note that this may be in response to + // either the RELEASE or MODIFIED state as defined by the AMQP specification. + MReleased + // The peer has settled the outgoing message. This is the point at which it + // should never be retransmitted. + MSettled + // A message is received. Call DecodeMessage() to decode as an amqp.Message. + // To manage the outcome of this message (e.g. to accept or reject the message) + // use Event.Delivery(). + MMessage + // The event loop terminates, there are no more events to process. 
+ MFinal +) + +func (t MessagingEventType) String() string { + switch t { + case MStart: + return "Start" + case MConnectionError: + return "ConnectionError" + case MSessionError: + return "SessionError" + case MLinkError: + return "LinkError" + case MConnectionOpening: + return "ConnectionOpening" + case MSessionOpening: + return "SessionOpening" + case MLinkOpening: + return "LinkOpening" + case MConnectionOpened: + return "ConnectionOpened" + case MSessionOpened: + return "SessionOpened" + case MLinkOpened: + return "LinkOpened" + case MConnectionClosing: + return "ConnectionClosing" + case MSessionClosing: + return "SessionClosing" + case MLinkClosing: + return "LinkClosing" + case MConnectionClosed: + return "ConnectionClosed" + case MSessionClosed: + return "SessionClosed" + case MLinkClosed: + return "LinkClosed" + case MConnectionDisconnected: + return "ConnectionDisconnected" + case MSessionDisconnected: + return "SessionDisconnected" + case MLinkDisconnected: + return "LinkDisconnected" + case MSendable: + return "Sendable" + case MAccepted: + return "Accepted" + case MRejected: + return "Rejected" + case MReleased: + return "Released" + case MSettled: + return "Settled" + case MMessage: + return "Message" + default: + return "Unknown" + } +} + +// ResourceHandler provides a simple way to track the creation and deletion of +// various proton objects. +// endpointDelegator captures common patterns for endpoints opening/closing +type endpointDelegator struct { + remoteOpen, remoteClose, localOpen, localClose EventType + opening, opened, closing, closed, error MessagingEventType + endpoint func(Event) Endpoint + delegate MessagingHandler +} + +// HandleEvent handles an open/close event for an endpoint in a generic way. 
+func (d endpointDelegator) HandleEvent(e Event) (err error) { + endpoint := d.endpoint(e) + state := endpoint.State() + + switch e.Type() { + + case d.localOpen: + if state.Is(SRemoteActive) { + err = d.delegate.HandleMessagingEvent(d.opened, e) + } + + case d.remoteOpen: + switch { + case state.Is(SLocalActive): + err = d.delegate.HandleMessagingEvent(d.opened, e) + case state.Is(SLocalUninit): + err = d.delegate.HandleMessagingEvent(d.opening, e) + if err == nil { + endpoint.Open() + } + } + + case d.remoteClose: + var err1 error + if endpoint.RemoteCondition().IsSet() { // Closed with error + err1 = d.delegate.HandleMessagingEvent(d.error, e) + if err1 == nil { // Don't overwrite an application error. + err1 = endpoint.RemoteCondition().Error() + } + } else { + err1 = d.delegate.HandleMessagingEvent(d.closing, e) + } + if state.Is(SLocalClosed) { + err = d.delegate.HandleMessagingEvent(d.closed, e) + } else if state.Is(SLocalActive) { + endpoint.Close() + } + if err1 != nil { // Keep the first error. + err = err1 + } + + case d.localClose: + if state.Is(SRemoteClosed) { + err = d.delegate.HandleMessagingEvent(d.closed, e) + } + + default: + // We shouldn't be called with any other event type. + panic(internal.Errorf("internal error, not an open/close event: %s", e)) + } + + return err +} + +// MessagingDelegator implements an EventHandler and delegates to a MessagingHandler. +// You can modify the exported fields before you pass the MessagingDelegator to +// a Pump. +type MessagingDelegator struct { + delegate MessagingHandler + connection, session, link endpointDelegator + handshaker, flowcontroller EventHandler + + // AutoSettle (default true) automatically pre-settles outgoing messages. + AutoSettle bool + // AutoAccept (default true) automatically accepts and settles incoming messages + // if they are not settled by the delegate. + AutoAccept bool + // Prefetch (default 10) initial credit to issue for incoming links. 
+ Prefetch int + // PeerCloseError (default false) if true a close by the peer will be treated as an error. + PeerCloseError bool +} + +func NewMessagingDelegator(h MessagingHandler) EventHandler { + return &MessagingDelegator{ + delegate: h, + connection: endpointDelegator{ + EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose, + MConnectionOpening, MConnectionOpened, MConnectionClosing, MConnectionClosed, + MConnectionError, + func(e Event) Endpoint { return e.Connection() }, + h, + }, + session: endpointDelegator{ + ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose, + MSessionOpening, MSessionOpened, MSessionClosing, MSessionClosed, + MSessionError, + func(e Event) Endpoint { return e.Session() }, + h, + }, + link: endpointDelegator{ + ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose, + MLinkOpening, MLinkOpened, MLinkClosing, MLinkClosed, + MLinkError, + func(e Event) Endpoint { return e.Link() }, + h, + }, + flowcontroller: nil, + AutoSettle: true, + AutoAccept: true, + Prefetch: 10, + PeerCloseError: false, + } +} + +func handleIf(h EventHandler, e Event) error { + if h != nil { + return h.HandleEvent(e) + } + return nil +} + +// Handle a proton event by passing the corresponding MessagingEvent(s) to +// the MessagingHandler. +func (d *MessagingDelegator) HandleEvent(e Event) error { + handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling. 
+ + switch e.Type() { + + case EConnectionInit: + d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))} + d.delegate.HandleMessagingEvent(MStart, e) + + case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose: + return d.connection.HandleEvent(e) + + case ESessionRemoteOpen, ESessionRemoteClose, ESessionLocalOpen, ESessionLocalClose: + return d.session.HandleEvent(e) + + case ELinkRemoteOpen, ELinkRemoteClose, ELinkLocalOpen, ELinkLocalClose: + return d.link.HandleEvent(e) + + case ELinkFlow: + if e.Link().IsSender() && e.Link().Credit() > 0 { + return d.delegate.HandleMessagingEvent(MSendable, e) + } + + case EDelivery: + if e.Delivery().Link().IsReceiver() { + d.incoming(e) + } else { + d.outgoing(e) + } + + case ETransportTailClosed: + c := e.Connection() + for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = l.Next(SRemoteActive) { + e2 := e + e2.link = l + e2.session = l.Session() + d.delegate.HandleMessagingEvent(MLinkDisconnected, e2) + } + for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = s.Next(SRemoteActive) { + e2 := e + e2.session = s + d.delegate.HandleMessagingEvent(MSessionDisconnected, e2) + } + d.delegate.HandleMessagingEvent(MConnectionDisconnected, e) + d.delegate.HandleMessagingEvent(MFinal, e) + } + return nil +} + +func (d *MessagingDelegator) incoming(e Event) (err error) { + delivery := e.Delivery() + if delivery.Readable() && !delivery.Partial() { + if e.Link().State().Is(SLocalClosed) { + e.Link().Advance() + if d.AutoAccept { + delivery.Release(false) + } + } else { + err = d.delegate.HandleMessagingEvent(MMessage, e) + e.Link().Advance() + if d.AutoAccept && !delivery.Settled() { + if err == nil { + delivery.Accept() + } else { + delivery.Reject() + } + } + } + } else if delivery.Updated() && delivery.Settled() { + err = d.delegate.HandleMessagingEvent(MSettled, e) + } + return +} + +func (d *MessagingDelegator) outgoing(e Event) (err error) { + delivery := e.Delivery() + if 
delivery.Updated() { + switch delivery.Remote().Type() { + case Accepted: + err = d.delegate.HandleMessagingEvent(MAccepted, e) + case Rejected: + err = d.delegate.HandleMessagingEvent(MRejected, e) + case Released, Modified: + err = d.delegate.HandleMessagingEvent(MReleased, e) + } + if err == nil && delivery.Settled() { + err = d.delegate.HandleMessagingEvent(MSettled, e) + } + if err == nil && d.AutoSettle { + delivery.Settle() + } + } + return +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/event/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/event/message.go b/proton-c/bindings/go/event/message.go new file mode 100644 index 0000000..1475965 --- /dev/null +++ b/proton-c/bindings/go/event/message.go @@ -0,0 +1,75 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package event + +// #include +// #include +// #include +import "C" + +import ( + "qpid.apache.org/go/amqp" + "qpid.apache.org/go/internal" +) + +// DecodeMessage decodes the message contained in a delivery event. 
+func DecodeMessage(e Event) (m amqp.Message, err error) { + defer internal.DoRecover(&err) + delivery := e.Delivery() + if !delivery.Readable() || delivery.Partial() { + return nil, internal.Errorf("attempting to get incomplete message") + } + data := make([]byte, delivery.Pending()) + result := delivery.Link().Recv(data) + if result != len(data) { + return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result)) + } + return amqp.DecodeMessage(data) +} + +// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link. +var tags amqp.UidCounter + +// Send sends an amqp.Message over a Link. +// Returns a Delivery that can be used to determine the outcome of the message. +func (link Link) Send(m amqp.Message) (Delivery, error) { + if !link.IsSender() { + return Delivery{}, internal.Errorf("attempt to send message on receiving link") + } + // FIXME aconway 2015-04-08: buffering, error handling + delivery := link.Delivery(tags.Next()) + bytes, err := m.Encode(nil) + if err != nil { + return Delivery{}, internal.Errorf("cannot send message: %s", err) + } + result := link.SendBytes(bytes) + link.Advance() + if result != len(bytes) { + if result < 0 { + return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result)) + } else { + return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes)) + } + } + if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names + delivery.Settle() + } + return delivery, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/event/pump.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/event/pump.go b/proton-c/bindings/go/event/pump.go new file mode 100644 index 0000000..4f9fcb8 --- /dev/null +++ b/proton-c/bindings/go/event/pump.go @@ -0,0 +1,357 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license 
agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package event + +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// +// PN_HANDLE(REMOTE_ADDR) +import "C" + +import ( + "fmt" + "io" + "net" + "qpid.apache.org/go/internal" + "sync" + "unsafe" +) + +// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. +type bufferChan struct { + buffers chan []byte + buf1, buf2 []byte +} + +func newBufferChan(size int) *bufferChan { + return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} +} + +func (b *bufferChan) buffer() []byte { + b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. + return b.buf1[:cap(b.buf1)] +} + +// FIXME aconway 2015-05-04: direct sending to Inject may block user goroutines if +// the pump stops. Make this a function that selects on running. + +// FIXME aconway 2015-05-05: for consistency should Pump be called Driver? + +/* +Pump reads from a net.Conn, decodes AMQP events and calls the appropriate +Handler functions. Actions taken by Handler functions (such as sending messages) +are encoded and written to the net.Conn. + +The proton protocol engine is single threaded (per connection). The Pump runs +proton in the goroutine that calls Pump.Run() and creates goroutines to feed +data to/from a net.Conn. 
You can create multiple Pumps to handle multiple +connections concurrently. + +Methods in this package can only be called in the goroutine that executes the +corresponding Pump.Run(). You implement the EventHandler or MessagingHandler +interfaces and provide those values to NewPump(). Their HandleEvent method will be +called in the Pump goroutine, in typical event-driven style. + +Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +other goroutines, store them, or use them as map keys. Effectively they are +just C pointers. Other goroutines cannot call their methods directly but they +can create function closures that call their methods and send those closures +to the Pump.Inject channel. They will execute safely in the pump +goroutine. Injected functions, or your handlers, can set up channels to get +results back to other goroutines. + +You are responsible for ensuring you don't use an event value after the C object +has been deleted. The handler methods will tell you when a value is no longer +valid. For example after a MessagingHandler handles a LinkClosed event, that link +is no longer valid. If you do Link.Close() yourself (in a handler or injected +function) the link remains valid until the corresponding LinkClosed event is +received by the handler. + +Pump.Close() will take care of cleaning up any remaining values and types when +you are done with the Pump. All values associated with a pump become invalid +when you call Pump.Close(). + +The qpid.apache.org/go/messaging package will do all this for you, so unless +you are doing something fairly low-level it is probably a better choice. + +*/ +type Pump struct { + // Error is set on exit from Run() if there was an error. + Error error + // Channel to inject functions to be executed in the Pump's proton event loop. 
+ Inject chan func() + + conn net.Conn + transport *C.pn_transport_t + connection *C.pn_connection_t + collector *C.pn_collector_t + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. +} + +const bufferSize = 4096 + +var pumps map[*C.pn_connection_t]*Pump + +func init() { + pumps = make(map[*C.pn_connection_t]*Pump) +} + +// NewPump initializes a pump with a connection and handlers. To start it running: +// p := NewPump(...) +// go p.Run() +// The goroutine will exit when the pump is closed or disconnected. +// You can check for errors on Pump.Error. +// +func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) { + // Save the connection ID for Connection.String() + p := &Pump{ + Inject: make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack + conn: conn, + transport: C.pn_transport(), + connection: C.pn_connection(), + collector: C.pn_collector(), + handlers: handlers, + read: newBufferChan(bufferSize), + write: newBufferChan(bufferSize), + running: make(chan struct{}), + } + if p.transport == nil || p.connection == nil || p.collector == nil { + return nil, internal.Errorf("failed to allocate pump") + } + pnErr := int(C.pn_transport_bind(p.transport, p.connection)) + if pnErr != 0 { + return nil, internal.Errorf("cannot setup pump: %s", internal.PnErrorCode(pnErr)) + } + C.pn_connection_collect(p.connection, p.collector) + C.pn_connection_open(p.connection) + pumps[p.connection] = p + return p, nil +} + +func (p *Pump) String() string { + return fmt.Sprintf("(%s-%s)", p.conn.LocalAddr(), p.conn.RemoteAddr()) +} + +func (p *Pump) Id() string { + return fmt.Sprintf("%p", p) +} + +// setError sets error only if not already set +func (p *Pump) setError(e error) { + if p.Error == nil { + p.Error = e + } +} + +// Server puts the Pump in server mode, meaning it will 
auto-detect security settings on +// the incoming connection such as use of SASL and SSL. +// Must be called before Run() +// +func (p *Pump) Server() { + C.pn_transport_set_server(p.transport) +} + +func (p *Pump) free() { + if p.connection != nil { + C.pn_connection_free(p.connection) + } + if p.transport != nil { + C.pn_transport_free(p.transport) + } + if p.collector != nil { + C.pn_collector_free(p.collector) + } + for _, h := range p.handlers { + switch h := h.(type) { + case cHandler: + C.pn_handler_free(h.pn) + } + } +} + +// Close closes the AMQP connection, the net.Conn, and stops associated goroutines. +// It will cause Run() to return. Run() may return earlier if the network disconnects +// but you must still call Close() to clean everything up. +// +// Methods on values associated with the pump (Connections, Sessions, Links) will panic +// if called after Close() +// +func (p *Pump) Close() error { + // If the pump is still running, inject a close. Either way wait for it to finish. + select { + case p.Inject <- func() { C.pn_connection_close(p.connection) }: + <-p.running // Wait to finish + case <-p.running: // Wait for goroutines to finish + } + delete(pumps, p.connection) + p.free() + return p.Error +} + +// Run the pump. Normally called in a goroutine as: go pump.Run() +// An error during Run is stored on p.Error. +// +func (p *Pump) Run() { + // Signal errors from the read/write goroutines. Don't block if we don't + // read all the errors, we only care about the first. + error := make(chan error, 2) + // FIXME aconway 2015-05-04: stop := make(chan struct{}) // Closed to signal that read/write should stop. 
+ + wait := sync.WaitGroup{} + wait.Add(2) + + go func() { // Read goroutine + defer wait.Done() + for { + rbuf := p.read.buffer() + n, err := p.conn.Read(rbuf) + if n > 0 { + p.read.buffers <- rbuf[:n] + } else if err != nil { + close(p.read.buffers) + error <- err + return + } + } + }() + + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-p.write.buffers + if !ok { + return + } + _, err := p.conn.Write(wbuf) + if err != nil { + error <- err + return + } + } + }() + + wbuf := p.write.buffer()[:0] +loop: + for { + if len(wbuf) == 0 { + p.pop(&wbuf) + } + // Don't set wchan unless there is something to write. + var wchan chan []byte + if len(wbuf) > 0 { + wchan = p.write.buffers + } + + select { + case buf := <-p.read.buffers: // Read a buffer + p.push(buf) + case wchan <- wbuf: // Write a buffer + wbuf = p.write.buffer()[:0] + case f := <-p.Inject: // Function injected from another goroutine + f() + case err := <-error: // Read or write error + p.setError(err) + C.pn_transport_close_tail(p.transport) + C.pn_transport_close_head(p.transport) + } + if err := p.process(); err != nil { + p.setError(err) + break loop + } + } + close(p.write.buffers) + p.conn.Close() + wait.Wait() + close(p.running) // Signal goroutines have exited and Error is set. 
+} + +func minInt(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (p *Pump) pop(buf *[]byte) { + pending := int(C.pn_transport_pending(p.transport)) + switch { + case pending == int(C.PN_EOS): + *buf = (*buf)[:] + return + case pending < 0: + panic(internal.Errorf("%s", internal.PnErrorCode(pending))) + } + size := minInt(pending, cap(*buf)) + *buf = (*buf)[:size] + if size == 0 { + return + } + C.memcpy(unsafe.Pointer(&(*buf)[0]), unsafe.Pointer(C.pn_transport_head(p.transport)), C.size_t(size)) + C.pn_transport_pop(p.transport, C.size_t(size)) +} + +func (p *Pump) push(buf []byte) { + buf2 := buf + for len(buf2) > 0 { + n := int(C.pn_transport_push(p.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2)))) + if n <= 0 { + panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n))) + } + buf2 = buf2[n:] + } +} + +func (p *Pump) handle(e Event) error { + for _, h := range p.handlers { + if err := h.HandleEvent(e); err != nil { + return err + } + } + if e.Type() == ETransportClosed { + return io.EOF + } + return nil +} + +func (p *Pump) process() error { + // FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump + for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) { + e := makeEvent(ce) + if err := p.handle(e); err != nil { + return err + } + C.pn_collector_pop(p.collector) + } + return nil +} + +// Connection gets the Pump's connection value. 
+func (p *Pump) Connection() Connection { return Connection{p.connection} } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/event/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/event/wrappers.go b/proton-c/bindings/go/event/wrappers.go new file mode 100644 index 0000000..93a6487 --- /dev/null +++ b/proton-c/bindings/go/event/wrappers.go @@ -0,0 +1,253 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package event + +//#include +//#include +//#include +//#include +//#include +//#include +//#include +//#include +//#include +//#include +import "C" + +import ( + "fmt" + "qpid.apache.org/go/internal" + "unsafe" +) + +// FIXME aconway 2015-05-05: Documentation for generated types. + +// Event is an AMQP protocol event. 
+type Event struct { + pn *C.pn_event_t + eventType EventType + connection Connection + session Session + link Link + delivery Delivery +} + +func makeEvent(pn *C.pn_event_t) Event { + return Event{ + pn: pn, + eventType: EventType(C.pn_event_type(pn)), + connection: Connection{C.pn_event_connection(pn)}, + session: Session{C.pn_event_session(pn)}, + link: Link{C.pn_event_link(pn)}, + delivery: Delivery{C.pn_event_delivery(pn)}, + } +} +func (e Event) IsNil() bool { return e.eventType == EventType(0) } +func (e Event) Type() EventType { return e.eventType } +func (e Event) Connection() Connection { return e.connection } +func (e Event) Session() Session { return e.session } +func (e Event) Link() Link { return e.link } +func (e Event) Delivery() Delivery { return e.delivery } +func (e Event) String() string { return e.Type().String() } + +// Data holds a pointer to decoded AMQP data. +// Use amqp.marshal/unmarshal to access it as Go data types. +// +type Data struct{ pn *C.pn_data_t } + +func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} } + +func (d Data) Free() { C.pn_data_free(d.pn) } +func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) } +func (d Data) Clear() { C.pn_data_clear(d.pn) } +func (d Data) Rewind() { C.pn_data_rewind(d.pn) } +func (d Data) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn))) +} + +// State holds the state flags for an AMQP endpoint. +type State byte + +const ( + SLocalUninit State = C.PN_LOCAL_UNINIT + SLocalActive = C.PN_LOCAL_ACTIVE + SLocalClosed = C.PN_LOCAL_CLOSED + SRemoteUninit = C.PN_REMOTE_UNINIT + SRemoteActive = C.PN_REMOTE_ACTIVE + SRemoteClosed = C.PN_REMOTE_CLOSED +) + +// Is returns true if s & bits is non-zero. 
+func (s State) Is(bits State) bool { return s&bits != 0 } + +// Return a State containing just the local flags +func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) } + +// Return a State containing just the remote flags +func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) } + +// Endpoint is the common interface for Connection, Link and Session. +type Endpoint interface { + // State is the open/closed state. + State() State + // Open an endpoint. + Open() + // Close an endpoint. + Close() + // Condition holds a local error condition. + Condition() Condition + // RemoteCondition holds a remote error condition. + RemoteCondition() Condition +} + +const ( + Received uint64 = C.PN_RECEIVED + Accepted = C.PN_ACCEPTED + Rejected = C.PN_REJECTED + Released = C.PN_RELEASED + Modified = C.PN_MODIFIED +) + +// SettleAs is equivalent to d.Update(disposition); d.Settle() +// It is a no-op if e does not have a delivery. +func (d Delivery) SettleAs(disposition uint64) { + d.Update(disposition) + d.Settle() +} + +// Accept accepts and settles a delivery. +func (d Delivery) Accept() { d.SettleAs(Accepted) } + +// Reject rejects and settles a delivery +func (d Delivery) Reject() { d.SettleAs(Rejected) } + +// Release releases and settles a delivery +// If delivered is true the delivery count for the message will be increased. +func (d Delivery) Release(delivered bool) { + if delivered { + d.SettleAs(Modified) + } else { + d.SettleAs(Released) + } +} + +// FIXME aconway 2015-05-05: don't expose DeliveryTag as a C pointer, just as a String? 
+ +type DeliveryTag struct{ pn C.pn_delivery_tag_t } + +func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) } + +func (l Link) Recv(buf []byte) int { + if len(buf) == 0 { + return 0 + } + return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) +} + +func (l Link) SendBytes(bytes []byte) int { + return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes))) +} + +func pnTag(tag string) C.pn_delivery_tag_t { + bytes := []byte(tag) + return C.pn_dtag(cPtr(bytes), cLen(bytes)) +} + +func (l Link) Delivery(tag string) Delivery { + return Delivery{C.pn_delivery(l.pn, pnTag(tag))} +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} + +func (s Session) Sender(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_sender(s.pn, cname)} +} + +func (s Session) Receiver(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_receiver(s.pn, cname)} +} + +func joinId(a, b interface{}) string { + return fmt.Sprintf("%s/%s", a, b) +} + +// Pump associated with this connection. +func (c Connection) Pump() *Pump { return pumps[c.pn] } + +// Unique (per process) string identifier for a connection, useful for debugging. +func (c Connection) String() string { return pumps[c.pn].String() } + +// Head functions don't follow the normal naming conventions so missed by the generator. + +func (c Connection) LinkHead(s State) Link { + return Link{C.pn_link_head(c.pn, C.pn_state_t(s))} +} + +func (c Connection) SessionHead(s State) Session { + return Session{C.pn_session_head(c.pn, C.pn_state_t(s))} +} + +// Unique (per process) string identifier for a session, including connection identifier. 
+func (s Session) String() string { + return joinId(s.Connection(), fmt.Sprintf("%p", s.pn)) +} + +// Unique (per process) string identifier for a link, including session identifier. +func (l Link) String() string { + return joinId(l.Session(), l.Name()) +} + +// Error returns an error interface corresponding to Condition. +func (c Condition) Error() error { + if c.IsNil() { + return nil + } else { + return fmt.Errorf("%s: %s", c.Name(), c.Description()) + } +} + +// SetIfUnset sets name and description on a condition if it is not already set. +func (c Condition) SetIfUnset(name, description string) { + if !c.IsSet() { + c.SetName(name) + c.SetDescription(description) + } +} + +func (c Connection) Session() (Session, error) { + s := Session{C.pn_session(c.pn)} + if s.IsNil() { + return s, Connection(c).Error() + } + return s, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/event/wrappers_gen.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/event/wrappers_gen.go b/proton-c/bindings/go/event/wrappers_gen.go new file mode 100644 index 0000000..766fa78 --- /dev/null +++ b/proton-c/bindings/go/event/wrappers_gen.go @@ -0,0 +1,732 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied.
See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// +// NOTE: This file was generated by genwrap.go, do not edit it by hand. +// + +package event + +import ( + "qpid.apache.org/go/internal" + "time" + "unsafe" +) + +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// #include +// #include +import "C" + +type EventType int + +const ( + EConnectionInit EventType = C.PN_CONNECTION_INIT + EConnectionBound EventType = C.PN_CONNECTION_BOUND + EConnectionUnbound EventType = C.PN_CONNECTION_UNBOUND + EConnectionLocalOpen EventType = C.PN_CONNECTION_LOCAL_OPEN + EConnectionRemoteOpen EventType = C.PN_CONNECTION_REMOTE_OPEN + EConnectionLocalClose EventType = C.PN_CONNECTION_LOCAL_CLOSE + EConnectionRemoteClose EventType = C.PN_CONNECTION_REMOTE_CLOSE + EConnectionFinal EventType = C.PN_CONNECTION_FINAL + ESessionInit EventType = C.PN_SESSION_INIT + ESessionLocalOpen EventType = C.PN_SESSION_LOCAL_OPEN + ESessionRemoteOpen EventType = C.PN_SESSION_REMOTE_OPEN + ESessionLocalClose EventType = C.PN_SESSION_LOCAL_CLOSE + ESessionRemoteClose EventType = C.PN_SESSION_REMOTE_CLOSE + ESessionFinal EventType = C.PN_SESSION_FINAL + ELinkInit EventType = C.PN_LINK_INIT + ELinkLocalOpen EventType = C.PN_LINK_LOCAL_OPEN + ELinkRemoteOpen EventType = C.PN_LINK_REMOTE_OPEN + ELinkLocalClose EventType = C.PN_LINK_LOCAL_CLOSE + ELinkRemoteClose EventType = C.PN_LINK_REMOTE_CLOSE + ELinkLocalDetach EventType = C.PN_LINK_LOCAL_DETACH + ELinkRemoteDetach EventType = C.PN_LINK_REMOTE_DETACH + ELinkFlow EventType = C.PN_LINK_FLOW + ELinkFinal EventType = C.PN_LINK_FINAL + EDelivery EventType = C.PN_DELIVERY + ETransport EventType = C.PN_TRANSPORT + ETransportAuthenticated EventType = C.PN_TRANSPORT_AUTHENTICATED + ETransportError EventType = C.PN_TRANSPORT_ERROR + ETransportHeadClosed EventType = C.PN_TRANSPORT_HEAD_CLOSED + ETransportTailClosed EventType = C.PN_TRANSPORT_TAIL_CLOSED 
+ ETransportClosed EventType = C.PN_TRANSPORT_CLOSED +) + +func (e EventType) String() string { + switch e { + + case C.PN_CONNECTION_INIT: + return "ConnectionInit" + case C.PN_CONNECTION_BOUND: + return "ConnectionBound" + case C.PN_CONNECTION_UNBOUND: + return "ConnectionUnbound" + case C.PN_CONNECTION_LOCAL_OPEN: + return "ConnectionLocalOpen" + case C.PN_CONNECTION_REMOTE_OPEN: + return "ConnectionRemoteOpen" + case C.PN_CONNECTION_LOCAL_CLOSE: + return "ConnectionLocalClose" + case C.PN_CONNECTION_REMOTE_CLOSE: + return "ConnectionRemoteClose" + case C.PN_CONNECTION_FINAL: + return "ConnectionFinal" + case C.PN_SESSION_INIT: + return "SessionInit" + case C.PN_SESSION_LOCAL_OPEN: + return "SessionLocalOpen" + case C.PN_SESSION_REMOTE_OPEN: + return "SessionRemoteOpen" + case C.PN_SESSION_LOCAL_CLOSE: + return "SessionLocalClose" + case C.PN_SESSION_REMOTE_CLOSE: + return "SessionRemoteClose" + case C.PN_SESSION_FINAL: + return "SessionFinal" + case C.PN_LINK_INIT: + return "LinkInit" + case C.PN_LINK_LOCAL_OPEN: + return "LinkLocalOpen" + case C.PN_LINK_REMOTE_OPEN: + return "LinkRemoteOpen" + case C.PN_LINK_LOCAL_CLOSE: + return "LinkLocalClose" + case C.PN_LINK_REMOTE_CLOSE: + return "LinkRemoteClose" + case C.PN_LINK_LOCAL_DETACH: + return "LinkLocalDetach" + case C.PN_LINK_REMOTE_DETACH: + return "LinkRemoteDetach" + case C.PN_LINK_FLOW: + return "LinkFlow" + case C.PN_LINK_FINAL: + return "LinkFinal" + case C.PN_DELIVERY: + return "Delivery" + case C.PN_TRANSPORT: + return "Transport" + case C.PN_TRANSPORT_AUTHENTICATED: + return "TransportAuthenticated" + case C.PN_TRANSPORT_ERROR: + return "TransportError" + case C.PN_TRANSPORT_HEAD_CLOSED: + return "TransportHeadClosed" + case C.PN_TRANSPORT_TAIL_CLOSED: + return "TransportTailClosed" + case C.PN_TRANSPORT_CLOSED: + return "TransportClosed" + } + return "Unknown" +} + +// Wrappers for declarations in session.h + +type Session struct{ pn *C.pn_session_t } + +func (s Session) IsNil() bool { return s.pn 
== nil } +func (s Session) Free() { + C.pn_session_free(s.pn) +} +func (s Session) State() State { + return State(C.pn_session_state(s.pn)) +} +func (s Session) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_session_error(s.pn))) +} +func (s Session) Condition() Condition { + return Condition{C.pn_session_condition(s.pn)} +} +func (s Session) RemoteCondition() Condition { + return Condition{C.pn_session_remote_condition(s.pn)} +} +func (s Session) Connection() Connection { + return Connection{C.pn_session_connection(s.pn)} +} +func (s Session) Open() { + C.pn_session_open(s.pn) +} +func (s Session) Close() { + C.pn_session_close(s.pn) +} +func (s Session) IncomingCapacity() uint { + return uint(C.pn_session_get_incoming_capacity(s.pn)) +} +func (s Session) SetIncomingCapacity(capacity uint) { + C.pn_session_set_incoming_capacity(s.pn, C.size_t(capacity)) +} +func (s Session) OutgoingBytes() uint { + return uint(C.pn_session_outgoing_bytes(s.pn)) +} +func (s Session) IncomingBytes() uint { + return uint(C.pn_session_incoming_bytes(s.pn)) +} +func (s Session) Next(state State) Session { + return Session{C.pn_session_next(s.pn, C.pn_state_t(state))} +} + +// Wrappers for declarations in link.h + +type SndSettleMode C.pn_snd_settle_mode_t + +const ( + PnSndUnsettled SndSettleMode = C.PN_SND_UNSETTLED + PnSndSettled SndSettleMode = C.PN_SND_SETTLED + PnSndMixed SndSettleMode = C.PN_SND_MIXED +) + +func (e SndSettleMode) String() string { + switch e { + + case C.PN_SND_UNSETTLED: + return "SndUnsettled" + case C.PN_SND_SETTLED: + return "SndSettled" + case C.PN_SND_MIXED: + return "SndMixed" + } + return "unknown" +} + +type RcvSettleMode C.pn_rcv_settle_mode_t + +const ( + PnRcvFirst RcvSettleMode = C.PN_RCV_FIRST + PnRcvSecond RcvSettleMode = C.PN_RCV_SECOND +) + +func (e RcvSettleMode) String() string { + switch e { + + case C.PN_RCV_FIRST: + return "RcvFirst" + case C.PN_RCV_SECOND: + return "RcvSecond" + } + return "unknown" +} + +type Link struct{ pn 
*C.pn_link_t } + +func (l Link) IsNil() bool { return l.pn == nil } +func (l Link) Free() { + C.pn_link_free(l.pn) +} +func (l Link) Name() string { + return C.GoString(C.pn_link_name(l.pn)) +} +func (l Link) IsSender() bool { + return bool(C.pn_link_is_sender(l.pn)) +} +func (l Link) IsReceiver() bool { + return bool(C.pn_link_is_receiver(l.pn)) +} +func (l Link) State() State { + return State(C.pn_link_state(l.pn)) +} +func (l Link) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_link_error(l.pn))) +} +func (l Link) Condition() Condition { + return Condition{C.pn_link_condition(l.pn)} +} +func (l Link) RemoteCondition() Condition { + return Condition{C.pn_link_remote_condition(l.pn)} +} +func (l Link) Session() Session { + return Session{C.pn_link_session(l.pn)} +} +func (l Link) Next(state State) Link { + return Link{C.pn_link_next(l.pn, C.pn_state_t(state))} +} +func (l Link) Open() { + C.pn_link_open(l.pn) +} +func (l Link) Close() { + C.pn_link_close(l.pn) +} +func (l Link) Detach() { + C.pn_link_detach(l.pn) +} +func (l Link) Source() Terminus { + return Terminus{C.pn_link_source(l.pn)} +} +func (l Link) Target() Terminus { + return Terminus{C.pn_link_target(l.pn)} +} +func (l Link) RemoteSource() Terminus { + return Terminus{C.pn_link_remote_source(l.pn)} +} +func (l Link) RemoteTarget() Terminus { + return Terminus{C.pn_link_remote_target(l.pn)} +} +func (l Link) Current() Delivery { + return Delivery{C.pn_link_current(l.pn)} +} +func (l Link) Advance() bool { + return bool(C.pn_link_advance(l.pn)) +} +func (l Link) Credit() int { + return int(C.pn_link_credit(l.pn)) +} +func (l Link) Queued() int { + return int(C.pn_link_queued(l.pn)) +} +func (l Link) RemoteCredit() int { + return int(C.pn_link_remote_credit(l.pn)) +} +func (l Link) IsDrain() bool { + return bool(C.pn_link_get_drain(l.pn)) +} +func (l Link) Drained() int { + return int(C.pn_link_drained(l.pn)) +} +func (l Link) Available() int { + return int(C.pn_link_available(l.pn)) +} 
+func (l Link) SndSettleMode() SndSettleMode { + return SndSettleMode(C.pn_link_snd_settle_mode(l.pn)) +} +func (l Link) RcvSettleMode() RcvSettleMode { + return RcvSettleMode(C.pn_link_rcv_settle_mode(l.pn)) +} +func (l Link) SetSndSettleMode(mode SndSettleMode) { + C.pn_link_set_snd_settle_mode(l.pn, C.pn_snd_settle_mode_t(mode)) +} +func (l Link) SetRcvSettleMode(mode RcvSettleMode) { + C.pn_link_set_rcv_settle_mode(l.pn, C.pn_rcv_settle_mode_t(mode)) +} +func (l Link) RemoteSndSettleMode() SndSettleMode { + return SndSettleMode(C.pn_link_remote_snd_settle_mode(l.pn)) +} +func (l Link) RemoteRcvSettleMode() RcvSettleMode { + return RcvSettleMode(C.pn_link_remote_rcv_settle_mode(l.pn)) +} +func (l Link) Unsettled() int { + return int(C.pn_link_unsettled(l.pn)) +} +func (l Link) Offered(credit int) { + C.pn_link_offered(l.pn, C.int(credit)) +} +func (l Link) Flow(credit int) { + C.pn_link_flow(l.pn, C.int(credit)) +} +func (l Link) Drain(credit int) { + C.pn_link_drain(l.pn, C.int(credit)) +} +func (l Link) SetDrain(drain bool) { + C.pn_link_set_drain(l.pn, C.bool(drain)) +} +func (l Link) Draining() bool { + return bool(C.pn_link_draining(l.pn)) +} + +// Wrappers for declarations in delivery.h + +type Delivery struct{ pn *C.pn_delivery_t } + +func (d Delivery) IsNil() bool { return d.pn == nil } +func (d Delivery) Tag() DeliveryTag { + return DeliveryTag{C.pn_delivery_tag(d.pn)} +} +func (d Delivery) Link() Link { + return Link{C.pn_delivery_link(d.pn)} +} +func (d Delivery) Local() Disposition { + return Disposition{C.pn_delivery_local(d.pn)} +} +func (d Delivery) LocalState() uint64 { + return uint64(C.pn_delivery_local_state(d.pn)) +} +func (d Delivery) Remote() Disposition { + return Disposition{C.pn_delivery_remote(d.pn)} +} +func (d Delivery) RemoteState() uint64 { + return uint64(C.pn_delivery_remote_state(d.pn)) +} +func (d Delivery) Settled() bool { + return bool(C.pn_delivery_settled(d.pn)) +} +func (d Delivery) Pending() uint { + return 
uint(C.pn_delivery_pending(d.pn)) +} +func (d Delivery) Partial() bool { + return bool(C.pn_delivery_partial(d.pn)) +} +func (d Delivery) Writable() bool { + return bool(C.pn_delivery_writable(d.pn)) +} +func (d Delivery) Readable() bool { + return bool(C.pn_delivery_readable(d.pn)) +} +func (d Delivery) Updated() bool { + return bool(C.pn_delivery_updated(d.pn)) +} +func (d Delivery) Update(state uint64) { + C.pn_delivery_update(d.pn, C.uint64_t(state)) +} +func (d Delivery) Clear() { + C.pn_delivery_clear(d.pn) +} +func (d Delivery) Current() bool { + return bool(C.pn_delivery_current(d.pn)) +} +func (d Delivery) Settle() { + C.pn_delivery_settle(d.pn) +} +func (d Delivery) Dump() { + C.pn_delivery_dump(d.pn) +} +func (d Delivery) Buffered() bool { + return bool(C.pn_delivery_buffered(d.pn)) +} + +// Wrappers for declarations in disposition.h + +type Disposition struct{ pn *C.pn_disposition_t } + +func (d Disposition) IsNil() bool { return d.pn == nil } +func (d Disposition) Type() uint64 { + return uint64(C.pn_disposition_type(d.pn)) +} +func (d Disposition) Condition() Condition { + return Condition{C.pn_disposition_condition(d.pn)} +} +func (d Disposition) Data() Data { + return Data{C.pn_disposition_data(d.pn)} +} +func (d Disposition) SectionNumber() uint16 { + return uint16(C.pn_disposition_get_section_number(d.pn)) +} +func (d Disposition) SetSectionNumber(section_number uint16) { + C.pn_disposition_set_section_number(d.pn, C.uint32_t(section_number)) +} +func (d Disposition) SectionOffset() uint64 { + return uint64(C.pn_disposition_get_section_offset(d.pn)) +} +func (d Disposition) SetSectionOffset(section_offset uint64) { + C.pn_disposition_set_section_offset(d.pn, C.uint64_t(section_offset)) +} +func (d Disposition) IsFailed() bool { + return bool(C.pn_disposition_is_failed(d.pn)) +} +func (d Disposition) SetFailed(failed bool) { + C.pn_disposition_set_failed(d.pn, C.bool(failed)) +} +func (d Disposition) IsUndeliverable() bool { + return 
bool(C.pn_disposition_is_undeliverable(d.pn)) +} +func (d Disposition) SetUndeliverable(undeliverable bool) { + C.pn_disposition_set_undeliverable(d.pn, C.bool(undeliverable)) +} +func (d Disposition) Annotations() Data { + return Data{C.pn_disposition_annotations(d.pn)} +} + +// Wrappers for declarations in condition.h + +type Condition struct{ pn *C.pn_condition_t } + +func (c Condition) IsNil() bool { return c.pn == nil } +func (c Condition) IsSet() bool { + return bool(C.pn_condition_is_set(c.pn)) +} +func (c Condition) Clear() { + C.pn_condition_clear(c.pn) +} +func (c Condition) Name() string { + return C.GoString(C.pn_condition_get_name(c.pn)) +} +func (c Condition) SetName(name string) int { + nameC := C.CString(name) + defer C.free(unsafe.Pointer(nameC)) + + return int(C.pn_condition_set_name(c.pn, nameC)) +} +func (c Condition) Description() string { + return C.GoString(C.pn_condition_get_description(c.pn)) +} +func (c Condition) SetDescription(description string) int { + descriptionC := C.CString(description) + defer C.free(unsafe.Pointer(descriptionC)) + + return int(C.pn_condition_set_description(c.pn, descriptionC)) +} +func (c Condition) Info() Data { + return Data{C.pn_condition_info(c.pn)} +} +func (c Condition) IsRedirect() bool { + return bool(C.pn_condition_is_redirect(c.pn)) +} +func (c Condition) RedirectHost() string { + return C.GoString(C.pn_condition_redirect_host(c.pn)) +} +func (c Condition) RedirectPort() int { + return int(C.pn_condition_redirect_port(c.pn)) +} + +// Wrappers for declarations in terminus.h + +type TerminusType C.pn_terminus_type_t + +const ( + PnUnspecified TerminusType = C.PN_UNSPECIFIED + PnSource TerminusType = C.PN_SOURCE + PnTarget TerminusType = C.PN_TARGET + PnCoordinator TerminusType = C.PN_COORDINATOR +) + +func (e TerminusType) String() string { + switch e { + + case C.PN_UNSPECIFIED: + return "Unspecified" + case C.PN_SOURCE: + return "Source" + case C.PN_TARGET: + return "Target" + case C.PN_COORDINATOR: + 
return "Coordinator" + } + return "unknown" +} + +type Durability C.pn_durability_t + +const ( + PnNondurable Durability = C.PN_NONDURABLE + PnConfiguration Durability = C.PN_CONFIGURATION + PnDeliveries Durability = C.PN_DELIVERIES +) + +func (e Durability) String() string { + switch e { + + case C.PN_NONDURABLE: + return "Nondurable" + case C.PN_CONFIGURATION: + return "Configuration" + case C.PN_DELIVERIES: + return "Deliveries" + } + return "unknown" +} + +type ExpiryPolicy C.pn_expiry_policy_t + +const ( + PnExpireWithLink ExpiryPolicy = C.PN_EXPIRE_WITH_LINK + PnExpireWithSession ExpiryPolicy = C.PN_EXPIRE_WITH_SESSION + PnExpireWithConnection ExpiryPolicy = C.PN_EXPIRE_WITH_CONNECTION + PnExpireNever ExpiryPolicy = C.PN_EXPIRE_NEVER +) + +func (e ExpiryPolicy) String() string { + switch e { + + case C.PN_EXPIRE_WITH_LINK: + return "ExpireWithLink" + case C.PN_EXPIRE_WITH_SESSION: + return "ExpireWithSession" + case C.PN_EXPIRE_WITH_CONNECTION: + return "ExpireWithConnection" + case C.PN_EXPIRE_NEVER: + return "ExpireNever" + } + return "unknown" +} + +type DistributionMode C.pn_distribution_mode_t + +const ( + PnDistModeUnspecified DistributionMode = C.PN_DIST_MODE_UNSPECIFIED + PnDistModeCopy DistributionMode = C.PN_DIST_MODE_COPY + PnDistModeMove DistributionMode = C.PN_DIST_MODE_MOVE +) + +func (e DistributionMode) String() string { + switch e { + + case C.PN_DIST_MODE_UNSPECIFIED: + return "DistModeUnspecified" + case C.PN_DIST_MODE_COPY: + return "DistModeCopy" + case C.PN_DIST_MODE_MOVE: + return "DistModeMove" + } + return "unknown" +} + +type Terminus struct{ pn *C.pn_terminus_t } + +func (t Terminus) IsNil() bool { return t.pn == nil } +func (t Terminus) Type() TerminusType { + return TerminusType(C.pn_terminus_get_type(t.pn)) +} +func (t Terminus) SetType(type_ TerminusType) int { + return int(C.pn_terminus_set_type(t.pn, C.pn_terminus_type_t(type_))) +} +func (t Terminus) Address() string { + return C.GoString(C.pn_terminus_get_address(t.pn)) +} 
+func (t Terminus) SetAddress(address string) int { + addressC := C.CString(address) + defer C.free(unsafe.Pointer(addressC)) + + return int(C.pn_terminus_set_address(t.pn, addressC)) +} +func (t Terminus) SetDistributionMode(mode DistributionMode) int { + return int(C.pn_terminus_set_distribution_mode(t.pn, C.pn_distribution_mode_t(mode))) +} +func (t Terminus) Durability() Durability { + return Durability(C.pn_terminus_get_durability(t.pn)) +} +func (t Terminus) SetDurability(durability Durability) int { + return int(C.pn_terminus_set_durability(t.pn, C.pn_durability_t(durability))) +} +func (t Terminus) ExpiryPolicy() ExpiryPolicy { + return ExpiryPolicy(C.pn_terminus_get_expiry_policy(t.pn)) +} +func (t Terminus) SetExpiryPolicy(policy ExpiryPolicy) int { + return int(C.pn_terminus_set_expiry_policy(t.pn, C.pn_expiry_policy_t(policy))) +} +func (t Terminus) Timeout() time.Duration { + return (time.Duration(C.pn_terminus_get_timeout(t.pn)) * time.Second) +} +func (t Terminus) SetTimeout(timeout time.Duration) int { + return int(C.pn_terminus_set_timeout(t.pn, C.pn_seconds_t(timeout/time.Second))) +} +func (t Terminus) IsDynamic() bool { + return bool(C.pn_terminus_is_dynamic(t.pn)) +} +func (t Terminus) SetDynamic(dynamic bool) int { + return int(C.pn_terminus_set_dynamic(t.pn, C.bool(dynamic))) +} +func (t Terminus) Properties() Data { + return Data{C.pn_terminus_properties(t.pn)} +} +func (t Terminus) Capabilities() Data { + return Data{C.pn_terminus_capabilities(t.pn)} +} +func (t Terminus) Outcomes() Data { + return Data{C.pn_terminus_outcomes(t.pn)} +} +func (t Terminus) Filter() Data { + return Data{C.pn_terminus_filter(t.pn)} +} +func (t Terminus) Copy(src Terminus) int { + return int(C.pn_terminus_copy(t.pn, src.pn)) +} + +// Wrappers for declarations in connection.h + +type Connection struct{ pn *C.pn_connection_t } + +func (c Connection) IsNil() bool { return c.pn == nil } +func (c Connection) Free() { + C.pn_connection_free(c.pn) +} +func (c Connection) Release()
{ + C.pn_connection_release(c.pn) +} +func (c Connection) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_connection_error(c.pn))) +} +func (c Connection) State() State { + return State(C.pn_connection_state(c.pn)) +} +func (c Connection) Open() { + C.pn_connection_open(c.pn) +} +func (c Connection) Close() { + C.pn_connection_close(c.pn) +} +func (c Connection) Reset() { + C.pn_connection_reset(c.pn) +} +func (c Connection) Condition() Condition { + return Condition{C.pn_connection_condition(c.pn)} +} +func (c Connection) RemoteCondition() Condition { + return Condition{C.pn_connection_remote_condition(c.pn)} +} +func (c Connection) Container() string { + return C.GoString(C.pn_connection_get_container(c.pn)) +} +func (c Connection) SetContainer(container string) { + containerC := C.CString(container) + defer C.free(unsafe.Pointer(containerC)) + + C.pn_connection_set_container(c.pn, containerC) +} +func (c Connection) SetUser(user string) { + userC := C.CString(user) + defer C.free(unsafe.Pointer(userC)) + + C.pn_connection_set_user(c.pn, userC) +} +func (c Connection) SetPassword(password string) { + passwordC := C.CString(password) + defer C.free(unsafe.Pointer(passwordC)) + + C.pn_connection_set_password(c.pn, passwordC) +} +func (c Connection) User() string { + return C.GoString(C.pn_connection_get_user(c.pn)) +} +func (c Connection) Hostname() string { + return C.GoString(C.pn_connection_get_hostname(c.pn)) +} +func (c Connection) SetHostname(hostname string) { + hostnameC := C.CString(hostname) + defer C.free(unsafe.Pointer(hostnameC)) + + C.pn_connection_set_hostname(c.pn, hostnameC) +} +func (c Connection) RemoteContainer() string { + return C.GoString(C.pn_connection_remote_container(c.pn)) +} +func (c Connection) RemoteHostname() string { + return C.GoString(C.pn_connection_remote_hostname(c.pn)) +} +func (c Connection) OfferedCapabilities() Data { + return Data{C.pn_connection_offered_capabilities(c.pn)} +} +func (c Connection) 
DesiredCapabilities() Data { + return Data{C.pn_connection_desired_capabilities(c.pn)} +} +func (c Connection) Properties() Data { + return Data{C.pn_connection_properties(c.pn)} +} +func (c Connection) RemoteOfferedCapabilities() Data { + return Data{C.pn_connection_remote_offered_capabilities(c.pn)} +} +func (c Connection) RemoteDesiredCapabilities() Data { + return Data{C.pn_connection_remote_desired_capabilities(c.pn)} +} +func (c Connection) RemoteProperties() Data { + return Data{C.pn_connection_remote_properties(c.pn)} +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/internal/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/internal/error.go b/proton-c/bindings/go/internal/error.go new file mode 100644 index 0000000..01ba890 --- /dev/null +++ b/proton-c/bindings/go/internal/error.go @@ -0,0 +1,125 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Internal implementation details - ignore. +package internal + +// #cgo LDFLAGS: -lqpid-proton +// #include +// #include +import "C" + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + "unsafe" +) + +// Error type for all proton errors. 
+type Error string + +// Error prefixes error message with proton: +func (e Error) Error() string { + return "proton: " + string(e) +} + +// Errorf creates an Error with a formatted message +func Errorf(format string, a ...interface{}) Error { + return Error(fmt.Sprintf(format, a...)) +} + +type PnErrorCode int + +func (e PnErrorCode) String() string { + switch e { + case C.PN_EOS: + return "end-of-data" + case C.PN_ERR: + return "error" + case C.PN_OVERFLOW: + return "overflow" + case C.PN_UNDERFLOW: + return "underflow" + case C.PN_STATE_ERR: + return "bad-state" + case C.PN_ARG_ERR: + return "invalid-argument" + case C.PN_TIMEOUT: + return "timeout" + case C.PN_INTR: + return "interrupted" + case C.PN_INPROGRESS: + return "in-progress" + default: + return fmt.Sprintf("unknown-error(%d)", e) + } +} + +func PnError(p unsafe.Pointer) error { + e := (*C.pn_error_t)(p) + if e == nil || C.pn_error_code(e) == 0 { + return nil + } + return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e))) +} + +// DoRecover is called to recover from internal panics +func DoRecover(err *error) { + r := recover() + switch r := r.(type) { + case nil: // We are not recovering + return + case runtime.Error: // Don't catch runtime.Error + panic(r) + case error: + *err = r + default: + panic(r) + } +} + +// panicIf panics if condition is true, the panic value is Errorf(fmt, args...) +func panicIf(condition bool, fmt string, args ...interface{}) { + if condition { + panic(Errorf(fmt, args...)) + } +} + +// FirstError is a goroutine-safe error holder that keeps the first error that is set. +type FirstError struct { + err atomic.Value + once sync.Once +} + +// Set the error if not already set. +func (e *FirstError) Set(err error) { + e.once.Do(func() { e.err.Store(err) }) +} + +// Get the error.
+func (e *FirstError) Get() error { + v := e.err.Load() + if v != nil { + return v.(error) + } else { + return nil + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/messaging/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/messaging/doc.go b/proton-c/bindings/go/messaging/doc.go new file mode 100644 index 0000000..c815f4e --- /dev/null +++ b/proton-c/bindings/go/messaging/doc.go @@ -0,0 +1,28 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* +Package messaging provides a procedural, concurrent Go API for exchanging AMQP messages. +*/ +package messaging + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// Just for package comment http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/messaging/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/messaging/handler.go b/proton-c/bindings/go/messaging/handler.go new file mode 100644 index 0000000..f95c42b --- /dev/null +++ b/proton-c/bindings/go/messaging/handler.go @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package messaging + +import ( + "qpid.apache.org/go/amqp" + "qpid.apache.org/go/event" +) + +// FIXME aconway 2015-04-28: cleanup - exposing delivery vs. disposition. + +type acksMap map[event.Delivery]chan Disposition +type receiverMap map[event.Link]chan amqp.Message + +type handler struct { + connection *Connection + acks acksMap + receivers receiverMap +} + +func newHandler(c *Connection) *handler { + return &handler{c, make(acksMap), make(receiverMap)} +} + +func (h *handler) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error { + switch t { + // FIXME aconway 2015-04-29: handle errors. + case event.MConnectionClosed: + for _, ack := range h.acks { + // FIXME aconway 2015-04-29: communicate error info + close(ack) + } + + case event.MSettled: + ack := h.acks[e.Delivery()] + if ack != nil { + ack <- Disposition(e.Delivery().Remote().Type()) + close(ack) + delete(h.acks, e.Delivery()) + } + + case event.MMessage: + r := h.receivers[e.Link()] + if r != nil { + m, _ := event.DecodeMessage(e) + // FIXME aconway 2015-04-29: hack, direct send, possible blocking. + r <- m + } else { + // FIXME aconway 2015-04-29: Message with no receiver - log? panic? deadletter? drop? 
+ } + return nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/messaging/messaging.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/messaging/messaging.go b/proton-c/bindings/go/messaging/messaging.go new file mode 100644 index 0000000..e2ffaef --- /dev/null +++ b/proton-c/bindings/go/messaging/messaging.go @@ -0,0 +1,250 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package messaging + +// #include +import "C" + +import ( + "net" + "qpid.apache.org/go/amqp" + "qpid.apache.org/go/event" +) + +// Connection is a connection to a remote AMQP endpoint. +// +// You can set exported fields to configure the connection before calling +// Connection.Open() +// +type Connection struct { + // Server = true means the connection will do automatic protocol detection. + Server bool + + // FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc. + + handler *handler + pump *event.Pump + session Session +} + +// Make an AMQP connection over a net.Conn connection. +// +// Use Connection.Close() to close the Connection, this will also close conn. +// Using conn.Close() directly will cause an abrupt disconnect rather than an +// orderly AMQP close.
+// +func (c *Connection) Open(conn net.Conn) (err error) { + c.handler = newHandler(c) + c.pump, err = event.NewPump(conn, + event.NewMessagingDelegator(c.handler), + ) + if err != nil { + return err + } + if c.Server { + c.pump.Server() + } + go c.pump.Run() + return nil +} + +// Connect opens a default client connection. It is a shortcut for +// c := &Connection +// c.Open() +// +func Connect(conn net.Conn) (*Connection, error) { + c := &Connection{} + err := c.Open(conn) + return c, err +} + +// Close the connection. +// +// Connections must be closed to clean up resources and stop associated goroutines. +func (c *Connection) Close() error { return c.pump.Close() } + +// DefaultSession returns a default session for the connection. +// +// It is created on the first call to DefaultSession() and returned from all subsequent calls. +// Use Session() for more control over creating sessions. +// +func (c *Connection) DefaultSession() (s Session, err error) { + if c.session.e.IsNil() { + c.session, err = c.Session() + } + return c.session, err +} + +type sessionErr struct { + s event.Session + err error +} + +// Session creates a new session. +func (c *Connection) Session() (Session, error) { + connection := c.pump.Connection() + result := make(chan sessionErr) + c.pump.Inject <- func() { + s, err := connection.Session() + if err == nil { + s.Open() + } + result <- sessionErr{s, err} + } + se := <-result + return Session{se.s, c.pump}, se.err +} + +// FIXME aconway 2015-04-27: set sender name, options etc. + +// Sender creates a Sender that will send messages to the address addr. +func (c *Connection) Sender(addr string) (s Sender, err error) { + session, err := c.DefaultSession() + if err != nil { + return Sender{}, err + } + result := make(chan Sender) + c.pump.Inject <- func() { + link := session.e.Sender(linkNames.Next()) + if link.IsNil() { + err = session.e.Error() + } else { + link.Target().SetAddress(addr) + // FIXME aconway 2015-04-27: link options? 
+ link.Open() + } + result <- Sender{Link{c, link}} + } + return <-result, err +} + +// Receiver returns a receiver that will receive messages sent to address addr. +func (c *Connection) Receiver(addr string) (r Receiver, err error) { + // FIXME aconway 2015-04-29: move code to session, in link.go? + session, err := c.DefaultSession() + if err != nil { + return Receiver{}, err + } + result := make(chan Receiver) + c.pump.Inject <- func() { + link := session.e.Receiver(linkNames.Next()) + if link.IsNil() { + err = session.e.Error() + } else { + link.Source().SetAddress(addr) + // FIXME aconway 2015-04-27: link options? + link.Open() + } + // FIXME aconway 2015-04-29: hack to avoid blocking, need proper buffering linked to flow control + rchan := make(chan amqp.Message, 1000) + c.handler.receivers[link] = rchan + result <- Receiver{Link{c, link}, rchan} + } + return <-result, err +} + +// FIXME aconway 2015-04-29: counter per session. +var linkNames amqp.UidCounter + +// Session is an AMQP session, it contains Senders and Receivers. +// Every Connection has a DefaultSession, you can create additional sessions +// with Connection.Session() +type Session struct { + e event.Session + pump *event.Pump +} + +// FIXME aconway 2015-05-05: REWORK Sender/receiver/session. + +// Disposition indicates the outcome of a settled message delivery. +type Disposition uint64 + +const ( + // Message was accepted by the receiver + Accepted Disposition = C.PN_ACCEPTED + // Message was rejected as invalid by the receiver + Rejected = C.PN_REJECTED + // Message was not processed by the receiver but may be processed by some other receiver. + Released = C.PN_RELEASED +) + +// String human readable name for a Disposition. +func (d Disposition) String() string { + switch d { + case Accepted: + return "Accepted" + case Rejected: + return "Rejected" + case Released: + return "Released" + default: + return "Unknown" + } +} + +// FIXME aconway 2015-04-29: How to signal errors via ack channels. 
+ +// An Acknowledgement is a channel which will receive the Disposition of the message +// when it is acknowledged. The channel is closed after the disposition is sent. +type Acknowledgement <-chan Disposition + +// Link has common data and methods for Sender and Receiver links. +type Link struct { + connection *Connection + elink event.Link +} + +// Sender sends messages. +type Sender struct { + Link +} + +// FIXME aconway 2015-04-28: allow user to specify delivery tag. +// FIXME aconway 2015-04-28: should we provide a sending channel rather than a send function? + +// Send sends a message. If d is not nil, the disposition is returned on d. +// If d is nil the message is sent pre-settled and no disposition is returned. +func (s *Sender) Send(m amqp.Message) (ack Acknowledgement, err error) { + ackChan := make(chan Disposition, 1) + ack = ackChan + s.connection.pump.Inject <- func() { + // FIXME aconway 2015-04-28: flow control & credit, buffer or fail? + delivery, err := s.elink.Send(m) + if err == nil { // FIXME aconway 2015-04-28: error handling + s.connection.handler.acks[delivery] = ackChan + } + } + return ack, nil +} + +// Close the sender. +func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: close/free + +// Receiver receives messages via the channel Receive. +type Receiver struct { + Link + // Channel of messages + Receive <-chan amqp.Message +} + +// FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method? + +// Close the Receiver.
+func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: close/free http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/src/Makefile ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/Makefile b/proton-c/bindings/go/src/Makefile deleted file mode 100644 index 98baa4c..0000000 --- a/proton-c/bindings/go/src/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -# FIXME aconway 2015-04-09: integrate with cmake - -#GOFLAGS=-compiler gccgo -gccgoflags "-g -O0" -#GOFLAGS=-gcflags "-N -l" - -GENERATED=qpid.apache.org/proton/event/wrappers_gen.go - -test: $(GENERATED) - go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton - go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/event - go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/messaging - -$(GENERATED): genwrap.go ../../../include/proton/*.h - go run genwrap.go - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/src/genwrap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/genwrap.go b/proton-c/bindings/go/src/genwrap.go deleted file mode 100644 index e34c045..0000000 --- a/proton-c/bindings/go/src/genwrap.go +++ /dev/null @@ -1,423 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// Code generator to generate a think Go wrapper API around the C proton API. -// - -package main - -import ( - "fmt" - "io" - "io/ioutil" - "os" - "os/exec" - "path" - "regexp" - "strings" - "text/template" -) - -func mixedCase(s string) string { - result := "" - for _, w := range strings.Split(s, "_") { - if w != "" { - result = result + strings.ToUpper(w[0:1]) + strings.ToLower(w[1:]) - } - } - return result -} - -func mixedCaseTrim(s, prefix string) string { - return mixedCase(strings.TrimPrefix(s, prefix)) -} - -var templateFuncs = template.FuncMap{"mixedCase": mixedCase, "mixedCaseTrim": mixedCaseTrim} - -func doTemplate(out io.Writer, data interface{}, tmpl string) { - panicIf(template.Must(template.New("").Funcs(templateFuncs).Parse(tmpl)).Execute(out, data)) -} - -type enumType struct { - Name string - Values []string -} - -// Find enums in a header file return map of enum name to values. -func findEnums(header string) (enums []enumType) { - for _, enum := range enumDefRe.FindAllStringSubmatch(header, -1) { - enums = append(enums, enumType{enum[2], enumValRe.FindAllString(enum[1], -1)}) - } - return enums -} - -func genEnum(out io.Writer, name string, values []string) { - doTemplate(out, []interface{}{name, values}, `{{$enumName := index . 0}}{{$values := index . 1}} -type {{mixedCase $enumName}} C.pn_{{$enumName}}_t -const ({{range $values}} - {{mixedCase .}} {{mixedCase $enumName}} = C.{{.}} {{end}} -) - -func (e {{mixedCase $enumName}}) String() string { - switch e { -{{range $values}} - case C.{{.}}: return "{{mixedCaseTrim . 
"PN_"}}" {{end}} - } - return "unknown" -} -`) -} - -var ( - reSpace = regexp.MustCompile("\\s+") -) - -func panicIf(err error) { - if err != nil { - panic(err) - } -} - -func readHeader(name string) string { - file, err := os.Open(path.Join("..", "..", "..", "include", "proton", name+".h")) - panicIf(err) - defer file.Close() - s, err := ioutil.ReadAll(file) - panicIf(err) - return string(s) -} - -var copyright string = `/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// NOTE: This file was generated by genwrap.go, do not edit it by hand. -// -` - -type eventType struct { - // C, function and interface names for the event - Name, Cname, Fname, Iname string -} - -func newEventType(cName string) eventType { - var etype eventType - etype.Cname = cName - etype.Name = mixedCaseTrim(cName, "PN_") - etype.Fname = "On" + etype.Name - etype.Iname = etype.Fname + "Interface" - return etype -} - -var ( - enumDefRe = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;") - enumValRe = regexp.MustCompile("PN_[A-Z_]+") - skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER") - skipFnRe = regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport") -) - -// Generate event wrappers. 
-func event(out io.Writer) { - event_h := readHeader("event") - - // Event is implented by hand in wrappers.go - - // Get all the pn_event_type_t enum values - var etypes []eventType - enums := findEnums(event_h) - for _, e := range enums[0].Values { - if skipEventRe.FindStringSubmatch(e) == nil { - etypes = append(etypes, newEventType(e)) - } - } - - doTemplate(out, etypes, ` -type EventType int -const ({{range .}} - E{{.Name}} EventType = C.{{.Cname}}{{end}} -) -`) - - doTemplate(out, etypes, ` -func (e EventType) String() string { - switch e { -{{range .}} - case C.{{.Cname}}: return "{{.Name}}"{{end}} - } - return "Unknown" -} -`) -} - -type genType struct { - Ctype, Gotype string - ToGo func(value string) string - ToC func(value string) string - Assign func(value string) string -} - -func (g genType) printBody(out io.Writer, value string) { - if g.Gotype != "" { - fmt.Fprintf(out, "return %s", g.ToGo(value)) - } else { - fmt.Fprintf(out, "%s", value) - } -} - -func (g genType) goLiteral(value string) string { - return fmt.Sprintf("%s{%s}", g.Gotype, value) -} - -func (g genType) goConvert(value string) string { - switch g.Gotype { - case "string": - return fmt.Sprintf("C.GoString(%s)", value) - case "Event": - return fmt.Sprintf("makeEvent(%s)", value) - default: - return fmt.Sprintf("%s(%s)", g.Gotype, value) - } -} - -var notStruct = map[string]bool{ - "EventType": true, - "SndSettleMode": true, - "RcvSettleMode": true, - "TerminusType": true, - "State": true, - "Durability": true, - "ExpiryPolicy": true, - "DistributionMode": true, -} - -func mapType(ctype string) (g genType) { - g.Ctype = "C." 
+ strings.Trim(ctype, " \n") - - switch g.Ctype { - case "C.void": - g.Gotype = "" - case "C.size_t": - g.Gotype = "uint" - case "C.int": - g.Gotype = "int" - case "C.void *": - g.Gotype = "unsafe.Pointer" - g.Ctype = "unsafe.Pointer" - case "C.bool": - g.Gotype = "bool" - case "C.ssize_t": - g.Gotype = "int" - case "C.uint64_t": - g.Gotype = "uint64" - case "C.uint32_t": - g.Gotype = "uint16" - case "C.uint16_t": - g.Gotype = "uint32" - case "C.const char *": - fallthrough - case "C.char *": - g.Gotype = "string" - g.Ctype = "C.CString" - g.ToC = func(v string) string { return fmt.Sprintf("%sC", v) } - g.Assign = func(v string) string { - return fmt.Sprintf("%sC := C.CString(%s)\n defer C.free(unsafe.Pointer(%sC))\n", v, v, v) - } - case "C.pn_seconds_t": - g.Gotype = "time.Duration" - g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Second)", v) } - case "C.pn_error_t *": - g.Gotype = "error" - g.ToGo = func(v string) string { return fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) } - default: - pnId := regexp.MustCompile(" *pn_([a-z_]+)_t *\\*? *") - match := pnId.FindStringSubmatch(g.Ctype) - if match == nil { - panic(fmt.Errorf("unknown C type %#v", g.Ctype)) - } - g.Gotype = mixedCase(match[1]) - if !notStruct[g.Gotype] { - g.ToGo = g.goLiteral - g.ToC = func(v string) string { return v + ".pn" } - } - } - if g.ToGo == nil { - g.ToGo = g.goConvert // Use conversion by default. 
- } - if g.ToC == nil { - g.ToC = func(v string) string { return fmt.Sprintf("%s(%s)", g.Ctype, v) } - } - return -} - -type genArg struct { - Name string - genType -} - -var typeNameRe = regexp.MustCompile("^(.*( |\\*))([^ *]+)$") - -func splitArgs(argstr string) []genArg { - argstr = strings.Trim(argstr, " \n") - if argstr == "" { - return []genArg{} - } - args := make([]genArg, 0) - for _, item := range strings.Split(argstr, ",") { - item = strings.Trim(item, " \n") - typeName := typeNameRe.FindStringSubmatch(item) - if typeName == nil { - panic(fmt.Errorf("Can't split argument type/name %#v", item)) - } - cType := strings.Trim(typeName[1], " \n") - name := strings.Trim(typeName[3], " \n") - if name == "type" { - name = "type_" - } - args = append(args, genArg{name, mapType(cType)}) - } - return args -} - -func goArgs(args []genArg) string { - l := "" - for i, arg := range args { - if i != 0 { - l += ", " - } - l += arg.Name + " " + arg.Gotype - } - return l -} - -func cArgs(args []genArg) string { - l := "" - for _, arg := range args { - l += fmt.Sprintf(", %s", arg.ToC(arg.Name)) - } - return l -} - -func cAssigns(args []genArg) string { - l := "\n" - for _, arg := range args { - if arg.Assign != nil { - l += fmt.Sprintf("%s\n", arg.Assign(arg.Name)) - } - } - return l -} - -// Return the go name of the function or "" to skip the function. -func goFnName(api, fname string) string { - // Skip class, context and attachment functions. - if skipFnRe.FindStringSubmatch(fname) != nil { - return "" - } - switch api + "." + fname { - case "link.get_drain": - return "IsDrain" - default: - return mixedCaseTrim(fname, "get_") - } -} - -func apiWrapFns(api, header string, out io.Writer) { - fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api) - fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0]) - fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) 
*pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api)) - for _, m := range fn.FindAllStringSubmatch(header, -1) { - rtype, fname, argstr := mapType(m[1]), m[2], m[3] - gname := goFnName(api, fname) - if gname == "" { // Skip - continue - } - args := splitArgs(argstr) - fmt.Fprintf(out, "func (%c %s) %s", api[0], mixedCase(api), gname) - fmt.Fprintf(out, "(%s) %s { ", goArgs(args), rtype.Gotype) - fmt.Fprint(out, cAssigns(args)) - rtype.printBody(out, fmt.Sprintf("C.pn_%s_%s(%c.pn%s)", api, fname, api[0], cArgs(args))) - fmt.Fprintf(out, "}\n") - } -} - -func main() { - outpath := path.Join("qpid.apache.org", "proton", "event", "wrappers_gen.go") - out, err := os.Create(outpath) - panicIf(err) - defer out.Close() - - apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"} - fmt.Fprintln(out, copyright) - fmt.Fprint(out, ` -package event - -import ( - "time" - "unsafe" - "qpid.apache.org/proton/internal" -) - -// #include -// #include -// #include -`) - for _, api := range apis { - fmt.Fprintf(out, "// #include \n", api) - } - fmt.Fprintln(out, `import "C"`) - - event(out) - - for _, api := range apis { - fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api) - header := readHeader(api) - enums := findEnums(header) - for _, e := range enums { - genEnum(out, e.Name, e.Values) - } - apiWrapFns(api, header, out) - } - out.Close() - - // Run gofmt. 
- cmd := exec.Command("gofmt", "-w", outpath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err = cmd.Run() - if err != nil { - fmt.Fprintf(os.Stderr, "gofmt: %s", err) - os.Exit(1) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go deleted file mode 100644 index 38c2d00..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package proton encodes and decodes AMQP messages and data as Go types. - -It follows the standard 'encoding' libraries pattern. The mapping between AMQP -and Go types is described in the documentation of the Marshal and Unmarshal -functions. - -The sub-packages 'event' and 'messaging' provide two alternative ways to write -AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' -gives complete low-level control of the underlying proton C engine. 
- -AMQP is an open standard for inter-operable message exchange, see -*/ -package proton - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment. - -// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bb67e54d/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go deleted file mode 100644 index a0d45d7..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -/* -Package event provides a low-level API to the proton AMQP engine. - -For most tasks, consider instead package qpid.apache.org/proton/messaging. -It provides a higher-level, concurrent API that is easier to use. - -The API is event based. There are two alternative styles of handler. EventHandler -provides the core proton events. MessagingHandler provides a slighly simplified -view of the event stream and automates some common tasks. 
- -See type Pump documentation for more details of the interaction between proton -events and goroutines. -*/ -package event - -// #cgo LDFLAGS: -lqpid-proton -import "C" - -// This file is just for the package comment.
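Note on the injection pattern used in messaging.go: Connection.Session(), Connection.Sender() and Connection.Receiver() all send a closure on c.pump.Inject and then wait for the answer on a result channel, so every proton call runs on the pump goroutine. The following is a minimal, self-contained sketch of that pattern only. It assumes nothing about the real event.Pump beyond what the diff shows; the names loop, newLoop, next and stop are hypothetical and exist purely for illustration.

```go
package main

import "fmt"

// loop is a hypothetical stand-in for event.Pump. It owns state that is
// not safe for concurrent use and runs injected functions one at a time.
type loop struct {
	inject  chan func()   // functions to run on the loop goroutine
	done    chan struct{} // closed when the loop goroutine exits
	counter int           // touched only by the loop goroutine
}

// newLoop creates a loop and starts its goroutine.
func newLoop() *loop {
	l := &loop{inject: make(chan func()), done: make(chan struct{})}
	go l.run()
	return l
}

// run executes injected functions until inject is closed.
func (l *loop) run() {
	defer close(l.done)
	for f := range l.inject {
		f()
	}
}

// next increments the counter on the loop goroutine and returns the new
// value to the caller over a per-call result channel, in the same way
// Connection.Session() waits on its result channel.
func (l *loop) next() int {
	result := make(chan int)
	l.inject <- func() {
		l.counter++
		result <- l.counter
	}
	return <-result
}

// stop shuts the loop down and waits for its goroutine to exit.
func (l *loop) stop() {
	close(l.inject)
	<-l.done
}

func main() {
	l := newLoop()
	defer l.stop()
	fmt.Println(l.next(), l.next()) // prints "1 2"
}
```

Because the result channel is unbuffered, the injected function blocks the loop until the caller has taken the value, so a caller that gives up early would stall the loop. A buffer of one, as used for ackChan in Sender.Send(), avoids that.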