qpid-users mailing list archives

From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: persistent messages
Date Wed, 16 May 2012 13:23:52 GMT
They are temporarily available in a staging repo at:
https://repository.apache.org/content/repositories/orgapacheqpid-085/

Once the release vote closes they can be released and will sync up to
the central repo.

Robbie

On 16 May 2012 12:35, Gary Ogden <gogden@gmail.com> wrote:
> I'm using 0.14.
>
> Is the 0.16 client in a maven repo?
>
> On 15 May 2012 19:18, Robbie Gemmell <robbie.gemmell@gmail.com> wrote:
>
>> Out of interest, can I ask what client/broker versions you are using?
>> It would be useful to target advice accordingly, although my first
>> suggestion is that you switch to the 0.16 release candidate (it's
>> under vote and well on course to being declared final) if you
>> aren't already using it. It is currently available at:
>> http://people.apache.org/~jross/qpid-0.16-rc4/
>>
>> On 15 May 2012 21:05, Gary Ogden <gogden@gmail.com> wrote:
>> > So I did some more reading and I now understand why the messages weren't
>> > being retained. So I need to create a queue, and have that queue created in
>> > the virtualhosts file so that the broker starts it up. But even after doing
>> > that, I still have the issue where the messages are getting dropped until
>> > the consumer starts.
>> >
>>
>> The issue here is that you are sending messages to an exchange with a
>> given routing key, but that key (which is typically the queue name, or
>> topic name/subject when using the JMS client) has not yet been used as
>> a binding key to bind a queue to the exchange. In this case the
>> exchange is unable to route the messages anywhere, and has thus
>> discarded them (which is logged). When you connect your client, it
>> must then be causing an appropriate binding to be established. I can
>> think of one bug we resolved some time ago (though the fix may only now
>> be included as part of 0.16) that would cause such annoyance when using
>> queues in the configuration file in this regard, causing them not to
>> be bound to the nameless 'default' exchange with their name, which is
>> the binding used by the client's default addressing syntax when
>> 'sending to Queues'. However, I notice your producer code below
>> appears to be sending to the amq.direct exchange, so it may not be
>> related here.
>>
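
To make the routing behaviour described above concrete, here is a minimal sketch of a direct exchange as a toy in-memory model. It is not the Qpid broker's implementation; the class `DirectExchangeSketch` and its methods are hypothetical names invented for illustration. It only shows that a message published with a routing key is discarded unless a queue has already been bound with a matching binding key.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of direct-exchange routing (illustrative only, not Qpid code).
public class DirectExchangeSketch {
    // Maps each binding key to the queue bound under that key.
    private final Map<String, Queue<String>> bindings = new HashMap<>();
    private int discarded = 0;

    // Bind a queue to the exchange under the given key and return it.
    public Queue<String> bind(String bindingKey) {
        return bindings.computeIfAbsent(bindingKey, k -> new ArrayDeque<>());
    }

    // Route a message; it is kept only if a queue is bound with a matching key.
    public boolean publish(String routingKey, String body) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue == null) {
            discarded++; // unroutable: nothing is bound with this key
            return false;
        }
        queue.add(body);
        return true;
    }

    public int discardedCount() {
        return discarded;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        boolean before = exchange.publish("blahqueue", "m1"); // no binding yet
        Queue<String> queue = exchange.bind("blahqueue");     // e.g. a consumer connects
        boolean after = exchange.publish("blahqueue", "m2");
        System.out.println("routed before binding: " + before);
        System.out.println("routed after binding: " + after);
        System.out.println("queue depth: " + queue.size()
                + ", discarded: " + exchange.discardedCount());
    }
}
```

In this model the first message is lost in the same way as the messages sent before the consumer starts: the binding only comes into existence once something creates it.
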
>> As an aside, you can create queues in the configuration files, through
>> the broker's JMX management interface, or using messaging clients
>> themselves.
>>
>> > Here's my VH file:
>> >
>> >   <virtualhost>
>> >   <name>blah</name>
>> >   <blah>
>> >   <queue>
>> >      <name>blahqueue</name>
>> >      <blahqueue>
>> >        <exchange>amq.direct</exchange>
>> >        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
>> >        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
>> >        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
>> >      </blahqueue>
>> >   </queue>
>> >   </blah>
>> >  </virtualhost>
>> >
>>
>> If this is truly all you have in there, your persistent messages will
>> not be retained when you restart because the store defaults to the
>> MemoryMessageStore. There are two persistent store implementations,
>> one based on Derby and an optional one based on BerkeleyDB Java
>> Edition (which is available under the Sleepycat licence, which isn't
>> compatible with the Apache Licence). You can configure their use on a
>> per-virtualhost basis by inserting one of the following into a
>> given virtualhost's configuration (see
>> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?revision=1195211&view=co
>> for context of a full config file), substituting the placeholders I
>> added:
>>
>> MemoryMessageStore:
>> <store>
>>    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
>> </store>
>>
>> DerbyMessageStore:
>> <store>
>>    <class>org.apache.qpid.server.store.DerbyMessageStore</class>
>>    <environment-path>PATH.TO.STORE.FILES/derbystore/VHOST.NAME</environment-path>
>> </store>
>>
>> BDBMessageStore:
>> <store>
>>    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
>>    <environment-path>PATH.TO.STORE.FILES/bdbstore/VHOST.NAME</environment-path>
>> </store>
>>
>> In order to use the last one, you would also need to download BDB JE
>> from the Oracle website, and place the je-X.Y.Z.jar file into a
>> <broker.install>/lib/opt/ directory where it will then get picked up.
>>
>> > Here's my properties file:
>> >
>> > java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
>> > connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/blah?brokerlist='tcp://localhost:5672'
>> > destination.queueExchange = amq.direct
>> >
>>
>> The property above is defining the address of an Exchange, as opposed
>> to the specific Queue which you suggest above is what you actually
>> want to send to (and what you are consuming from? It may be helpful to
>> post the matching consumer code).
>>
>> The format for the address strings is described further at:
>> http://qpid.apache.org/books/trunk/Programming-In-Apache-Qpid/html/section-addresses.html
>> but a basic address string which defines a queue named blahqueue and
>> tells the client never to create the queue (at the point of
>> establishing a producer or consumer) might be:
>>
>> destination.queueAddress = blahqueue; {create: never, node: { type: queue } }
>>
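
As a rough illustration of how that entry would sit in the properties file, the sketch below loads the posted properties with the destination line swapped for the address string suggested above. It uses only `java.util.Properties`, so it shows how the entry is parsed rather than how Qpid interprets the address. The class name `AddressPropertiesSketch` is hypothetical, and replacing `destination.queueExchange` outright (rather than keeping both entries) is an assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: parses the properties text, it does not talk to a broker.
public class AddressPropertiesSketch {
    // Properties from the thread, with the destination entry replaced by the
    // address string suggested in the reply (assumed to replace queueExchange).
    static final String CONFIG = String.join("\n",
        "java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory",
        "connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/blah?brokerlist='tcp://localhost:5672'",
        "destination.queueAddress = blahqueue; {create: never, node: { type: queue } }");

    // Load the configuration text into a Properties object.
    static Properties load() {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(CONFIG));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = load();
        // The whole address string, including the option map, is the value.
        System.out.println(properties.getProperty("destination.queueAddress"));
    }
}
```

With an entry named this way, the producer would look up "queueAddress" rather than "queueExchange" from the JNDI context.
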
>> > Here's a simple method to produce:
>> > private void startProducing() {
>> >     Context context = null;
>> >     Connection connection = null;
>> >     try {
>> >         Properties properties = new Properties();
>> >         properties.load(this.getClass().getResourceAsStream("blah.properties"));
>> >         context = new InitialContext(properties);
>> >
>> >         ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
>> >         connection = connectionFactory.createConnection();
>> >         connection.start();
>> >
>> >         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>> >         Destination destination = (AMQAnyDestination) context.lookup("queueExchange");
>> >
>> >         MessageProducer messageProducer = session.createProducer(destination);
>> >         int count = 0;
>> >         while (true) {
>> >             DataInputStream dis = new DataInputStream(new FileInputStream("test.xml"));
>> >             try {
>> >                 File testFile = new File("test.xml");
>> >                 long len = new File("test.xml").length();
>> >                 System.out.println("test.xml length = " + len);
>> >                 byte[] bytes = new byte[(int) len];
>> >                 dis.readFully(bytes);
>> >                 for (int i = 0; i <= 100; i++) {
>> >                     count++;
>> >                     System.out.println(" [x] Sending test.xml. Time = " + System.currentTimeMillis());
>> >                     BytesMessage message = session.createBytesMessage();
>> >                     message.writeBytes(bytes);
>> >                     messageProducer.send(message);
>> >                     System.out.println(" [x] Sent test.xml + " + count + ". Time = " + System.currentTimeMillis());
>> >                 }
>> >             } finally {
>> >                 dis.close();
>> >             }
>> >             Thread.sleep(4000);
>> >         }
>> >     } catch (Exception e) {
>> >         System.out.println("Exception caught: " + e.getMessage() + e.getStackTrace().toString());
>> >         try {
>> >             if (context != null) {
>> >                 context.close();
>> >             }
>> >             if (connection != null) {
>> >                 connection.close();
>> >             }
>> >         } catch (Exception ex) {
>> >             System.out.println("Exception caught closing connection: " + e.getMessage());
>> >         }
>> >     }
>> > }
>> >
>> > What am I missing? It still waits for the consumer and no messages get
>> > queued.
>>
>> Regards,
>> Robbie
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
>> For additional commands, e-mail: users-help@qpid.apache.org
>>
>>
