qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: persistent messages
Date Tue, 15 May 2012 22:18:27 GMT
Out of interest, can I ask what client/broker versions you are using?
It would be useful to target advice accordingly, although my first
suggestion is that you switch to using the 0.16 release candidate (its
under vote and well on course to being declared the final) if you
arent already, currently available at:
http://people.apache.org/~jross/qpid-0.16-rc4/

On 15 May 2012 21:05, Gary Ogden <gogden@gmail.com> wrote:
> So I did some more reading and I now understand why the messages weren't
> being retained. So I need to create a queue, and have that queue created in
> the virtualhosts file so that the broker starts it up. But even after doing
> that, I still have the issue where the messages are getting dropped until
> the consumer starts.
>

The isue here is that you are sending messages to an exchange with a
given routing key, but that key (which is typically the queue name, or
topic name/subject when using the JMS client) has not yet been used as
a binding key to bind a queue to the exchange. In this case the
exchange is unable to route the messages anywhere, and has thus
discarded them (which is logged). When you connect your client, it
must then be causing an appropriate binding to be eastblished. I can
think of 1 bug we resolved some time ago (but it may only now be
included as part of 0.16) that would cause such annoyance when using
queues in the configuration file in this regard, causing them not to
be bound to the nameless 'default' exchange with their name, which is
the binding used by the clients default addressing syntax when
'sending to Queues'. However, I notice your producer code below
appears to be sending to the amq.direct exchange so it may not be
related here.

As an aside, you can create queues in the configuration files, through
the brokers JMX management interface, or using message clients
themselves.

> Here's my VH file:
>
>   <virtualhost>
>   <name>blah</name>
>   <blah>
>   <queue>
>      <name>blahqueue</name>
>      <blahqueue>
>        <exchange>amq.direct</exchange>
>        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
>        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
>        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins
-->
>      </blahqueue>
>   </queue>
>   </blah>
>  </virtualhost>
>

If this is truly all you have in there, your peristent messages will
not be retained when you restart because it defaults to the
MemoryMessageStore. There are two persistent store implementations,
one based on Derby and an optional one based on BerkeleyDB Java
Edition (which is available under the Sleepycat licence, which isnt
compatible with the Apache Licence). You can configure their on a
per-virtualhost basis use by inserting one of the following into a
given virtualhosts configuraton (see
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?revision=1195211&view=co
for context of a full config file), substituting the placeholders I
added:

MemoryMessageStore:
<store>
    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>

DerbyMessageStore:
<store>
    <class>org.apache.qpid.server.store.DerbyMessageStore</class>
    <environment-path>PATH.TO.STORE.FILES/derbystore/VHOST.NAME</environment-path>
</store>

<store>
    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
    <environment-path>PATH.TO.STORE.FILES/bdbstore/VHOST.NAME</environment-path>
</store>

In order to use the last one, you would also need to download BDB JE
from the Oracle website, and place the je-X.Y.Z.jar file into a
<broker.install>/lib/opt/ directory where it will then get picked up.

> Here's my properties file:
>
> java.naming.factory.initial =
> org.apache.qpid.jndi.PropertiesFileInitialContextFactory
> connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid
> /blah?brokerlist='tcp://localhost:5672'
> destination.queueExchange = amq.direct
>

The property above is defining the address of an Exchange, as opposed
to the specific Queue which you suggest above is what you actually
want to send to (and what you are consuming from? It may be helpful to
post the matching consumer code)

The format for the address strings is described fruther at:
http://qpid.apache.org/books/trunk/Programming-In-Apache-Qpid/html/section-addresses.html
but a basic address string which defines a queue named blahqueue and
tells the client never to create the queue (at the point of
establishing a producer or consumer) might be:

destination.queueAddress = blahqueue; {create: never, node: { type: queue } }

> Here's a simple method to produce:
> private void startProducing() {
>        Context context = null;
>        Connection connection = null;
>        try {
>            Properties properties = new Properties();
>
> properties.load(this.getClass().getResourceAsStream("blah.properties"));
>            context = new InitialContext(properties);
>
>            ConnectionFactory connectionFactory = (ConnectionFactory)
> context.lookup("qpidConnectionfactory");
>            connection = connectionFactory.createConnection();
>            connection.start();
>
>            Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>            Destination destination = (AMQAnyDestination)
> context.lookup("queueExchange");
>
>            MessageProducer messageProducer =
> session.createProducer(destination);
>            int count = 0;
>                while (true) {
>                    DataInputStream dis = new DataInputStream(new
> FileInputStream("test.xml"));
>                    try {
>                        File testFile = new File("test.xml");
>                        long len = new File("test.xml").length();
>                        System.out.println("test.xml length = " + len);
>                        byte[] bytes = new byte[(int) len];
>                        dis.readFully(bytes);
>                        for (int i = 0; i <= 100; i++) {
>                            count++;
>                            System.out.println(" [x] Sending test.xml. Time
> = " + System.currentTimeMillis());
>                            BytesMessage message =
> session.createBytesMessage();
>                            message.writeBytes(bytes);
>                            messageProducer.send(message);
>                            System.out.println(" [x] Sent test.xml + " +
> count + ". Time = " + System.currentTimeMillis());
>                        }
>                    } finally {
>                        dis.close();
>                    }
>                    Thread.sleep(4000);
>                }
>            } catch (Exception e) {
>                System.out.println("Exception caught: " + e.getMessage() +
> e.getStackTrace().toString());
>                try {
>                    if (context != null) {
>                        context.close();
>                    }
>                    if (connection != null) {
>                        connection.close();
>                    }
>                } catch (Exception ex) {
>                    System.out.println("Exception caught closing
> connection: " + e.getMessage());
>                }
>
>            }
>        }
>
> What am I missing? It still waits for the consumer and no messages get
> queued.

Regards,
Robbie

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Mime
View raw message