From: aconway@apache.org
To: commits@qpid.apache.org
Reply-To: dev@qpid.apache.org
Date: Wed, 06 Dec 2017 15:59:44 -0000
Subject: [2/6] qpid-proton git commit: PROTON-1537: [ruby] Split Delivery API into Tracker and Transfer.
archived-at: Wed, 06 Dec 2017 15:59:48 -0000

PROTON-1537: [ruby] Split Delivery API into Tracker and Transfer.

Delivery is now used only for incoming messages, Tracker for outgoing messages, and Transfer is the common base class for both. Since most of the previous Delivery behavior is common, Delivery is still backwards-compatible with some deprecated methods.

The API for release and modify is still compatible but has been re-worked so that Delivery#release(opts) now handles release and modified states with appropriate option settings. Tracker#modified provides access to the modified state directly. The Disposition class is deprecated in favour of the new Delivery/Tracker methods.

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e0445234
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e0445234
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e0445234

Branch: refs/heads/master
Commit: e0445234dfdb78baa8c0d8a82d0c1b4111e4b6a1
Parents: 098f84d
Author: Alan Conway
Authored: Mon Dec 4 18:32:09 2017 -0500
Committer: Alan Conway
Committed: Tue Dec 5 16:54:17 2017 -0500

----------------------------------------------------------------------
 proton-c/bindings/ruby/README.rdoc | 13 +-
 proton-c/bindings/ruby/lib/codec/data.rb | 8 +-
 proton-c/bindings/ruby/lib/core/connection.rb | 4 +-
 proton-c/bindings/ruby/lib/core/delivery.rb | 313 ++++---------------
 proton-c/bindings/ruby/lib/core/disposition.rb | 31 +-
 proton-c/bindings/ruby/lib/core/event.rb | 14 +-
 proton-c/bindings/ruby/lib/core/exceptions.rb | 5 +
 proton-c/bindings/ruby/lib/core/link.rb | 19 +-
 proton-c/bindings/ruby/lib/core/message.rb | 16 +-
 .../bindings/ruby/lib/core/messaging_handler.rb | 8 +-
 proton-c/bindings/ruby/lib/core/sender.rb | 42 +--
 proton-c/bindings/ruby/lib/core/tracker.rb | 41 +++
 proton-c/bindings/ruby/lib/core/transfer.rb | 123 ++++++++
 proton-c/bindings/ruby/lib/core/url.rb | 5 +-
 proton-c/bindings/ruby/lib/handler/adapter.rb | 15 +-
 proton-c/bindings/ruby/lib/qpid_proton.rb | 15 +-
 proton-c/bindings/ruby/lib/reactor/container.rb | 6 +-
 proton-c/bindings/ruby/tests/test_tools.rb | 4 +-
 18 files changed, 314 insertions(+), 368 deletions(-)
----------------------------------------------------------------------
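
The option handling that the commit message describes for Delivery#release(opts) can be tried out in isolation. The following is only a sketch: release_state is a hypothetical helper, and the two symbols are stand-ins for Cproton::PN_RELEASED and Cproton::PN_MODIFIED so that it runs without the Proton C library. It copies the decision logic from the delivery.rb change in this commit and returns the state that would be settled, rather than touching a real delivery.

```ruby
# Stand-ins for Cproton::PN_RELEASED / Cproton::PN_MODIFIED (assumption, so
# the sketch runs without the Proton C bindings).
RELEASED = :released
MODIFIED = :modified

# Hypothetical helper: same option handling as Delivery#release in this
# commit, but it returns the chosen state instead of settling a delivery.
def release_state(opts = nil)
  opts = { :failed => true } if opts == true # legacy release(true) form
  failed = opts ? opts.fetch(:failed, true) : true
  undeliverable = opts && opts[:undeliverable]
  annotations = opts && opts[:annotations]
  (failed || undeliverable || annotations) ? MODIFIED : RELEASED
end

p release_state                                            # => :modified
p release_state(:failed => false)                          # => :released
p release_state(:failed => false, :undeliverable => true)  # => :modified
```

Because :failed defaults to true, a plain release ends up in the MODIFIED state; RELEASED is chosen only when :failed is explicitly false and neither :undeliverable nor :annotations is set. Under the same logic a legacy release(false) call also yields MODIFIED, since a false argument falls through to the default.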
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/README.rdoc
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/README.rdoc b/proton-c/bindings/ruby/README.rdoc
index 1b902da..75552c8 100644
--- a/proton-c/bindings/ruby/README.rdoc
+++ b/proton-c/bindings/ruby/README.rdoc
@@ -16,9 +16,9 @@ Links are grouped in a {Qpid::Proton::Session}. Messages in the same session are
 
 Sessions belong to a {Qpid::Proton::Connection}. If you don't need multiple sessions, a connection will create links directly using a default session.
 
-A {Qpid::Proton::Delivery} represents the transfer of a message and allows the receiver to accept or reject it. The sender can use a {Qpid::Proton::Tracker} to track the status of a sent message and find out if it was accepted.
+A {Qpid::Proton::Transfer} represents the transfer of a message, the {Qpid::Proton::Delivery} subclass allows a receiver to accept or reject an incoming message. The {Qpid::Proton::Tracker} subclass allows a sender to track the status of a sent message and find out if it was accepted or rejected.
 
-A delivery is settled when both ends are done with it. Different settlement methods give different levels of reliability: at-most-once, at-least-once, and exactly-once. See below.
+A transfer is _settled_ when both ends are done with it. Different settlement methods give different levels of reliability: at-most-once, at-least-once, and exactly-once. See below.
 
 == The anatomy of a Proton application
 
@@ -40,13 +40,14 @@ In the request-response pattern, a request message carries a reply-to address fo
 
 The server_direct.cpp example shows how to implement a request-response server.
 
-== Settlement
+== Settling a Message Transfer
 
-A message is _settled_ by one end of a link when that end has forgotton the message.
+A message transfer is _settled_ at one end of a link when that end of the link
+has finished with the message and forgotten it.
 
-_Pre-settled_ messages are settled by the sender before sending. If the connection is lost before the message is received by the receiver, the message will not be delivered.
+_Pre-settled_ messages are settled by the sender before sending. This gives _at most once_ reliability(also known as _best effort_, _unreliable_ or _fire and forget_) since the sender never knows for sure if the message arrived. If the connection is lost before the message is received by the receiver, the message will not be delivered.
 
-If the sender does not pre-settle a message, then the receiver settles it once it is processed, and the receiver is informed of the settlement via the {Qpid::Proton::Tracker Tracker}. If the connection is lost before the sender is informed of the settlement, then the delivery is considered in-doubt and the message should be re-set. This ensures it eventually gets delivered (provided the connection and link can be reestablished) but also that it may be delivered multiple times.
+If the sender does not pre-settle a message, then the receiver settles it once it has been processed. The sender is informed of the settlement via the {Qpid::Proton::Tracker Tracker}. If the connection is lost before the sender has received notice of settlement, the delivery is considered in-doubt and the sender can re-send it. This ensures it eventually gets delivered (provided the connection and link can be reestablished) but it is possible for multiple copies of the same message are delivered, so the receiver must be aware of that. This is known as _at_least_once_ reliability.
 
 == Installing
 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/codec/data.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/codec/data.rb b/proton-c/bindings/ruby/lib/codec/data.rb
index bb242dc..2742008 100644
--- a/proton-c/bindings/ruby/lib/codec/data.rb
+++ b/proton-c/bindings/ruby/lib/codec/data.rb
@@ -82,11 +82,16 @@ module Qpid::Proton::Codec
 
     private
 
+    # Rewind and convert a pn_data_t* containing a single value to a ruby object.
     def self.to_object(impl) Data.new(impl).rewind.object; end
-    def self.from_object(impl, x) Data.new(impl).rewind.object = x; end
+    # Clear a pn_data_t* and convert a ruby object into it.
+    def self.from_object(impl, x) Data.new(impl).clear.object = x; end
 
     public
 
+    # TODO aconway 2017-12-05: confusing: use Data.new(cap) for making a
+    # brand new data object only, add Data.wrap(impl) to wrap existing data object.
+
     # Creates a new instance.
     # @param capacity [Integer] capacity for the new data instance.
     def initialize(capacity = 16)
@@ -123,6 +128,7 @@ module Qpid::Proton::Codec
     #
     def clear
       Cproton.pn_data_clear(@data)
+      self
     end
 
     # Clears the current node and sets the parent to the root node.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/connection.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb index 25149ce..c2f477c 100644 --- a/proton-c/bindings/ruby/lib/core/connection.rb +++ b/proton-c/bindings/ruby/lib/core/connection.rb @@ -76,7 +76,7 @@ module Qpid::Proton # @return AMQP container ID advertised by the remote peer def remote_container_id() Cproton.pn_connection_remote_container(@impl); end - alias :remote_container :remote_container_id + alias remote_container remote_container_id # @return [Container] the container managing this connection attr_reader :container @@ -170,7 +170,7 @@ module Qpid::Proton end # @deprecated use #default_session() - alias_method :session, :default_session + alias session default_session # Open a new session on this connection. def open_session http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/delivery.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/delivery.rb b/proton-c/bindings/ruby/lib/core/delivery.rb index a642a7b..838671f 100644 --- a/proton-c/bindings/ruby/lib/core/delivery.rb +++ b/proton-c/bindings/ruby/lib/core/delivery.rb @@ -5,7 +5,6 @@ # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # @@ -18,276 +17,74 @@ #++ module Qpid::Proton - - # A Delivery maintains detail on the delivery of data to an endpoint. - # - # A Delivery has a single parent Qpid::Proton::Link - # - # @example - # - # # SCENARIO: An event comes in notifying that data has been delivered to - # # the local endpoint. 
A Delivery object can be used to check - # # the details of the delivery. - # - # delivery = @event.delivery - # if delivery.readable? && !delivery.partial? - # # decode the incoming message - # msg = Qpid::Proton::Message.new - # msg.decode(link.receive(delivery.pending)) - # end - # - class Delivery - - include DeliveryState - - # @private - include Util::Wrapper - - # @private - def self.wrap(impl) # :nodoc: - return nil if impl.nil? - self.fetch_instance(impl, :pn_delivery_attachments) || Delivery.new(impl) - end - - # @private - def initialize(impl) - @impl = impl - @local = Disposition.new(Cproton.pn_delivery_local(impl), true) - @remote = Disposition.new(Cproton.pn_delivery_remote(impl), false) - self.class.store_instance(self, :pn_delivery_attachments) - end - - # @private - include Util::SwigHelper - - # @private - PROTON_METHOD_PREFIX = "pn_delivery" - - # @!attribute [r] tag - # - # @return [String] The tag for the delivery. - # - proton_caller :tag - - # @!attribute [r] writable? - # - # A delivery is considered writable if it is the current delivery on an - # outgoing link, and the link has positive credit. - # - # @return [Boolean] Returns if a delivery is writable. - # - proton_caller :writable? - - # @!attribute [r] readable? - # - # A delivery is considered readable if it is the current delivery on an - # incoming link. - # - # @return [Boolean] Returns if a delivery is readable. - # - proton_caller :readable? - # @!attribute [r] updated? - # - # A delivery is considered updated whenever the peer communicates a new - # disposition for the dlievery. Once a delivery becomes updated, it will - # remain so until cleared. - # - # @return [Boolean] Returns if a delivery is updated. - # - # @see #clear - # - proton_caller :updated? - - # @!method clear - # - # Clear the updated flag for a delivery. - # - proton_caller :clear - - # @!attribute [r] pending - # - # @return [Integer] Return the amount of pending message data for the - # delivery. 
- # - proton_caller :pending - - # @!attribute [r] partial? - # - # @return [Boolean] Returns if the delivery has only partial message data. - # - proton_caller :partial? - - # @!attribute [r] settled? - # - # @return [Boolean] Returns if the delivery is remotely settled. - # - proton_caller :settled? - - # @!attribute [r] aborted? - # - # A delivery can be aborted before it is complete by the remote sender. - # The receiver must ignore the message and discard any partial data. - # - # @return [Boolean] Returns if a delivery is aborted. - # - proton_caller :aborted? - - # Update the state of the delivery - # @param state [Integer] the delivery state, defined in {DeliveryState} - def update(state) Cproton.pn_delivery_update(@impl, state); end - - # Settle a delivery, optionally update state before settling - # A settled delivery can never be used again. - # @param state [Integer] the delivery state, defined in {DeliveryState} - def settle(state = nil) - update(state) unless state.nil? - Cproton.pn_delivery_settle(@impl) - end + # Allow a {Receiver} to indicate the status of a received message to the {Sender} + class Delivery < Transfer + # @return [Receiver] The parent {Receiver} link. + def receiver() link; end # Accept the receiveed message. def accept() settle ACCEPTED; end - # Reject a received message that is considered invalid. + # Reject a received message that is considered invalid and should never + # be delivered again to this or any other receiver. def reject() settle REJECTED; end - # Release a received message making it available to other receivers. - def release(delivered = true) settle(delivered ? MODIFIED : RELEASED); end - - # @!method dump - # - # Utility function for printing details of a delivery. - # - proton_caller :dump - - # @!attribute [r] buffered? - # - # A delivery that is buffered has not yet been written to the wire. - # - # Note that returning false does not imply that a delivery was definitely - # written to the wire. 
If false is returned, it is not known whether the - # delivery was actually written to the wire or not. - # - # @return [Boolean] Returns if the delivery is buffered. - # - proton_caller :buffered? - - # Returns the local disposition state for the delivery. - # - # @return [Disposition] The local disposition state. - # - def local_state - Cproton.pn_delivery_local_state(@impl) - end - - # Returns the remote disposition state for the delivery. - # - # @return [Disposition] The remote disposition state. - # - def remote_state - Cproton.pn_delivery_remote_state(@impl) - end - - # Returns the parent link. - # - # @return [Link] The parent link. - # - def link - Link.wrap(Cproton.pn_delivery_link(@impl)) - end - - # Returns the parent session. - # - # @return [Session] The session. - # - def session - self.link.session - end - - # Returns the parent connection. - # - # @return [Connection] The connection. - # - def connection - self.session.connection - end - - # Returns the parent transport. - # - # @return [Transport] The transport. - # - def transport - self.connection.transport - end - - # @private - def local_received? - self.local_state == Disposition::RECEIVED - end - - # @private - def remote_received? - self.remote_state == Disposition::RECEIVED - end - - # @private - def local_accepted? - self.local_state == Disposition::ACCEPTED - end - - # @private - def remote_accepted? - self.remote_state == Disposition::ACCEPTED - end - - # @private - def local_rejected? - self.local_state == Disposition::REJECTED - end - - # @private - def remote_rejected? - self.remote_state == Disposition::REJECTED - end - - # @private - def local_released? - self.local_state == Disposition::RELEASED - end - - # @private - def remote_released? - self.remote_state == Disposition::RELEASED + # Release a received message. It may be delivered again to this or another + # receiver. 
+ # + # @option opts [Boolean] :failed (default true) If true + # {Message#delivery_count} will be increased so future receivers will know + # there was a failed delivery. If false, {Message#delivery_count} will not + # be increased. + # + # @option opts [Boolean] :undeliverable (default false) If true the message + # will not be re-delivered to this receiver. It may be delivered tbo other + # receivers. + # + # @option opts [Hash] :annotations Annotations to be added to + # {Message#annotations} before re-delivery. Entries with the same key + # replace existing entries in {Message#annotations} + def release(opts = nil) + opts = { :failed => true } if opts == true # Backwards compatibility + failed = opts ? opts.fetch(:failed, true) : true + undeliverable = opts && opts[:undeliverable] + annotations = opts && opts[:annotations] + if failed || undeliverable || annotations + d = Cproton.pn_delivery_local(@impl) + Cproton.pn_disposition_set_failed(d) if failed + Cproton.pn_disposition_set_undeliverable(d) if undeliverable + Data.from_object(Cproton.pn_disposition_annotations(d), annotations) if annotations + settle(MODIFIED) + else + settle(RELEASED) + end end - # @private - def local_modified? - self.local_state == Disposition::MODIFIED + # @deprecated use {#release} with modification options + def modify() + deprecated __method__, "#release" + release failed=>true end - # @private - def remote_modified? - self.remote_state == Disposition::MODIFIED - end + # @return [Boolean] True if the transfer was aborted by the sender. + proton_caller :aborted? - # @return true if the delivery has a complete incoming message ready to decode - def message? - readable? && !aborted? && !partial? - end + # @return true if the incoming message is complete, call {#message} to retrieve it. + def complete?() readable? && !aborted? 
&& !partial?; end - # Decode the message from the delivery into a new {Message} - # @raise [ProtonError] unless {#message?} + # Get the message from the delivery. + # @raise [ProtonError] if the message is not {#complete?} or there is an + # error decoding the message. def message - if message? - m = Message.new - m.decode(link.receive(pending)) - link.advance - m - else - status = [("not readable" if !readable?), - ("aborted" if aborted?), - ("partial" if partial?)].compact.join(", ") - raise ProtonError, "incoming delivery #{status}" - end + return @message if @message + raise ProtonError("message aborted by sender") if aborted? + raise ProtonError("incoming message incomplete") if partial? + raise ProtonError("no incoming message") unless readable? + @message = Message.new + @message.decode(link.receive(pending)) + link.advance + @message end + end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/disposition.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/disposition.rb b/proton-c/bindings/ruby/lib/core/disposition.rb index 2c12345..38fab6b 100644 --- a/proton-c/bindings/ruby/lib/core/disposition.rb +++ b/proton-c/bindings/ruby/lib/core/disposition.rb @@ -19,40 +19,20 @@ module Qpid::Proton - # States of a delivery - module DeliveryState - # Message was successfully processed by the receiver - ACCEPTED = Cproton::PN_ACCEPTED + # @deprecated use {Delivery} + class Disposition - # Message rejected as invalid and unprocessable by the receiver. 
+ ACCEPTED = Cproton::PN_ACCEPTED REJECTED = Cproton::PN_REJECTED - - # Message was not (and will not be) processed by the receiver, but may be - # acceptable if re-delivered to another receiver RELEASED = Cproton::PN_RELEASED - - # Like released, but the disposition includes modifications to be made to - # the message before re-delivery MODIFIED = Cproton::PN_MODIFIED - - # Partial message data was received, message can be resuemed - used only during link recovery. RECEIVED = Cproton::PN_RECEIVED - end - - # Disposition records the current state and/or final outcome of a transfer. - # - # Every delivery contains both a local and a remote disposition. The local - # disposition holds the local state of the delivery, and the remote - # disposition holds the *last known* remote state of the delivery. - # - class Disposition - - include DeliveryState attr_reader :impl # @private def initialize(impl, local) + deprecated self.class, Delivery @impl = impl @local = local @data = nil @@ -63,6 +43,7 @@ module Qpid::Proton # @private include Util::SwigHelper + # @private PROTON_METHOD_PREFIX = "pn_disposition" @@ -136,7 +117,7 @@ module Qpid::Proton end end - # Sets the condition for the disposition. + # Sets the condition for the disposition. # # @param condition [Codec::Data] The condition. # http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/event.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/event.rb b/proton-c/bindings/ruby/lib/core/event.rb index 136c120..e62214b 100644 --- a/proton-c/bindings/ruby/lib/core/event.rb +++ b/proton-c/bindings/ruby/lib/core/event.rb @@ -71,6 +71,7 @@ module Qpid::Proton @method ||= TYPE_METHODS[Cproton.pn_event_type(@impl)] if @impl end + # Get the context if it is_a?(clazz), else call method on the context def get(clazz, method=nil) (ctx = context).is_a?(clazz) ? 
ctx : ctx.__send__(method) rescue nil end @@ -98,7 +99,7 @@ module Qpid::Proton # @return [Symbol] method name that this event will call in {#dispatch} attr_accessor :method - alias :type :method + alias type method # @return [Object] the event context object def context; return @context ||= _context; end @@ -125,10 +126,17 @@ module Qpid::Proton def receiver() link if link && link.receiver?; end # @return [Delivery, nil] delivery for this event - def delivery() @delivery ||= get(Delivery); end + def delivery() + return @delivery if @delivery + case context + when Delivery then @delivery = @context + # deprecated: for backwards compat allow a Tracker to be treated as a Delivery + when Tracker then @delivery = Delivery.new(Tracker.impl) + end + end # @return [Tracker, nil] delivery for this event - def tracker() delivery; end + def tracker() @tracker ||= get(Tracker); end # @return [Message, nil] message for this event def message() @message ||= delivery.message if delivery; end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/exceptions.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/exceptions.rb b/proton-c/bindings/ruby/lib/core/exceptions.rb index 75d6552..adef927 100644 --- a/proton-c/bindings/ruby/lib/core/exceptions.rb +++ b/proton-c/bindings/ruby/lib/core/exceptions.rb @@ -121,6 +121,11 @@ module Qpid class Release < ProtonError end + # Raised when a message is aborted. 
+ # + class Aborted < ProtonError + end + end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/link.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/link.rb b/proton-c/bindings/ruby/lib/core/link.rb index 255e8b2..370fb61 100644 --- a/proton-c/bindings/ruby/lib/core/link.rb +++ b/proton-c/bindings/ruby/lib/core/link.rb @@ -208,15 +208,9 @@ module Qpid::Proton # @private def self.wrap(impl) - return nil if impl.nil? - - result = self.fetch_instance(impl, :pn_link_attachments) - return result unless result.nil? - if Cproton.pn_link_is_sender(impl) - return Sender.new(impl) - elsif Cproton.pn_link_is_receiver(impl) - return Receiver.new(impl) - end + return unless impl + return fetch_instance(impl, :pn_link_attachments) || + (Cproton.pn_link_is_sender(impl) ? Sender : Receiver).new(impl) end # @private @@ -294,11 +288,10 @@ module Qpid::Proton self.session.connection end - # Returns the parent delivery. - # - # @return [Delivery] The delivery. - # + + # @deprecated use {Sender#send} def delivery(tag) + deprecated __method__, "Sender#send" Delivery.new(Cproton.pn_delivery(@impl, tag)) end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/message.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/message.rb b/proton-c/bindings/ruby/lib/core/message.rb index 747414c..a39ab09 100644 --- a/proton-c/bindings/ruby/lib/core/message.rb +++ b/proton-c/bindings/ruby/lib/core/message.rb @@ -22,7 +22,7 @@ module Qpid::Proton # Messsage data and headers that can sent or received on a {Link} # # {#body} is the main message content. - # {#properties} is a hash of extra properties that can be attached to the message. + # {#properties} is a {Hash} of extra properties that can be attached to the message. 
# # @example Create a message containing a Unicode string # msg = Qpid::Proton::Message.new "this is a string" @@ -33,16 +33,6 @@ module Qpid::Proton # class Message - # @private - def proton_send(sender, tag = nil) - dlv = sender.delivery(tag || sender.delivery_tag) - encoded = self.encode - sender.stream(encoded) - sender.advance - dlv.settle if sender.snd_settle_mode == Link::SND_SETTLED - return dlv - end - # Decodes a message from AMQP binary data. # @param encoded [String] the encoded bytes # @return[Integer] the number of bytes consumed @@ -535,8 +525,8 @@ module Qpid::Proton @properties end - # Replaces the entire set of properties with the specified hash. - # + # Use +properties+ as the message properties. + # @param properties [Hash] new properties def properties=(properties) @properties = properties end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/messaging_handler.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/messaging_handler.rb b/proton-c/bindings/ruby/lib/core/messaging_handler.rb index 580a4ab..8ef9a06 100644 --- a/proton-c/bindings/ruby/lib/core/messaging_handler.rb +++ b/proton-c/bindings/ruby/lib/core/messaging_handler.rb @@ -129,9 +129,11 @@ module Qpid::Proton # @param event [Event] The event. # @!method on_released(event) - # Called when the remote peer releases an outgoing message. - # Note that this may be in response to either the RELEASE or - # MODIFIED state as defined by the AMPQ specification. + # Called when the remote peer releases an outgoing message for re-delivery as-is. + # @param event [Event] The event. + + # @!method on_modified(event) + # Called when the remote peer releases an outgoing message for re-delivery with modifications. # @param event [Event] The event. 
# @!method on_settled(event) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/sender.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/sender.rb b/proton-c/bindings/ruby/lib/core/sender.rb index dddde15..2f45356 100644 --- a/proton-c/bindings/ruby/lib/core/sender.rb +++ b/proton-c/bindings/ruby/lib/core/sender.rb @@ -31,40 +31,40 @@ module Qpid::Proton # @private can_raise_error :stream, :error_class => Qpid::Proton::LinkError - # Signals the availability of deliveries. - # + # Hint to the remote receiver about the number of messages available. + # The receiver may use this to optimize credit flow, or may ignore it. # @param n [Integer] The number of deliveries potentially available. - # def offered(n) Cproton.pn_link_offered(@impl, n) end - # Send a message to the remote endpoint. - # + # TODO aconway 2017-12-05: incompatible, used to return bytes sent. + + # @!method send(message) + # Send a message. # @param message [Message] The message to send. - # @param tag [Object] Optional unique delivery tag, one will be generated if not supplied. - # - # @return [Integer] The number of bytes sent. - # - def send(object, tag = nil) - if object.respond_to? :proton_send - object.proton_send(self, tag) - else - stream(object) + # @return [Tracker] Tracks the outcome of the message. + def send(message, *args) + tag = nil + if args.size > 0 + # deprecated: allow tag in args[0] for backwards compat + raise ArgumentError("too many arguments") if args.size > 1 + tag = args[0] end + tag ||= delivery_tag + t = Tracker.new(Cproton.pn_delivery(@impl, tag)) + Cproton.pn_link_send(@impl, message.encode) + Cproton.pn_link_advance(@impl) + t.settle if snd_settle_mode == SND_SETTLED + return t end - # Send the specified bytes as part of the current delivery. - # - # @param bytes [String] The bytes to send. - # - # @return [Integer] The number of bytes sent. 
- # + # @deprecated internal use only def stream(bytes) Cproton.pn_link_send(@impl, bytes) end - # Generate a new unique delivery tag for this sender + # @deprecated internal use only def delivery_tag @tag_count ||= 0 result = @tag_count.succ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/tracker.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/tracker.rb b/proton-c/bindings/ruby/lib/core/tracker.rb new file mode 100644 index 0000000..fbecd61 --- /dev/null +++ b/proton-c/bindings/ruby/lib/core/tracker.rb @@ -0,0 +1,41 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +module Qpid::Proton + + # Tracks the status of a sent message. + class Tracker < Transfer + # @return [Sender] The parent {Sender} link. + def sender() link; end + + # If {#state} == {#MODIFIED} this method returns additional information + # about re-delivery from the receiver's call to {Delivery#release} + # + # @return [Hash] See {Delivery#release} options for the meaning of hash entries. 
+    def modified()
+      return unless (state == MODIFIED) && (d = Cproton.pn_delivery_remote(@impl))
+      {
+        :failed => Cproton.pn_disposition_get_failed(d),
+        :undeliverable => Cproton.pn_disposition_get_undeliverable(d),
+        :annotations => Data.to_object(Cproton.pn_disposition_annotations(d))
+      }
+    end
+  end
+
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/transfer.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transfer.rb b/proton-c/bindings/ruby/lib/core/transfer.rb
new file mode 100644
index 0000000..4d640f0
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/transfer.rb
@@ -0,0 +1,123 @@
+#--
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#++
+
+module Qpid::Proton
+
+  # Status of a message transfer on a {Link}
+  # Common base class for {Tracker} and {Delivery}.
+  class Transfer
+
+    private
+
+    include Util::Wrapper
+    include Util::SwigHelper
+    PROTON_METHOD_PREFIX = "pn_delivery"
+
+    protected
+
+    def self.wrap(impl)
+      return unless impl
+      self.fetch_instance(impl, :pn_delivery_attachments) ||
+        (Cproton.pn_link_is_sender(Cproton.pn_delivery_link(impl)) ? Tracker : Delivery).new(impl)
+    end
+
+    def initialize(impl)
+      @impl = impl
+      self.class.store_instance(self, :pn_delivery_attachments)
+    end
+
+    public
+
+    # States of a transfer
+    module State
+      # Message was successfully processed by the receiver
+      ACCEPTED = Cproton::PN_ACCEPTED
+
+      # Message rejected as invalid and unprocessable by the receiver.
+      REJECTED = Cproton::PN_REJECTED
+
+      # Message was not (and will not be) processed by the receiver, but may be
+      # acceptable if re-delivered to another receiver
+      RELEASED = Cproton::PN_RELEASED
+
+      # Like {RELEASED}, but {Tracker#modified} has modifications to be made to
+      # the message before re-delivery
+      MODIFIED = Cproton::PN_MODIFIED
+
+      # Partial message data received. Only used during link recovery.
+      RECEIVED = Cproton::PN_RECEIVED
+    end
+
+    include State
+
+    # @return [String] Unique ID for the transfer in the context of the {#link}
+    def id() Cproton.pn_delivery_tag(@impl); end
+
+    # @deprecated use {#id}
+    alias tag id
+
+    # @return [Boolean] True if the transfer has is remotely settled.
+    proton_caller :settled?
+
+    # @return [Integer] Remote state of the transfer, one of the values in {State}
+    def state() Cproton.pn_delivery_remote_state(@impl); end
+
+    def to_s() Cproton.pn_delivery_dump(@impl); end
+    alias inspect to_s
+
+    # @return [Link] The parent link.
+    def link() Link.wrap(Cproton.pn_delivery_link(@impl)); end
+
+    # @return [Session] The parent session.
+    def session() link.session; end
+
+    # @return [Connection] The parent connection.
+    def connection() self.session.connection; end
+
+    # @return [Transport] The parent connection's transport.
+    def transport() self.connection.transport; end
+
+    # @deprecated internal use only
+    proton_caller :writable?
+    # @deprecated internal use only
+    proton_caller :readable?
+    # @deprecated internal use only
+    proton_caller :updated?
+    # @deprecated internal use only
+    proton_caller :clear
+    # @deprecated internal use only
+    proton_caller :pending
+    # @deprecated internal use only
+    proton_caller :partial?
+    # @deprecated internal use only
+    def update(state) Cproton.pn_delivery_update(@impl, state); end
+    # @deprecated internal use only
+    proton_caller :buffered?
+    # @deprecated internal use only
+    def local_state() Cproton.pn_delivery_local_state(@impl); end
+    # @deprecated use {#state}
+    def remote_state() Cproton.pn_delivery_remote_state(@impl); end
+    # @deprecated internal use only
+    def settle(state = nil)
+      update(state) unless state.nil?
+      Cproton.pn_delivery_settle(@impl)
+    end
+
+  end
+end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/core/url.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/url.rb b/proton-c/bindings/ruby/lib/core/url.rb
index ff49053..6bd7ece 100644
--- a/proton-c/bindings/ruby/lib/core/url.rb
+++ b/proton-c/bindings/ruby/lib/core/url.rb
@@ -19,11 +19,12 @@
 
 module Qpid::Proton
 
+  # @deprecated use {URI} or {String}
   class URL
 
     attr_reader :scheme
     attr_reader :username
-    alias :user :username
+    alias user username
     attr_reader :password
     attr_reader :host
     attr_reader :port
@@ -68,7 +69,7 @@ module Qpid::Proton
     end
 
     # @return [String] Allow implicit conversion by {String#try_convert}
-    alias :to_str :to_s
+    alias to_str to_s
 
     private
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/handler/adapter.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/handler/adapter.rb b/proton-c/bindings/ruby/lib/handler/adapter.rb
index 6cf3831..2d9bdc8 100644
--- a/proton-c/bindings/ruby/lib/handler/adapter.rb
+++ b/proton-c/bindings/ruby/lib/handler/adapter.rb
@@ -105,12 +105,12 @@ module Qpid::Proton::Handler
 
 
     def on_delivery(event)
-      d = event.delivery
-      if d.link.receiver? # Incoming message
+      if event.link.receiver? # Incoming message
+        d = event.delivery
         if d.aborted?
           delegate(:on_aborted, event)
           d.settle
-        elsif d.message?
+        elsif d.complete?
           if d.link.local_closed? && @opts[:auto_accept]
             d.release
           else
@@ -128,14 +128,15 @@ module Qpid::Proton::Handler
         end
         add_credit(event)
       else # Outgoing message
-        if d.updated?
-          case d.remote_state
+        t = event.tracker
+        if t.updated?
+          case t.remote_state
           when Qpid::Proton::Delivery::ACCEPTED then delegate(:on_accepted, event)
           when Qpid::Proton::Delivery::REJECTED then delegate(:on_rejected, event)
           when Qpid::Proton::Delivery::RELEASED, Qpid::Proton::Delivery::MODIFIED then delegate(:on_released, event)
           end
-          delegate(:on_settled, event) if d.settled?
-          d.settle if @opts[:auto_settle]
+          delegate(:on_settled, event) if t.settled?
+          t.settle if @opts[:auto_settle]
         end
       end
     end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index ab24dfd..f92f2df 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -29,8 +29,8 @@ end
 
 DEPRECATION = "[DEPRECATION]"
 def deprecated(old, new=nil)
-  repl = new ? ", use `#{new}`" : "with no replacement"
-  warn "#{DEPRECATION} `#{old}` is deprecated #{repl} (called from #{caller(2).first})"
+  repl = new ? "use `#{new}`" : "internal use only"
+  warn "#{DEPRECATION} `#{old}` is deprecated, #{repl} (called from #{caller(2).first})"
 end
 
 # Exception classes
@@ -62,7 +62,9 @@ require "core/endpoint"
 require "core/session"
 require "core/terminus"
 require "core/disposition"
+require "core/transfer"
 require "core/delivery"
+require "core/tracker"
 require "core/link"
 require "core/sender"
 require "core/receiver"
@@ -93,29 +95,24 @@ require "core/connection_driver"
 require "reactor/container"
 
 module Qpid::Proton::Handler
-  # @deprecated alias for backwards compatibility
+  # TODO aconway 2017-12-05: replace with back-compatible handler
   MessagingHandler = Qpid::Proton::MessagingHandler
 end
 
 module Qpid::Proton
-  Tracker = Delivery
-
-  # @private
+  private
   def self.registry
     @registry ||= {}
   end
 
-  # @private
   def self.add_to_registry(key, value)
     self.registry[key] = value
   end
 
-  # @private
   def self.get_from_registry(key)
     self.registry[key]
   end
 
-  # @private
   def self.delete_from_registry(key)
     self.registry.delete(key)
   end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/lib/reactor/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/reactor/container.rb b/proton-c/bindings/ruby/lib/reactor/container.rb
index 8862133..41b2a88 100644
--- a/proton-c/bindings/ruby/lib/reactor/container.rb
+++ b/proton-c/bindings/ruby/lib/reactor/container.rb
@@ -23,7 +23,7 @@ module Qpid::Proton::Reactor
   class Container < Qpid::Proton::Container
 
     private
-    alias :super_connect :connect # Access to superclass method
+    alias super_connect connect # Access to superclass method
 
     public
 
@@ -33,8 +33,8 @@ module Qpid::Proton::Reactor
       super handlers || (opts && opts[:global_handler]), opts && opts[:container_id]
     end
 
-    alias :container_id :id
-    alias :global_handler :handler
+    alias container_id id
+    alias global_handler handler
 
     def connect(opts=nil)
       url = opts && (opts[:url] || opts[:address])

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0445234/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index eea0057..b0e98d6 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -106,8 +106,8 @@ class DriverPair < Array
     server.transport.set_server
   end
 
-  alias :client :first
-  alias :server :last
+  alias client first
+  alias server last
 
   # Run till there is nothing to do
   def run
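
Note on the Tracker/Delivery split: the dispatch done by Transfer.wrap in this patch can be sketched outside the binding. What follows is a minimal plain-Ruby illustration, not the Proton implementation. The Sketch module, FakeImpl and its on_sender_link flag are hypothetical stand-ins for the SWIG pn_delivery object and for Cproton.pn_link_is_sender. Only the class names and the sender-versus-receiver choice are taken from the patch, and the instance caching done by fetch_instance/store_instance is left out.

```ruby
# Sketch only: plain-Ruby stand-ins, no qpid_proton or Cproton required.
module Sketch
  # Hypothetical stand-in for the C pn_delivery object.
  FakeImpl = Struct.new(:tag, :on_sender_link)

  # Common base class, mirroring the role of Qpid::Proton::Transfer.
  class Transfer
    # Choose the subclass from the direction of the link, as the patch's
    # Transfer.wrap does with Cproton.pn_link_is_sender.
    def self.wrap(impl)
      return unless impl
      (impl.on_sender_link ? Tracker : Delivery).new(impl)
    end

    def initialize(impl)
      @impl = impl
    end

    # Unique ID for the transfer (the delivery tag in the real binding).
    def id
      @impl.tag
    end
  end

  # Outgoing message: the sender tracks its status.
  class Tracker < Transfer; end

  # Incoming message: the receiver accepts, rejects or releases it.
  class Delivery < Transfer; end
end

out = Sketch::Transfer.wrap(Sketch::FakeImpl.new("1", true))
inc = Sketch::Transfer.wrap(Sketch::FakeImpl.new("2", false))
puts out.class # prints Sketch::Tracker
puts inc.class # prints Sketch::Delivery
```

With this shape, code that only needs id, state or settled? can accept either object through the shared base class, while direction-specific calls stay on the subclass that owns them.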