qpid-users mailing list archives

From Adel Boutros <Adelbout...@live.com>
Subject Re: [Proton-c] [C++ bindings][0.14.0] How do I know a message was settled by the receiver?
Date Thu, 17 Nov 2016 08:41:58 GMT
I would like to add a bit of context. My C++ client has a method called "receiveTextMessage()".
This method is supposed to simulate what JMS.receiveMessage() does, which is to block until a
message is received and settled.


So I am using a wait/notify mechanism to unblock my consumer once the message is settled and
no longer available on the broker.
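
To make the wait/notify idea concrete, here is a minimal sketch using only the C++11 standard library. The class name SettleGate and its methods are hypothetical and are not part of Proton; the handler thread would call notify_settled() and the consumer thread would block in wait_settled().

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper (not a Proton type): lets the consumer thread block
// until the handler thread reports that the delivery has been settled.
class SettleGate {
public:
    // Called from the handler thread once the delivery is settled.
    void notify_settled() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            settled_ = true;
        }
        cond_.notify_all();
    }

    // Called from the consumer thread. Returns true if settlement was
    // reported before the timeout expired, false otherwise.
    bool wait_settled(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate guards against spurious wakeups.
        return cond_.wait_for(lock, timeout, [this] { return settled_; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool settled_ = false;
};

// Stand-in for the real setup: a second thread plays the handler and
// "settles" after a short delay while the caller blocks.
inline bool demo_wait_for_settlement() {
    SettleGate gate;
    std::thread handler([&gate] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        gate.notify_settled();
    });
    bool ok = gate.wait_settled(std::chrono::seconds(5));
    handler.join();
    return ok;
}
```

The timeout is there so that receiveTextMessage() could give up instead of blocking forever if no settlement is ever reported.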

From the code, it seems that when auto_accept is set to true (the default behavior), there is no
way to be notified when the delivery is settled; only on_message is called (see messaging_adapter.cpp).

In that case, can anything go wrong between the time MyHandler::on_message is called and the
time the message is settled (d.accept() in the code below)?


PS: Because the event_loop has performance issues, I am using the timerTask and schedule for the
interaction between the consumer's main thread and the handler thread.


messaging_adapter.cpp: code block that MyHandler reaches

-----------------------------------------------------------------------------


delegate_.on_message(d, msg);
if (lctx.auto_accept && !d.settled())
    d.accept(); // This is reached as well
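
To illustrate the ordering I am worried about, here is a small simulation with stand-in types. FakeDelivery, PassiveHandler, ExplicitAcceptHandler and dispatch are all hypothetical and not Proton classes; dispatch only mirrors the three lines quoted above, and I am assuming that accept() also settles the delivery locally, which is what the !d.settled() guard suggests.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for proton::delivery: tracks local state only.
struct FakeDelivery {
    bool settled = false;
    std::vector<std::string> log;
    // Assumption: accepting also settles the delivery locally.
    void accept() { log.push_back("accept"); settled = true; }
};

// Handler that relies on auto_accept and wakes the consumer from on_message.
struct PassiveHandler {
    bool settled_when_notified = false;
    void on_message(FakeDelivery& d) {
        d.log.push_back("on_message");
        // A real client would notify the waiting consumer here.
        settled_when_notified = d.settled;
    }
};

// Handler that accepts explicitly before waking the consumer.
struct ExplicitAcceptHandler {
    bool settled_when_notified = false;
    void on_message(FakeDelivery& d) {
        d.log.push_back("on_message");
        d.accept();
        // A real client would notify the waiting consumer here.
        settled_when_notified = d.settled;
    }
};

// Mirrors the order of operations in the quoted adapter snippet.
template <class Handler>
void dispatch(Handler& h, FakeDelivery& d, bool auto_accept) {
    h.on_message(d);
    if (auto_accept && !d.settled)
        d.accept();
}
```

In this model the passive handler wakes the consumer while the delivery is still unsettled, whereas the explicit handler has already settled it, and the guard then skips the second accept. Whether the real adapter behaves the same way in every case is exactly what I am asking.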



messaging_adapter.cpp: full code block

-----------------------------------------------------------------------------

if (pn_link_is_receiver(lnk)) {
        delivery d(make_wrapper<delivery>(dlv));
        if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
            // generate on_message
            pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk));
            connection_context& ctx = connection_context::get(pnc);
            // Reusable per-connection message.
            // Avoid expensive heap malloc/free overhead.
            // See PROTON-998
            class message &msg(ctx.event_message);
            msg.decode(d);
            if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
                if (lctx.auto_accept)
                    d.release();
            } else {
                delegate_.on_message(d, msg);
                if (lctx.auto_accept && !d.settled())
                    d.accept();
                if (lctx.draining && !pn_link_credit(lnk)) {
                    lctx.draining = false;
                    receiver r(make_wrapper<receiver>(lnk));
                    delegate_.on_receiver_drain_finish(r);
                }
            }
        }
        else if (pn_delivery_updated(dlv) && d.settled()) {
            delegate_.on_delivery_settle(d);
        }
        if (lctx.draining && pn_link_credit(lnk) == 0) {
            lctx.draining = false;
            pn_link_set_drain(lnk, false);
            receiver r(make_wrapper<receiver>(lnk));
            delegate_.on_receiver_drain_finish(r);
            if (lctx.pending_credit) {
                pn_link_flow(lnk, lctx.pending_credit);
                lctx.pending_credit = 0;
            }
        }
        credit_topup(lnk);
    }


________________________________
From: Adel Boutros <Adelboutros@live.com>
Sent: Wednesday, November 16, 2016 7:08:56 PM
To: users@qpid.apache.org
Subject: [Proton-c] [C++ bindings][0.14.0] How do I know a message was settled by the receiver?

Hello,


I have a simple C++ client which connects to a Qpid Java Broker (6.0.4) and receives messages.

With the default behavior of the C++ client, how can I know when a delivery has been settled?

It seems there is no way for me to know that.


I have debugged a bit: when on_message is called, the message goes into the "Acquired"
state on the broker. Later, at a point I haven't found yet, the message is settled
and disappears from the broker. However, none of the other functions I implemented below is called.


Can you please assist?


Console output

-----------------------

on_message


Code

-----------------------


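#include <iostream>

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver.hpp>
#include <proton/receiver_options.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>

class MyHandler : public proton::messaging_handler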
{
public:

   virtual void on_container_start(proton::container &c)
   {
      proton::url url("localhost:5672");
      proton::connection connection = c.connect(url);
      proton::receiver_options receiverOptions(proton::receiver_options().credit_window(0));
      connection.open_receiver("queue.name", receiverOptions);
   }

   virtual void on_receiver_open(proton::receiver& l)
   {
      l.add_credit(1);
   }

   virtual void on_message(proton::delivery &d, proton::message &m)
   {
      std::cout << "on_message" << std::endl;
   }
   virtual void on_delivery_settle(proton::delivery &d)
   {
      std::cout << "on_delivery_settle" << std::endl;
   }
   virtual void on_tracker_accept(proton::tracker &d)
   {
      std::cout << "on_tracker_accept" << std::endl;
   }

   virtual void on_tracker_reject(proton::tracker &d)
   {
      std::cout << "on_tracker_reject" << std::endl;
   }

   virtual void on_tracker_release(proton::tracker &d)
   {
      std::cout << "on_tracker_release" << std::endl;
   }

   virtual void on_tracker_settle(proton::tracker &d)
   {
      std::cout << "on_tracker_settle" << std::endl;
   }
};



Regards,

Adel
