qpid-users mailing list archives

From Praveen M <lefthandma...@gmail.com>
Subject Re: What is the memory footprint of an Apache qpid queue?
Date Wed, 19 Oct 2011 18:57:20 GMT
Hi Robbie,

Thanks for your mail.
I've posted a small code snippet below (I was advised not to attach code).
It might not compile directly, but you'll certainly get an idea of what
I'm trying to do.

The test basically tries to:

1) Connect to Qpid.
2) Create the specified number of queues and register a listener for each
queue.
3) Enqueue a message for each queue.
4) Wait until each of these messages is received, then quit.

An interesting behavior I saw when running this test with a large number
of queues:

When the enqueues are pushed, there is a huge peak in memory usage
immediately, and then it slowly drops down to a stable number.

I tried a variant of the above test but I put a small sleep in my handler.
In that case, the peak after enqueues doesn't really happen.
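
A minimal sketch of that variant, for illustration only: the handler sleeps briefly and then counts down its latch. To keep it runnable without a broker it does not use JMS at all; `SleepingHandler`, `sleepMillis`, and `runDemo` are hypothetical names. In the real test the sleep would sit inside `handleMessage` of the `BaseQueueHandler` in the code further down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, JMS-free stand-in for a handler that sleeps per message.
class SleepingHandler {
    private final CountDownLatch latch;
    private final long sleepMillis;

    SleepingHandler(CountDownLatch latch, long sleepMillis) {
        this.latch = latch;
        this.sleepMillis = sleepMillis;
    }

    // Mirrors the handleMessage(long, byte[]) shape used by the test.
    void handleMessage(long enqueueTime, byte[] message) throws InterruptedException {
        try {
            Thread.sleep(sleepMillis); // slow the consumer down on purpose
        } finally {
            latch.countDown();         // signal that this message was handled
        }
    }

    // Delivers `count` messages on the calling thread and reports
    // whether the latch opened within one second afterwards.
    static boolean runDemo(int count, long sleepMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(count);
        SleepingHandler handler = new SleepingHandler(latch, sleepMillis);
        for (int i = 0; i < count; i++) {
            handler.handleMessage(System.currentTimeMillis(), "ABCDEF".getBytes());
        }
        return latch.await(1, TimeUnit.SECONDS);
    }
}
```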

I suspect the reason is this: if the consumer isn't keeping pace with the
producer, Qpid stops caching messages in memory and defers everything to
disk. However, if the consumer is keeping pace with the producer, it tries
to push as much as possible into memory.

What I'm looking for is this: is there some control I can use to ensure
that Qpid doesn't flood memory when a flood of messages arrives, whether
the consumer is fast or slow? I'm not too worried about throughput. I'd
like to restrict the amount of memory Qpid uses when there is a sudden
flood of messages.
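
One client-side way to bound such a flood, sketched under assumptions: cap the number of messages that have been sent but not yet handled, using a `java.util.concurrent.Semaphore`. This is not a Qpid setting and says nothing about what the broker itself keeps in memory. It also only works when producer and consumer share a process, as they do in this test. `InFlightLimiter`, `maxInFlight`, and the simulated consumer pool are hypothetical; in the real test `beforeSend()` would be called just before `enqueue(...)` and `afterHandled()` at the end of `onMessage`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps messages that are sent but not yet handled.
class InFlightLimiter {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Call before sending; blocks while maxInFlight messages are outstanding.
    void beforeSend() throws InterruptedException {
        permits.acquire();
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max); // record the high-water mark
    }

    // Call once the consumer has finished handling a message.
    void afterHandled() {
        inFlight.decrementAndGet();
        permits.release();
    }

    int peakInFlight() {
        return peak.get();
    }

    // Simulates a fast producer and slower consumers; returns the observed peak.
    static int runDemo(int messages, int maxInFlight) throws InterruptedException {
        InFlightLimiter limiter = new InFlightLimiter(maxInFlight);
        ExecutorService consumers = Executors.newFixedThreadPool(4);
        for (int i = 0; i < messages; i++) {
            limiter.beforeSend();             // producer side
            consumers.execute(() -> {         // stands in for onMessage
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    limiter.afterHandled();
                }
            });
        }
        consumers.shutdown();
        consumers.awaitTermination(30, TimeUnit.SECONDS);
        return limiter.peakInFlight();
    }
}
```

The trade-off is throughput: the producer stalls whenever `maxInFlight` messages are outstanding, which is acceptable here since throughput is not the main concern.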

Awaiting your response.

Praveen



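import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.*;
import javax.jms.Queue; // explicit import avoids a clash with java.util.Queue
import javax.naming.*;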


public class QpidMqTransport {

    private transient Connection connection;
    transient Session session;
    private transient MessageProducer emptyProducer;
    private static String connUrl =
            "amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'";

    final String INITIAL_CONTEXT_FACTORY =
            "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    final String CONNECTION_JNDI_NAME = "local";

    private InitialContext _ctx;
    Map<String, Destination> queueNameToDestination = new HashMap<String, Destination>();
    Map<String, Destination> topicNameToDestination = new HashMap<String, Destination>();
    private static String options =
            ";{create: always , node : {type : queue, durable : true, max-queue-size : 1000}}";


    // @Override dropped: the interface this class implements is not part of the snippet.
    public String createQueue(String queueName) throws Exception {
        return this.createQueue(queueName, options);
    }

    // Creates the queue by appending the address options to its name.
    public String createQueue(String queueName, String options) throws Exception {
        Destination destination = session.createQueue(queueName + options);
        if (destination != null) {
            queueNameToDestination.put(queueName, destination);
            return queueName;
        } else {
            System.out.println("Queue Created Null");
            return null;
        }
    }


    public String enqueue(String p2pConsumer, byte[] payload, boolean isPersistent)
            throws Exception {
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(payload);
        return this.enqueue(p2pConsumer, message, isPersistent);
    }

    public String enqueue(String p2pConsumer, BytesMessage message, boolean isPersistent)
            throws Exception {
        Destination destination = queueNameToDestination.get(p2pConsumer);
        // Set the mode on every send so an earlier non-persistent send does not stick.
        emptyProducer.setDeliveryMode(
                isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
        emptyProducer.send(destination, message);
        return message.getJMSMessageID();
    }

    // Takes a plain MessageListener; the QueueHandler/QpidMqHandler types from
    // my real code are not part of this snippet.
    public void listen(String p2pConsumer, MessageListener handler) throws Exception {
        Destination destination = queueNameToDestination.get(p2pConsumer);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(handler);
    }


    public void open(boolean useAsyncSend) throws Exception {
        // Set the JNDI properties.
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
        properties.put("connectionfactory." + CONNECTION_JNDI_NAME, connUrl);

        try
        {
            _ctx = new InitialContext(properties);
        }
        catch (NamingException e)
        {
            System.err.println("Error Setting up JNDI Context:" + e);
            throw e; // _ctx would be null below, so don't carry on
        }

        connection = ((ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME)).createConnection();

        // Non-transacted session. With transacted == false the second argument
        // is normally an acknowledge mode such as Session.AUTO_ACKNOWLEDGE.
        session = connection.createSession(false, Session.SESSION_TRANSACTED);
        emptyProducer = session.createProducer(null);
        connection.start();
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
        if( _ctx != null) {
            _ctx.close();
        }
    }


    public static void main(String[] args) throws Exception {

        QpidMqTransport transport = new QpidMqTransport();
        transport.open(false);
        int queueCount = 25000;
        int expectedPerQueue = 1;
        // Assumption: isPersistent was not defined in the snippet; true matches
        // the persistent-queue use case.
        boolean isPersistent = true;
        String baseQueueName = "TestQueue-";
        Map<String, CountDownLatch> queuesToLatchMap = new HashMap<String, CountDownLatch>();

        for (int i = 0; i < queueCount; i++) {
            queuesToLatchMap.put(baseQueueName + i, new CountDownLatch(expectedPerQueue));
        }
        // Instance method, so it is called on the transport.
        List<BaseQueueHandler> handlers =
                transport.createQueuesAndRegisterHandlers(transport, queuesToLatchMap);

        // Enqueue a message per queue.
        for (int i = 0; i < expectedPerQueue; i++) {
            for (String queueName : queuesToLatchMap.keySet()) {
                transport.enqueue(queueName, "ABCDEF".getBytes(), isPersistent);
            }
        }

        // Wait for all the handlers to finish.
        for (BaseQueueHandler handler : handlers) {
            handler.latch.await(60, TimeUnit.MINUTES);
        }
        transport.close();
    }

    private List<BaseQueueHandler> createQueuesAndRegisterHandlers(
            QpidMqTransport transport, Map<String, CountDownLatch> queuesToLatchMap)
            throws Exception {
        List<BaseQueueHandler> handlers = new ArrayList<BaseQueueHandler>();
        try {
            List<String> queueNames = new ArrayList<String>();
            queueNames.addAll(queuesToLatchMap.keySet());
            // Create the queues.
            for (String queueName : queueNames) {
                transport.createQueue(queueName);
            }
            // Register the handlers.
            for (Map.Entry<String, CountDownLatch> entry : queuesToLatchMap.entrySet()) {
                BaseQueueHandler handler = getQueueHandler(entry.getValue());
                handlers.add(handler);
                transport.listen(entry.getKey(), handler);
            }

        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return handlers;
    }

    private BaseQueueHandler getQueueHandler(CountDownLatch latch) throws Exception {
        return new BaseQueueHandler(latch);
    }

    public class BaseQueueHandler implements MessageListener {

        final CountDownLatch latch;
        double messageCount = 0;
        double latencyTotal =0;


        public BaseQueueHandler(CountDownLatch suppliedLatch) {
            latch = suppliedLatch;
        }


        public void handleMessage(long enqueueTime, byte[] message) throws Exception {
            // do something here.
        }


        @Override
        final public void onMessage(Message arg0) {
            BytesMessage bytesMessage = (BytesMessage) arg0;
            byte[] byteArray = new byte[1024];
            try {
                long enqueueTime = bytesMessage.getJMSTimestamp();
                bytesMessage.readBytes(byteArray);
                handleMessage(enqueueTime, byteArray);
            } catch (Exception x) {
                throw new RuntimeException(x);
            } finally {
                // Release the waiter in main(); without this the latch never opens.
                latch.countDown();
            }
        }

    }

}






On Mon, Oct 17, 2011 at 2:35 PM, Robbie Gemmell <robbie.gemmell@gmail.com> wrote:

> The individual queues have a very low memory overhead and don't have
> any byte[] buffers that I can recall, so those are either for network
> data or, more probably, the session command buffers. I did fix a few
> issues after 0.12 where those could be retained unnecessarily; however,
> whether that will be of any help depends on what you are doing, and
> the fact you see such a significant difference when using Derby vs
> Memory store suggests it isn't just those at work.
>
> Could you please post the code you are using for your testing so we
> can try to replicate the scenario precisely?
>
> Thanks,
> Robbie
>
> On 14 October 2011 19:48, Praveen M <lefthandmagic@gmail.com> wrote:
> > Actually my bad.
> >
> > I went back and ran the test with MemoryMessageStore, and realized that
> > there is a big jump in memory usage when the messages get enqueued and
> > are processed. I realized that I wasn't running out of memory in that
> > case (using MemoryMessageStore), unlike with DerbyMessageStore, because
> > the broker with Derby was taking up more memory (to keep the queue
> > state, I suppose).
> >
> > That said,
> >
> > Can someone please tell me what the memory footprint of an individual
> > queue is once it has been created?
> >
> > What is the maximum number of queues that you've created on Qpid, and
> > how much memory on the broker side would you say each queue takes?
> >
> > I'm benchmarking my tests against Qpid client/broker 0.12.
> >
> > Also, can someone please let me know if there are any tweaks which will
> > reduce the queue buffer size? I did a heap dump and saw that a lot of my
> > memory was allocated to a byte[] buffer, which I assume is the queue's
> > buffer. Does anyone know what the default buffer size is? Can I change
> > that?
> >
> > I use -Damqj.read_write_pool_size=32 -Dmax_prefetch=1
> >
> > My use case requires the operation of about 20K *persistent* queues in
> > parallel, and I'd like to see reasonable memory usage if all the queues
> > have messages to consume. I'm willing to compromise on the throughput if
> > I can save more heap.
> >
> > Thanks a lot,
> > Praveen
> >
> >
> >
> > On Wed, Oct 12, 2011 at 9:22 AM, Robbie Gemmell <robbie.gemmell@gmail.com> wrote:
> >
> >> What version of the client/broker were you using in your test? Can you
> >> send a copy of the code you used to reproduce the issue (it will
> >> probably get stripped by the mailing list if you attach it, so just
> >> paste it in)?
> >>
> >> Regards,
> >> Robbie
> >>
> >> On 12 October 2011 06:21, Praveen M <lefthandmagic@gmail.com> wrote:
> >> > Thanks for your email. I have earlier run the benchmark with
> >> > MemoryMessageStore.
> >> >
> >> > I was able to go up to 50K queues with the exact same test (a message
> >> > per queue), and it took up merely 2.1 GB.
> >> >
> >> > So this seems to be something that happens when I switch to
> >> > DerbyMessageStore.
> >> >
> >> > Maybe I am missing some setting?
> >> > Or is Derby supposed to perform this way?
> >> >
> >> > Thanks for your help.
> >> >
> >> > Thanks,
> >> > Praveen
> >> >
> >> > On Tue, Oct 11, 2011 at 8:44 PM, Danushka Menikkumbura <danushka.menikkumbura@gmail.com> wrote:
> >> >
> >> >> Hi Praveen,
> >> >>
> >> >> Do you notice the same behavior even when you run the broker without
> >> >> the Derby message store? AFAIK this has nothing to do with the
> >> >> persistence storage you use.
> >> >>
> >> >> Thanks,
> >> >> Danushka
> >> >>
> >> >> On Wed, Oct 12, 2011 at 4:14 AM, Praveen M <lefthandmagic@gmail.com> wrote:
> >> >>
> >> >> > Hi,
> >> >> >
> >> >> > I'm an Apache Qpid newbie and am trying to benchmark the Qpid Java
> >> >> > Broker to see if it could be used for one of my use cases.
> >> >> >
> >> >> > My use case requires the ability to create at least 20K persistent
> >> >> > queues and have them all running in parallel.
> >> >> >
> >> >> > I am using the DerbyMessageStore as I understand that the default
> >> >> > MemoryMessageStore is not persistent across broker restarts.
> >> >> >
> >> >> > I'm running the broker with a heap of 4GB and QPID_OPTS set to
> >> >> > -Damqj.read_write_pool_size=32 -Dmax_prefetch=1
> >> >> >
> >> >> >
> >> >> > My test does the following:
> >> >> >
> >> >> > 1) Creates a queue and registers a listener on that queue. I do
> >> >> > this up to 20K times for 20K distinct queues. I create the queues
> >> >> > with the following option:
> >> >> >    {create: always , node : {type : queue, durable : true}}
> >> >> >    - This step goes quite fine. I was monitoring the memory usage
> >> >> >      during this step and it almost always stayed stable around
> >> >> >      500-800MB.
> >> >> > 2) I produce messages for the queues (one message for each queue)
> >> >> > and the messages are consumed by the handlers registered in step 1.
> >> >> >    - When this step starts, the memory usage just shoots up and
> >> >> >      exhausts my 4GB memory altogether.
> >> >> >
> >> >> >
> >> >> > Can someone please help me by explaining why I am seeing this kind
> >> >> > of behavior?
> >> >> >
> >> >> > Also, can you please point out if I'm missing some setting or doing
> >> >> > something completely wrong/stupid?
> >> >> >
> >> >> >
> >> >> > Thanks,
> >> >> > --
> >> >> > -Praveen
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -Praveen
> >> >
> >>
> >> ---------------------------------------------------------------------
> >> Apache Qpid - AMQP Messaging Implementation
> >> Project:      http://qpid.apache.org
> >> Use/Interact: mailto:users-subscribe@qpid.apache.org
> >>
> >>
> >
> >
> > --
> > -Praveen
> >
>
>
>


-- 
-Praveen
