qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: persistent messages
Date Wed, 16 May 2012 20:59:13 GMT
On 16 May 2012 18:02, Gary Ogden <gogden@gmail.com> wrote:
> Thanks for all the help. I'm now able to have a persistent queue using
> Derby.  Is it more advantageous in a production environment to use BDB
> instead?
>

The optional BDB based store certainly scales significantly better
with increasing numbers of concurrent producers/consumers due to
optomisations in the store implementation that its API is amenable to,
but it would depend on your throughput requirements whether that was
really beneficial to you. The Derby based store obviously has the
distinction of using a dependency which is compatible with the Apache
Licence, but I'm not actually sure how many use it sees in comparison
to the BDB based store.

> I found details on queue sizes. I see you can specify the max size and
> count of a queue, but what are the defaults if you don't specify? Will it
> just keep queuing them up forever if a consumer never attaches?
>

The Java broker currently has some misleading attribute names, such as
'max message count' and 'max queue depth' which are in fact not
maximums but threshold values at which point it will start alerting
(into the log file, and as JMX notifications on the queue mbeans) if
exceeded. Setting those values in no way limits the queue, and
messages will be queued until you run out of resources to continue
(i.e you run out of disk space for persistent messages, or run out of
heap memory for non-persistent messages).

It is possible to impose upper bounds on queue size by configuring the
flow control related values to block further production onto the
queues if exceeded until such time consumption brings it below a
threshold, but that is a seperate mechanism.

The Java broker does not currently support limiting queue size through
discard policies such as the ring queue support the C++ broker has.

Robbie

>
> On 15 May 2012 19:18, Robbie Gemmell <robbie.gemmell@gmail.com> wrote:
>
>> Out of interest, can I ask what client/broker versions you are using?
>> It would be useful to target advice accordingly, although my first
>> suggestion is that you switch to using the 0.16 release candidate (its
>> under vote and well on course to being declared the final) if you
>> arent already, currently available at:
>> http://people.apache.org/~jross/qpid-0.16-rc4/
>>
>> On 15 May 2012 21:05, Gary Ogden <gogden@gmail.com> wrote:
>> > So I did some more reading and I now understand why the messages weren't
>> > being retained. So I need to create a queue, and have that queue created
>> in
>> > the virtualhosts file so that the broker starts it up. But even after
>> doing
>> > that, I still have the issue where the messages are getting dropped until
>> > the consumer starts.
>> >
>>
>> The isue here is that you are sending messages to an exchange with a
>> given routing key, but that key (which is typically the queue name, or
>> topic name/subject when using the JMS client) has not yet been used as
>> a binding key to bind a queue to the exchange. In this case the
>> exchange is unable to route the messages anywhere, and has thus
>> discarded them (which is logged). When you connect your client, it
>> must then be causing an appropriate binding to be eastblished. I can
>> think of 1 bug we resolved some time ago (but it may only now be
>> included as part of 0.16) that would cause such annoyance when using
>> queues in the configuration file in this regard, causing them not to
>> be bound to the nameless 'default' exchange with their name, which is
>> the binding used by the clients default addressing syntax when
>> 'sending to Queues'. However, I notice your producer code below
>> appears to be sending to the amq.direct exchange so it may not be
>> related here.
>>
>> As an aside, you can create queues in the configuration files, through
>> the brokers JMX management interface, or using message clients
>> themselves.
>>
>> > Here's my VH file:
>> >
>> >   <virtualhost>
>> >   <name>blah</name>
>> >   <blah>
>> >   <queue>
>> >      <name>blahqueue</name>
>> >      <blahqueue>
>> >        <exchange>amq.direct</exchange>
>> >        <maximumQueueDepth>4235264</maximumQueueDepth>  <!--
4Mb -->
>> >        <maximumMessageSize>2117632</maximumMessageSize> <!--
2Mb -->
>> >        <maximumMessageAge>600000</maximumMessageAge>  <!--
10 mins -->
>> >      </blahqueue>
>> >   </queue>
>> >   </blah>
>> >  </virtualhost>
>> >
>>
>> If this is truly all you have in there, your peristent messages will
>> not be retained when you restart because it defaults to the
>> MemoryMessageStore. There are two persistent store implementations,
>> one based on Derby and an optional one based on BerkeleyDB Java
>> Edition (which is available under the Sleepycat licence, which isnt
>> compatible with the Apache Licence). You can configure their on a
>> per-virtualhost basis use by inserting one of the following into a
>> given virtualhosts configuraton (see
>>
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?revision=1195211&view=co
>> for context of a full config file), substituting the placeholders I
>> added:
>>
>> MemoryMessageStore:
>> <store>
>>    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
>> </store>
>>
>> DerbyMessageStore:
>> <store>
>>    <class>org.apache.qpid.server.store.DerbyMessageStore</class>
>>    <environment-path>PATH.TO.STORE.FILES/derbystore/VHOST.NAME
>> </environment-path>
>> </store>
>>
>> <store>
>>    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
>>    <environment-path>PATH.TO.STORE.FILES/bdbstore/VHOST.NAME
>> </environment-path>
>> </store>
>>
>> In order to use the last one, you would also need to download BDB JE
>> from the Oracle website, and place the je-X.Y.Z.jar file into a
>> <broker.install>/lib/opt/ directory where it will then get picked up.
>>
>> > Here's my properties file:
>> >
>> > java.naming.factory.initial =
>> > org.apache.qpid.jndi.PropertiesFileInitialContextFactory
>> > connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid
>> > /blah?brokerlist='tcp://localhost:5672'
>> > destination.queueExchange = amq.direct
>> >
>>
>> The property above is defining the address of an Exchange, as opposed
>> to the specific Queue which you suggest above is what you actually
>> want to send to (and what you are consuming from? It may be helpful to
>> post the matching consumer code)
>>
>> The format for the address strings is described fruther at:
>>
>> http://qpid.apache.org/books/trunk/Programming-In-Apache-Qpid/html/section-addresses.html
>> but a basic address string which defines a queue named blahqueue and
>> tells the client never to create the queue (at the point of
>> establishing a producer or consumer) might be:
>>
>> destination.queueAddress = blahqueue; {create: never, node: { type: queue
>> } }
>>
>> > Here's a simple method to produce:
>> > private void startProducing() {
>> >        Context context = null;
>> >        Connection connection = null;
>> >        try {
>> >            Properties properties = new Properties();
>> >
>> > properties.load(this.getClass().getResourceAsStream("blah.properties"));
>> >            context = new InitialContext(properties);
>> >
>> >            ConnectionFactory connectionFactory = (ConnectionFactory)
>> > context.lookup("qpidConnectionfactory");
>> >            connection = connectionFactory.createConnection();
>> >            connection.start();
>> >
>> >            Session session = connection.createSession(false,
>> > Session.AUTO_ACKNOWLEDGE);
>> >            Destination destination = (AMQAnyDestination)
>> > context.lookup("queueExchange");
>> >
>> >            MessageProducer messageProducer =
>> > session.createProducer(destination);
>> >            int count = 0;
>> >                while (true) {
>> >                    DataInputStream dis = new DataInputStream(new
>> > FileInputStream("test.xml"));
>> >                    try {
>> >                        File testFile = new File("test.xml");
>> >                        long len = new File("test.xml").length();
>> >                        System.out.println("test.xml length = " +
len);
>> >                        byte[] bytes = new byte[(int) len];
>> >                        dis.readFully(bytes);
>> >                        for (int i = 0; i <= 100; i++) {
>> >                            count++;
>> >                            System.out.println(" [x] Sending test.xml.
>> Time
>> > = " + System.currentTimeMillis());
>> >                            BytesMessage message =
>> > session.createBytesMessage();
>> >                            message.writeBytes(bytes);
>> >                            messageProducer.send(message);
>> >                            System.out.println(" [x] Sent test.xml
+ " +
>> > count + ". Time = " + System.currentTimeMillis());
>> >                        }
>> >                    } finally {
>> >                        dis.close();
>> >                    }
>> >                    Thread.sleep(4000);
>> >                }
>> >            } catch (Exception e) {
>> >                System.out.println("Exception caught: " + e.getMessage()
+
>> > e.getStackTrace().toString());
>> >                try {
>> >                    if (context != null) {
>> >                        context.close();
>> >                    }
>> >                    if (connection != null) {
>> >                        connection.close();
>> >                    }
>> >                } catch (Exception ex) {
>> >                    System.out.println("Exception caught closing
>> > connection: " + e.getMessage());
>> >                }
>> >
>> >            }
>> >        }
>> >
>> > What am I missing? It still waits for the consumer and no messages get
>> > queued.
>>
>> Regards,
>> Robbie
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: users-help@qpid.apache.org
>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Mime
View raw message