Subject: Re: Using the same producer multiple times does not work...
From: Venkat Rangan
To: users@qpid.apache.org
Date: Tue, 11 Dec 2012 18:21:39 -0800

Robbie,

Thanks for your prompt response. Closing the consumer did indeed fix the
problem with the second consumer.

Thanks again...

venkat

On Tue, Dec 11, 2012 at 3:11 PM, Robbie Gemmell wrote:

> Hi Venkat,
>
> It appears that you are not closing your consumers, so I would guess what
> you are seeing is the result of message prefetch.
>
> The consumer has a prefetch buffer for messages (defaults to 500 but is
> configurable, see
> http://qpid.apache.org/books/0.18/Programming-In-Apache-Qpid/html/ for
> details) so that the broker can send messages to the consumer ahead of
> receive() being called and have them available client side, which
> increases performance. Because you leave your first consumer open, it is
> possible that the message published by the second test execution was
> actually delivered by the broker to the first consumer. That would leave
> the second consumer waiting for a message in its receive() call, which
> has no time limit, whereas calling receive() on the first consumer would
> have returned the message immediately. Closing a consumer that you are
> not going to use again makes any messages it has prefetched available
> for delivery to other clients.
>
> Regarding 'committing the send instead of closing', that shouldn't have
> any effect on whether your consumer actually receives the message here,
> but it sounds like what you want is transacted sessions for your
> producers. You would create the session with
> connection.createSession(true, Session.SESSION_TRANSACTED), and then call
> session.commit() to complete the session transaction after each unit of
> work is performed. Note that this might mean you want to use a different
> session for your producer and consumer so that you can commit their
> actions independently of each other (unless you wanted, for example, to
> 'atomically' consume a message and send a reply based on it, in which
> case you would want to use a single session).
>
> Also, you shouldn't use the AMQAnyDestination object in your code, as it
> ties your code to an underlying client implementation object that is
> subject to change. You can use the JMS API to create your Destination
> objects, e.g. by calling session.createQueue(..) and supplying the
> address string there instead.
>
> Robbie
>
> On 11 December 2012 19:15, Venkat Rangan wrote:
>
> > Hi,
> >
> > I have a situation where I reuse the same destination queue for
> > multiple producers. What I am finding is that if I do this, the second
> > instance's message is stuck and is not available to the
> > MessageConsumer. Assume that there is a connection that is started. I
> > would have expected both test1() and test2() below to succeed. What I
> > observe is that test2() is stuck on the consumer.receive(). Also, is
> > there a way to "commit" the send without a producer.close(), and would
> > that make the second instance of the test below also have the
> > receive() pull out the message?
> >
> > Thanks!
> >
> > public void test1() {
> >     Session session = connection.createSession(false,
> >         Session.AUTO_ACKNOWLEDGE);
> >     Destination queue = new AMQAnyDestination(
> >         "ADDR:testQueue; {create: always}");
> >     MessageProducer producer = session.createProducer(queue);
> >     TextMessage msg = session.createTextMessage("hello, world-1");
> >     producer.send(msg);
> >     producer.close();
> >     MessageConsumer consumer = session.createConsumer(queue);
> >     TextMessage recvdMessage = (TextMessage)consumer.receive();
> >     System.out.println("Received: " + recvdMessage.getText());
> > }
> >
> > public void test2() {
> >     Session session = connection.createSession(false,
> >         Session.AUTO_ACKNOWLEDGE);
> >     Destination queue = new AMQAnyDestination(
> >         "ADDR:testQueue; {create: always}");
> >     MessageProducer producer = session.createProducer(queue);
> >     TextMessage msg = session.createTextMessage("hello, world-2");
> >     producer.send(msg);
> >     producer.close();
> >     MessageConsumer consumer = session.createConsumer(queue);
> >     TextMessage recvdMessage = (TextMessage)consumer.receive();
> >     System.out.println("Received: " + recvdMessage.getText());
> > }
> >
> > Thanks!
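
For anyone reading this thread later, the prefetch effect Robbie describes
can be illustrated with a toy model. This is only a sketch, not Qpid code,
and it does not use the JMS API: ToyBroker, ToyConsumer, receiveNoWait() and
runTest() are hypothetical names made up for this example, and the dispatch
rule (the oldest open consumer with room in its buffer gets the message) is
a simplifying assumption rather than a statement about how the broker
really chooses. A null return stands in for a receive() call that would
block.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of prefetch. All names here are hypothetical, not Qpid or JMS API.
final class PrefetchSketch {

    static class ToyBroker {
        private final Deque<String> queue = new ArrayDeque<>();
        private final List<ToyConsumer> consumers = new ArrayList<>();

        ToyConsumer createConsumer(int prefetch) {
            ToyConsumer c = new ToyConsumer(this, prefetch);
            consumers.add(c);
            dispatch();
            return c;
        }

        void publish(String msg) {
            queue.addLast(msg);
            dispatch();
        }

        // Assumed rule: push queued messages to open consumers, oldest first,
        // until each prefetch buffer is full.
        private void dispatch() {
            for (ToyConsumer c : consumers) {
                while (!queue.isEmpty() && c.buffer.size() < c.prefetch) {
                    c.buffer.addLast(queue.pollFirst());
                }
            }
        }

        // Closing a consumer returns its unconsumed prefetched messages
        // to the front of the queue, preserving their order.
        void release(ToyConsumer c) {
            consumers.remove(c);
            while (!c.buffer.isEmpty()) {
                queue.addFirst(c.buffer.pollLast());
            }
            dispatch();
        }
    }

    static class ToyConsumer {
        final Deque<String> buffer = new ArrayDeque<>();
        final int prefetch;
        private final ToyBroker broker;

        ToyConsumer(ToyBroker broker, int prefetch) {
            this.broker = broker;
            this.prefetch = prefetch;
        }

        // Non-blocking stand-in for receive(): null means "would block".
        String receiveNoWait() {
            return buffer.pollFirst();
        }

        void close() {
            broker.release(this);
        }
    }

    // Mirrors the shape of test1()/test2(): publish, create a consumer, receive.
    static String runTest(ToyBroker broker, String text, boolean closeConsumer) {
        broker.publish(text);
        ToyConsumer consumer = broker.createConsumer(500);
        String received = consumer.receiveNoWait();
        if (closeConsumer) {
            consumer.close();
        }
        return received;
    }

    public static void main(String[] args) {
        ToyBroker leaky = new ToyBroker();
        System.out.println(runTest(leaky, "hello, world-1", false)); // hello, world-1
        System.out.println(runTest(leaky, "hello, world-2", false)); // null

        ToyBroker tidy = new ToyBroker();
        System.out.println(runTest(tidy, "hello, world-1", true));   // hello, world-1
        System.out.println(runTest(tidy, "hello, world-2", true));   // hello, world-2
    }
}
```

In the first pair of runs the consumer from the first test is still open, so
the second message lands in its buffer and the new consumer sees nothing. In
the second pair each consumer is closed after use, so the second message
stays on the queue until the new consumer arrives, which matches the fix
reported at the top of this message.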