qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Ogden <gog...@gmail.com>
Subject Re: persistent messages
Date Wed, 16 May 2012 11:35:31 GMT
I'm using 0.14.

Is the 0.16 client in a maven repo?

On 15 May 2012 19:18, Robbie Gemmell <robbie.gemmell@gmail.com> wrote:

> Out of interest, can I ask what client/broker versions you are using?
> It would be useful to target advice accordingly, although my first
> suggestion is that you switch to using the 0.16 release candidate (its
> under vote and well on course to being declared the final) if you
> arent already, currently available at:
> http://people.apache.org/~jross/qpid-0.16-rc4/
>
> On 15 May 2012 21:05, Gary Ogden <gogden@gmail.com> wrote:
> > So I did some more reading and I now understand why the messages weren't
> > being retained. So I need to create a queue, and have that queue created
> in
> > the virtualhosts file so that the broker starts it up. But even after
> doing
> > that, I still have the issue where the messages are getting dropped until
> > the consumer starts.
> >
>
> The isue here is that you are sending messages to an exchange with a
> given routing key, but that key (which is typically the queue name, or
> topic name/subject when using the JMS client) has not yet been used as
> a binding key to bind a queue to the exchange. In this case the
> exchange is unable to route the messages anywhere, and has thus
> discarded them (which is logged). When you connect your client, it
> must then be causing an appropriate binding to be eastblished. I can
> think of 1 bug we resolved some time ago (but it may only now be
> included as part of 0.16) that would cause such annoyance when using
> queues in the configuration file in this regard, causing them not to
> be bound to the nameless 'default' exchange with their name, which is
> the binding used by the clients default addressing syntax when
> 'sending to Queues'. However, I notice your producer code below
> appears to be sending to the amq.direct exchange so it may not be
> related here.
>
> As an aside, you can create queues in the configuration files, through
> the brokers JMX management interface, or using message clients
> themselves.
>
> > Here's my VH file:
> >
> >   <virtualhost>
> >   <name>blah</name>
> >   <blah>
> >   <queue>
> >      <name>blahqueue</name>
> >      <blahqueue>
> >        <exchange>amq.direct</exchange>
> >        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
> >        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb
-->
> >        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins
-->
> >      </blahqueue>
> >   </queue>
> >   </blah>
> >  </virtualhost>
> >
>
> If this is truly all you have in there, your peristent messages will
> not be retained when you restart because it defaults to the
> MemoryMessageStore. There are two persistent store implementations,
> one based on Derby and an optional one based on BerkeleyDB Java
> Edition (which is available under the Sleepycat licence, which isnt
> compatible with the Apache Licence). You can configure their on a
> per-virtualhost basis use by inserting one of the following into a
> given virtualhosts configuraton (see
>
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?revision=1195211&view=co
> for context of a full config file), substituting the placeholders I
> added:
>
> MemoryMessageStore:
> <store>
>    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
> </store>
>
> DerbyMessageStore:
> <store>
>    <class>org.apache.qpid.server.store.DerbyMessageStore</class>
>    <environment-path>PATH.TO.STORE.FILES/derbystore/VHOST.NAME
> </environment-path>
> </store>
>
> <store>
>    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
>    <environment-path>PATH.TO.STORE.FILES/bdbstore/VHOST.NAME
> </environment-path>
> </store>
>
> In order to use the last one, you would also need to download BDB JE
> from the Oracle website, and place the je-X.Y.Z.jar file into a
> <broker.install>/lib/opt/ directory where it will then get picked up.
>
> > Here's my properties file:
> >
> > java.naming.factory.initial =
> > org.apache.qpid.jndi.PropertiesFileInitialContextFactory
> > connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid
> > /blah?brokerlist='tcp://localhost:5672'
> > destination.queueExchange = amq.direct
> >
>
> The property above is defining the address of an Exchange, as opposed
> to the specific Queue which you suggest above is what you actually
> want to send to (and what you are consuming from? It may be helpful to
> post the matching consumer code)
>
> The format for the address strings is described fruther at:
>
> http://qpid.apache.org/books/trunk/Programming-In-Apache-Qpid/html/section-addresses.html
> but a basic address string which defines a queue named blahqueue and
> tells the client never to create the queue (at the point of
> establishing a producer or consumer) might be:
>
> destination.queueAddress = blahqueue; {create: never, node: { type: queue
> } }
>
> > Here's a simple method to produce:
> > private void startProducing() {
> >        Context context = null;
> >        Connection connection = null;
> >        try {
> >            Properties properties = new Properties();
> >
> > properties.load(this.getClass().getResourceAsStream("blah.properties"));
> >            context = new InitialContext(properties);
> >
> >            ConnectionFactory connectionFactory = (ConnectionFactory)
> > context.lookup("qpidConnectionfactory");
> >            connection = connectionFactory.createConnection();
> >            connection.start();
> >
> >            Session session = connection.createSession(false,
> > Session.AUTO_ACKNOWLEDGE);
> >            Destination destination = (AMQAnyDestination)
> > context.lookup("queueExchange");
> >
> >            MessageProducer messageProducer =
> > session.createProducer(destination);
> >            int count = 0;
> >                while (true) {
> >                    DataInputStream dis = new DataInputStream(new
> > FileInputStream("test.xml"));
> >                    try {
> >                        File testFile = new File("test.xml");
> >                        long len = new File("test.xml").length();
> >                        System.out.println("test.xml length = " + len);
> >                        byte[] bytes = new byte[(int) len];
> >                        dis.readFully(bytes);
> >                        for (int i = 0; i <= 100; i++) {
> >                            count++;
> >                            System.out.println(" [x] Sending test.xml.
> Time
> > = " + System.currentTimeMillis());
> >                            BytesMessage message =
> > session.createBytesMessage();
> >                            message.writeBytes(bytes);
> >                            messageProducer.send(message);
> >                            System.out.println(" [x] Sent test.xml + " +
> > count + ". Time = " + System.currentTimeMillis());
> >                        }
> >                    } finally {
> >                        dis.close();
> >                    }
> >                    Thread.sleep(4000);
> >                }
> >            } catch (Exception e) {
> >                System.out.println("Exception caught: " + e.getMessage() +
> > e.getStackTrace().toString());
> >                try {
> >                    if (context != null) {
> >                        context.close();
> >                    }
> >                    if (connection != null) {
> >                        connection.close();
> >                    }
> >                } catch (Exception ex) {
> >                    System.out.println("Exception caught closing
> > connection: " + e.getMessage());
> >                }
> >
> >            }
> >        }
> >
> > What am I missing? It still waits for the consumer and no messages get
> > queued.
>
> Regards,
> Robbie
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
> For additional commands, e-mail: users-help@qpid.apache.org
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message