qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ringel, Rick" <Rick.Rin...@G3TI.NET>
Subject Getting started with messenger
Date Fri, 09 Jan 2015 21:21:41 GMT
 

Hello qpid experts,

 

I found your project this week, and it looks very applicable to my design
constraints.  I've been working to use the messenger library (with Proton)
version 0.80 to post messages to ActiveMQ.   I've started with the sample
file, send.c, and made some progress.   My overall plan is to use the a
successful delivery status to determine when I can delete a message from a
stream being written to my local disk.

 

In my use case, the link between the client and activeMQ is unreliable,
narrow, and intermittent, which is why the proton engine is so  useful.   I
am using the tracker delivery status to determine when activeMQ has accepted
a message, and I am using the messenger's window feature to determine when
to 'put' another message on the queue.  My code works fine when the activeMQ
application is running, but when I restart that service, messages never flow
again.  Also, I never see delivery status as 'aborted', even though the
window reports no outbound messages are queued up.

 

Is there a document (or sample code) that explains how to ask the proton
engine to reconnect, and to report, via the tracker, when outbound messages
are aborted due to connection failures?

 

I've included my main loop for your reference, but I'm more interested in
documentation/samples than detailed instruction.  I'm still on the learning
curve.  It just isn't obvious how to gather error information at the
messenger API, or what to do with it once I have it.

 

Prior to this fragment,  the code generates all the outbound messages and
stores them in the 'message' array.  As they are accepted by activeMQ,  the
code will 'put' the next message into the window, and keep the 'tracker' and
'window' arrays up to date.   The code never sees the 'work' function return
an error code, but I do see the lower layer report connection errors to
standard error:

 

[0x9b8ecd0]:ERROR amqp:connection:framing-error connection aborted

CONNECTION ERROR connection aborted (remote)

send: Broken pipe

 

 

 

                bool windowEmpty = false;

                while ((nextMessage < MSGCOUNT) || (!windowEmpty))

                {

                                fprintf(stdout,"Window status on entry: %d
of %d filled\n", pn_messenger_outgoing(messenger),
pn_messenger_get_outgoing_window(messenger));

                                windowEmpty = false;

                                int workStatus;

                                while (workStatus =
pn_messenger_work(messenger,10) == 1)

                                {

                                                failCount = 0;

 
//fprintf(stdout,"working...\n");

                                                pn_status_t status;

                                                windowEmpty = true;

                                                for (i=0; i < WINDOWCOUNT;
i++)

                                                {

                                                                status =
pn_messenger_status(messenger,tracker[i]);

                                                                switch
(status)

                                                                {

                                                                case
PN_STATUS_UNKNOWN:

 
break;

                                                                case
PN_STATUS_PENDING:

                                                                case
PN_STATUS_MODIFIED:

 
windowEmpty = false;

 
break;

                                                                case
PN_STATUS_ABORTED:

 
fprintf(stdout,"aborted, retrying %d\n", i);

 
pn_messenger_put(messenger, window[i]);

 
tracker[i] = pn_messenger_outgoing_tracker(messenger);

 
windowEmpty = false;

 
break;

                                                                case
PN_STATUS_ACCEPTED:

                                                                case
PN_STATUS_REJECTED:

                                                                case
PN_STATUS_SETTLED:

                                                                case
PN_STATUS_RELEASED:

 
// Release the completed message

 
pn_messenger_settle(messenger,tracker[i],0);

 
pn_message_free(window[i]);

 
fprintf(stdout,"window position %d settled: %s.  \n", i, statusLit[status]);

 
// Put a new message on the queue, if there is one

 
if (nextMessage < MSGCOUNT)

 
{

 
fprintf(stdout,"Putting message %d\n", nextMessage);

 
window[i] = message[nextMessage++];

 
pn_messenger_put(messenger, window[i]);

 
tracker[i] = pn_messenger_outgoing_tracker(messenger);

 
windowEmpty = false;

 
}

 
break;

                                                                }

                                                }

                                }

                                fprintf(stdout,"Window status on exit: %d of
%d filled, work status:%d\n", pn_messenger_outgoing(messenger),
pn_messenger_get_outgoing_window(messenger),workStatus);

 

                                if(pn_messenger_errno(messenger))


                                {


                                                fprintf(stdout,"%d:%s\n",
pn_messenger_errno(messenger),pn_error_text(pn_messenger_error(messenger)));


                                                switch
(pn_messenger_errno(messenger))

                                                {

                                                                default:

 
;

                                                }

                                }


                }


Mime
View raw message