qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Praveen M <lefthandma...@gmail.com>
Subject Re: What is the memory footprint of an Apache qpid queue?
Date Thu, 03 Nov 2011 01:26:54 GMT
Hi Robbie,

Sorry for the delayed response. I've attached the file with the test (This
one compiles :D) through which i was able to confirm that the consumer
subscription time is pretty big. The order of 40-70ms per queue
subscription is what I see.

You can get the code below, if the attachment doesn't make it's way
through.

Thanks,
Praveen


import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.lang.System.*;
import java.io.PrintStream.*;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.*;

class QpidListenTest {

    private transient Connection connection;
    transient Session session;
    private transient MessageProducer emptyProducer;

    // The URL Used to connect tot he broker.
    private static String connUrl = "amqp://guest:guest@test
/?brokerlist='tcp://localhost:5672'&maxprefetch='1'";

    final String INITIAL_CONTEXT_FACTORY =
"org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    final String CONNECTION_JNDI_NAME = "local";

    private InitialContext _ctx;
    Map<String, Destination> queueNameToDestination = new HashMap<String,
Destination>();
    Map<String, Destination> topicNameToDestination = new HashMap<String,
Destination>();

    // the options used when creating a new queue
    private static String options = ";{create: always , node : {type :
queue, durable : true}}";


    public static void main(String[] args) {
        int expectedPerQueue = 1;
        int queueCount = 100;
        String baseQueueName = "Testing-";
        Map<String, CountDownLatch> queuesToLatchMap = new HashMap<String,
CountDownLatch>();
        List<QpidMqHandler> handlers = null;
        QpidListenTest test = null;
        try {
        test = new QpidListenTest();
        // open connection to the broker
        test.open();

        for (int i=0; i<queueCount; i++) {
            queuesToLatchMap.put(baseQueueName + i, new
CountDownLatch(expectedPerQueue));
        }


            // create The queues in the broker
            for (String queueName : queuesToLatchMap.keySet()) {
                test.createQueue(queueName);
            }

            // register the handlers for the queue.
            handlers = new ArrayList<QpidMqHandler>();
            long totalTime = 0;
            for (Map.Entry<String, CountDownLatch> entry :
queuesToLatchMap.entrySet()) {
                QpidListenTest.QpidMqHandler handler = new
QpidMqHandler(entry.getValue());
                handlers.add(handler);
                long startTime = System.currentTimeMillis();
                test.listen(entry.getKey(), handler);
                totalTime += (System.currentTimeMillis() - startTime);
            }
            System.out.println("Average time to subscription for " +
queueCount + " queues  : " + totalTime/queueCount + " milliseconds");
        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        // produce messages
        for (int i=0; i<expectedPerQueue; i++) {
            for (String queueName : queuesToLatchMap.keySet()) {
                byte[] msg = "ABCDEF".getBytes();
                try {
                    test.enqueue(queueName, msg);
                } catch(Throwable e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
        try {
            test.waitOnHandlers(handlers);
        } catch (Exception e) {

        }

    }

    /**
     * Open a new connection to the broker and start it.
     */
    public void open() throws Exception {
     // Set the properties ...
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY,
INITIAL_CONTEXT_FACTORY);
        properties.put("connectionfactory." + CONNECTION_JNDI_NAME,
connUrl);

        try
        {
            _ctx = new InitialContext(properties);
        }
        catch (NamingException e)
        {
            System.err.println("Error Setting up JNDI Context:" + e);
        }
        connection = ((ConnectionFactory)
_ctx.lookup(CONNECTION_JNDI_NAME)).createConnection();

        session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        emptyProducer = session.createProducer(null);
        emptyProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
    }

    /**
     * Creates a new queue and adds it to the destination to queue map.
     *
     */
    public String createQueue(String queueName) throws Exception {
        Destination destination = session.createQueue(queueName + options);
        if (destination != null) {
            queueNameToDestination.put(queueName, destination);
            return queueName;
        } else {
            System.out.println("Queue Created Null");
            return null;
        }
    }


    /**
     * Create a listener for the queue.
     */
    public void listen(String p2pConsumer, QpidMqHandler handler) throws
Exception {
        Destination destination = queueNameToDestination.get(p2pConsumer);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(handler);
    }

    /**
     * Wait for consumers to complete.
     */
    private void waitOnHandlers(List<QpidMqHandler> handlers) throws
Exception {
        for (QpidMqHandler handler : handlers) {
            handler.latch.await(10, TimeUnit.SECONDS);
        }
    }

    /**
     * Enqueue Messages
     */
    public String enqueue(String p2pConsumer, byte[] payload) throws
Exception {
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(payload);
        Destination destination = queueNameToDestination.get(p2pConsumer);
        emptyProducer.send(destination, message);
        return message.getJMSMessageID();
    }


    static public class QpidMqHandler  implements MessageListener {

        final CountDownLatch latch;

        public QpidMqHandler(CountDownLatch suppliedLatch) {
            this.latch = suppliedLatch;
        }

        @Override
        final public void onMessage(Message arg0) {
            BytesMessage bytesMessage = (BytesMessage) arg0;

            byte [] byteArray = new byte[1024];
            try {
                bytesMessage.readBytes(byteArray);
                latch.countDown();
            } catch (Exception x) {
                throw new RuntimeException(x);
            }
        }

    }



}


On Tue, Nov 1, 2011 at 4:35 AM, Robbie Gemmell <robbie.gemmell@gmail.com>wrote:

> On 1 November 2011 00:33, Praveen M <lefthandmagic@gmail.com> wrote:
> > Hi Robbie,
> >
> > I ran my benchmark again. Tried creating about 60K queues and had a
> message
> > going in and coming out.
> > The test just ran so smoothly this time without any glitch. You've sure
> > done some magic in the broker's end :)
> >
> > The heap usage has gone down 'A LOT' like you mentioned.
> >
>
> Good to hear. There is one other issue I know could be addressed to
> reduce heap usage further in the case where you have so many queues in
> use, but I havent finished looking into how I would do that yet and
> the effect will be far far less dramatic than the more trivial change
> I did make.
>
> > I just have one more question (for now),
> >
> > I was trying to find the time it takes to create a consumer and setup and
> > set up a message listener.
> >
> > *QpidMqHandler amqHandler = (QpidMqHandler) handler;*
> > *MessageConsumer consumer = session.createConsumer(destination);*
> > *consumer.setMessageListener(amqHandler);*
> >
> > It seems like the two steps above "createConsumer" and
> "setMessageLister()"
> > takes quite a bit of time. I clocked around 40-50 seconds on an average.
> > Can you please explain why these steps take such a long time?
> >
> > I'm a lil concerned about the time here, as in my use case, I expect my
> > consumer to be registering to about 300-500 queues.
> >
>
> I cant explain that, I have never seen it take remotly close to that
> long and there isnt really an obvious reason it should.
>
> Could you post the full (compilable :P) code you are using to measure this?
>
> > Thanks a lot for pushing in the fixes for the issues that I raised
> earlier.
> >
> > The pace at which you've gone about fixing the bugs has got me really
> > excited about Qpid. Ah..did I tell you "YOU ROCK" btw :)
> >
>
> I really really wanted these things fixed before we branched for the
> next release, so I jumped on them :)
>
> >
> > Thanks a lot to you and all the contributors of Qpid.
> >
> > Praveen
> >
> > On Sun, Oct 30, 2011 at 12:01 PM, Robbie Gemmell
> > <robbie.gemmell@gmail.com>wrote:
> >
> >> I have made a change on trunk that should dramatically reduce the heap
> >> usage observed when using persistent messages with the Derby or BDB
> >> stores.
> >>
> >> On 26 October 2011 00:13, Robbie Gemmell <robbie.gemmell@gmail.com>
> wrote:
> >>
> >> <snip>
> >> > cause it to use significantly more during the same test when using
> >> > either the Derby or BDB perisistent stores.
> >> <snip>
> >> > Robbie
> >> >
> >>
> >> ---------------------------------------------------------------------
> >> Apache Qpid - AMQP Messaging Implementation
> >> Project:      http://qpid.apache.org
> >> Use/Interact: mailto:users-subscribe@qpid.apache.org
> >>
> >>
> >
> >
> > --
> > -Praveen
> >
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:users-subscribe@qpid.apache.org
>
>


-- 
-Praveen

Mime
View raw message