qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Arnold <myk...@gmail.com>
Subject How to create a QPID topic exchange that holds messages even then then there is no active receiver
Date Fri, 08 Dec 2017 14:44:13 GMT
Disconnecting the QPID session that creates a durable topic exchange (and
thus closing the only open receiver) seems to delete the topic exchange and
its contents - any messages not yet retrieved from the exchange are lost
and any subsequent messages sent while no receiver is open, are likewise
lost.

Desired behaviour is that topic exchanges marked as durable should survive
session close(); topic exchanges marked as durable should hold messages for
the ttl even then there is no receiver currently open.

The below code:

Part A: Creates a durable topic exchange and receiver; posts some messages
to the exchange and then successfully retrieves them

Part B: Additional messages are then posted; the receiver (and session) is
closed and further messages posted

Part C: Tries, but fails to retrieve any of the messages send in Part B

Since the exchange and messages are marked as durable, the expectation is
that all the messages posted in part B, can be retrieved in part C.

Using:
qpid-cpp-1.36.0
Fedora 27 (4.13.16)

test.cpp---------------------------------------------------------------------------------
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>

using namespace qpid::messaging;
using namespace std;

void SendMessage(int i) {
    Connection QPIDSenderConnection(string("localhost:5672"));
     try {
         QPIDSenderConnection.setOption("username", string(""));
         QPIDSenderConnection.setOption("password", string(""));
         QPIDSenderConnection.setOption("reconnect", true);
         QPIDSenderConnection.setOption("reconnect_limit", 10);
         QPIDSenderConnection.setOption("reconnect_interval", 5);
         QPIDSenderConnection.setOption("heartbeat", 30);
        QPIDSenderConnection.open();

        Session QPIDSenderSession = QPIDSenderConnection.createSession();

        qpid::types::Variant::Map MessageContentMap;
        MessageContentMap["Value"] = i;

        string SenderAddress = "Topic.Subtopic; {create: always,
node:{type: topic, durable: true}, link:{reliability: at-least-once,
durable: true}}";
        qpid::messaging::Sender QPIDSender =
QPIDSenderSession.createSender(SenderAddress);

        qpid::messaging::Message QPIDMessageToSend;
        QPIDMessageToSend.setSubject(string("Subject"));
        qpid::messaging::encode(MessageContentMap, QPIDMessageToSend);
        QPIDMessageToSend.setDurable(false);
        QPIDMessageToSend.setTtl((qpid::messaging::Duration) 30000);
        QPIDMessageToSend.setContentType("amqp/map");
        QPIDSender.send(QPIDMessageToSend);

        QPIDSenderSession.close();
        QPIDSenderConnection.close();

    } catch(const exception& error) {
        cout << "Failed with: " << error.what() << " - quitting." <<
endl;
    }

}

int main(int argc, char** argv) {

    //Part A: Creates a topic exchange and reciever; posts some messages to
the exchange and then successfully retrieves them
    Connection QPIDConnection(string("localhost:5672"));
     try {
        QPIDConnection.setOption("username", string(""));
        QPIDConnection.setOption("password", string(""));
        QPIDConnection.setOption("reconnect", true);
        QPIDConnection.setOption("reconnect_limit", 10);
        QPIDConnection.setOption("reconnect_interval", 5);
        QPIDConnection.setOption("heartbeat", 30);
         QPIDConnection.open();

         Session QPIDSession = QPIDConnection.createSession();

        string ReceiverAddress = "Topic.Subtopic/Subject; {create: always,
node:{type: topic, durable: true}, link:{reliability: at-least-once,
durable: true}}";
        qpid::messaging::Receiver QPIDReceiver;

        try {
            QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
        } catch(const exception& error) {
            cout << "Unable to connect receiver to the exchange: " <<
error.what();
        }

        cout << "Sending Messages..." << endl;
        for(int i = 0; i < 3; i++)
                SendMessage(i);

        cout << "...now retrieving messages" << endl;
        qpid::messaging::Message QPIDMessage;
        for(int i = 0; i < 3; i++){
                if(QPIDReceiver.fetch(QPIDMessage,
qpid::messaging::Duration::SECOND * 10)) {
                    QPIDSession.acknowledge(QPIDMessage);
                    qpid::types::Variant::Map MessageContent;
                            decode(QPIDMessage, MessageContent);
                    std::cout << "Got message with subject: '" <<
QPIDMessage.getSubject() << "' and content value: " <<
MessageContent["Value"] << endl;
                }
                else
                    cout << "Got nothing" << endl;

        };

        //Part B: Additional messages are then posted; the receiver (and
session) is closed and further messages posted
        cout << "Sending some more messages before we close the session..."
<< endl;
        for(int i = 3; i < 6; i++)
                SendMessage(i);

        //Optional - comment out from here to show that the disconnect and
reconnect is significant
        cout << "Now disconnecting the session that created the topic" <<
endl;
        QPIDSession.close();

        cout << "Sending even more messages after the session is closed..."
<< endl;
        for(int i = 6; i < 9; i++)
                SendMessage(i);

        cout << "...now reconnect the session.." << endl;
         QPIDSession = QPIDConnection.createSession();

        cout << "..and send even more messages after the session is
created, but before the receiver is recreated..." << endl;
        for(int i = 9; i < 12; i++)
                SendMessage(i);

        cout << "...now create the receiver.." << endl;
        try {
            QPIDReceiver = QPIDSession.createReceiver(ReceiverAddress);
        } catch(const exception& error) {
            cout << "Unable to reconnect receiver session to the exchange:
" << error.what();
        }
        //Optional - Comment out till here

        //Part C: Tries, but failes to retrieve any of the messages send in
Part B
        cout << "...try to retrieve the 9 messages aleady sent" << endl;
        for(int i = 0; i < 3; i++){
                if(QPIDReceiver.fetch(QPIDMessage,
qpid::messaging::Duration::SECOND * 10)) {
                    QPIDSession.acknowledge(QPIDMessage);
                    qpid::types::Variant::Map MessageContent;
                            decode(QPIDMessage, MessageContent);
                    std::cout << "Got message with subject: '" <<
QPIDMessage.getSubject() << "' and content: " << MessageContent["Value"] <<
endl;
                }
                else
                    cout << "Got nothing" << endl;
        };

    } catch(const exception& error) {
        cerr << "Failed with: " << error.what() << "- quitting." <<
endl;
    }

    QPIDConnection.close();

    std::cout << "Finished" << std::endl;

}
----------------------------------------------------------------------
Makefile---------------------------------------------------------
CC = g++
CPPFLAGS += -I/usr/include
CFLAGS += -std=c++0x
CFLAGS += -c
CFLAGS += -g
LDFLAGS += -g

LDLIBS += -L/usr/lib64
LDLIBS += -lqpidtypes
LDLIBS += -lqpidmessaging

SOURCES = test.cpp
OBJECTS = $(SOURCES:.cpp=.o)
EXECUTABLE = test

all: $(EXECUTABLE)

$(EXECUTABLE): $(OBJECTS)
    $(CC) $(LDFLAGS) $(OBJECTS) $(LDLIBS) -o $@

.cpp.o:
    $(CC) $(CPPFLAGS) $(CFLAGS) $< -o $@

install:
    @echo "Build complete!"
----------------------------------------------------------------------

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message