Date: Mon, 22 Dec 2014 17:01:09 +0000
Subject: Re: can not create MessageConsumer to RabbitMQ queue using qpid-client-0.30
From: Wayna Runa
To: users@qpid.apache.org
Reply-To: users@qpid.apache.org
Mailing-List: contact users-help@qpid.apache.org; run by ezmlm
References: <5493144F.3040900@genome.wustl.edu>
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=001a11c3c0524d279c050ad101c9

--001a11c3c0524d279c050ad101c9
Content-Type: text/plain; charset=UTF-8

Hi there!

I would like to share my configuration and my updated Hello.java with you.
I hope this helps someone.

---------[hello.properties]----------
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory

# RabbitMQ v3.3.5
connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
destination.myJndiDestQueuePublisher1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
destination.myJndiDestQueueConsumer1 = BURL:direct://amq.direct/des_rk_queue1/des_queue1
-------------------------------------------

-----[HelloRabbitMQ.java]-----
import java.io.InputStream;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloRabbitMQ {

    public HelloRabbitMQ() {
    }

    public static void main(String[] args) {
        HelloRabbitMQ hello = new HelloRabbitMQ();
        hello.runTest();
    }

    private void runTest() {
        try (InputStream resourceAsStream = this.getClass().getResourceAsStream("hello.properties")) {
            //System.setProperty("qpid.amqp.version", "0-91");
            //System.setProperty("qpid.dest_syntax", "BURL");

            // Required to interoperate with RabbitMQ (see QPID-5906).
            System.setProperty("IMMEDIATE_PREFETCH", "true");
            // Do not let the client declare exchanges or queues;
            // they are expected to exist on the broker already.
            System.setProperty("qpid.declare_exchanges", "false");
            System.setProperty("qpid.declare_queues", "false");

            Properties properties = new Properties();
            properties.load(resourceAsStream);
            Context context = new InitialContext(properties);

            ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myRabbitMQConnectionFactory1");
            //Connection connection = connectionFactory.createConnection("admin", "xxx");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Separate destinations: the publisher URL names no queue, the consumer URL does.
            Destination destinationPublisher = (Destination) context.lookup("myJndiDestQueuePublisher1");
            Destination destinationConsumer = (Destination) context.lookup("myJndiDestQueueConsumer1");

            MessageProducer messageProducer = session.createProducer(destinationPublisher);
            MessageConsumer messageConsumer = session.createConsumer(destinationConsumer);

            TextMessage message = session.createTextMessage("Hello RabbitMQ 3.3.5 !!");
            // setDeliveryMode takes a DeliveryMode constant, not an acknowledge mode.
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            messageProducer.send(message);
            messageProducer.close();

            Thread.sleep(10000); // 10 secs

            // Blocks until a message arrives on the consumer queue.
            message = (TextMessage) messageConsumer.receive();
            System.out.println(message.getText());

            session.close();
            connection.close();
            context.close();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}
----------------------------------------

Special thanks to Nathan Kunkee for his support.

Kind regards.
- Wayna Runa

On 19 December 2014 at 08:55, Wayna Runa wrote:
>
> Great, Nathan!
>
> This is the best x-mas gift for me!
>
> Regards.
>
>
>
> On 18 December 2014 at 17:52, Nathan Kunkee wrote:
>>
>> On 12/18/2014 10:07 AM, Wayna Runa wrote:
>>
>>> Hi everybody!
>>>
>>> I am using the Hello.java sample of qpid-client-0.30 to publish and consume
>>> messages from a RabbitMQ queue, but I get this error:
>>>
>>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>>> javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>     at org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
>>>     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
>>>     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
>>>     at org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
>>>     at org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
>>>     at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
>>>     at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
>>>     at org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
>>>     at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
>>>     at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
>>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>     at org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
>>>     at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
>>>     at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
>>>     at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
>>>     at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
>>>     at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
>>>     at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
>>>     ... 8 more
>>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>     at org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
>>>     at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
>>>     at org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
>>>     at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
>>>     at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
>>>     at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
>>>     at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
>>>     at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
>>>     at java.lang.Thread.run(Thread.java:744)
>>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>>
>>> There seems to be an issue about this here:
>>> https://issues.apache.org/jira/browse/QPID-5906
>>>
>>> My JNDI config is:
>>>
>>> connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
>>> # queueName: des_queue1
>>> destination.myJndiDestQueue1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
>>>
>>> But if I add the following line to Hello.java
>>> (System.setProperty("IMMEDIATE_PREFETCH", "true")) I can publish and
>>> consume the message, but with side effects:
>>>
>>> 1) On the RabbitMQ side, a temporary queue is created every time I
>>> run Hello.java.
>>> 2) The message "read" from the original queue is not removed.
>>>
>>> My question is: Is there any workaround to create just a MessageConsumer
>>> without IMMEDIATE_PREFETCH=true?
>>>
>>
>> Hi,
>>
>> No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ as
>> described in the ticket (+1 for getting it addressed!). Or, convince
>> RabbitMQ to implement or provide a plugin for 'active=false' to work as
>> expected....
>> To get things working for me, I also had to use the following
>> patch:
>>
>> Index: src/main/java/org/apache/qpid/client/AMQSession.java
>> ===================================================================
>> --- src/main/java/org/apache/qpid/client/AMQSession.java (revision 1640219)
>> +++ src/main/java/org/apache/qpid/client/AMQSession.java (working copy)
>> @@ -3241,6 +3241,11 @@
>>      {
>>          synchronized (_suspensionLock)
>>          {
>> +            if(_immediatePrefetch)
>> +            {
>> +                _logger.warn("IMMEDIATE_PREFETCH is in force; no channel suspends processed");
>> +                return;
>> +            }
>>              try
>>              {
>>                  if (_logger.isDebugEnabled())
>>
>>
>> I haven't submitted the patch as it is a blunt force attempt to avoid the
>> issue rather than fix it.
>>
>> 1) Your binding URL for consuming messages does not name a queue, so
>> RabbitMQ is helpfully making one. I found for Qpid that I ended up with a
>> set of JNDI resources for consuming (direct://exchg/route/q) and a set for
>> sending (direct://exchg/route/). Note that the consuming URL lists a queue
>> name, so messages are read explicitly from that queue, while the sending
>> URL does not, letting the routing key determine the consumers. Consuming
>> worked best with a queue pre-declared and pre-bound on the RabbitMQ side.
>>
>> 2) There is an exception being thrown during your onMessage method, so
>> the framework is not sending an ACK for the message. That is the mechanism
>> of createSession(false, Session.AUTO_ACKNOWLEDGE): the framework ACKs a
>> message once the onMessage method returns without error.
>>
>> Taking those two things into account, I think you'll find you can send and
>> consume messages just fine.
>>
>> Hope that helps,
>> Nathan
>>
>>
>>> If I cannot create a MessageConsumer for a RabbitMQ queue, I should try
>>> swapping RabbitMQ for another broker such as Qpid :(
>>>
>>> Any idea will be welcome.
>>>
>>> Regards.
>>>
>>> -- wr
>>>
>>

--
Wayna Runa

+34 668872813 | Skype waynaruna2
waynaruna@gmail.com | Barcelona, Spain

--001a11c3c0524d279c050ad101c9--
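
For anyone who finds this thread later: the two binding-URL shapes from Nathan's point 1) (a queue-less URL for sending, a queue-naming URL for consuming) can be sketched as plain string formatting. This is only a minimal illustration. `BindingUrls`, `forPublishing`, and `forConsuming` are hypothetical names, not part of the Qpid client, and the code simply reproduces the patterns used in hello.properties. It assumes a direct exchange and does no escaping or validation.

```java
public class BindingUrls {

    // Hypothetical helper (not a Qpid API): publisher URL with an empty
    // destination and no queue, so the routing key alone decides delivery.
    public static String forPublishing(String exchange, String routingKey) {
        return "BURL:direct://" + exchange + "//?routingkey='" + routingKey + "'";
    }

    // Hypothetical helper (not a Qpid API): consumer URL that names both the
    // routing key and an existing queue, so no temporary queue is needed.
    public static String forConsuming(String exchange, String routingKey, String queue) {
        return "BURL:direct://" + exchange + "/" + routingKey + "/" + queue;
    }

    public static void main(String[] args) {
        // Values taken from hello.properties in this thread.
        System.out.println("destination.myJndiDestQueuePublisher1 = "
                + forPublishing("amq.direct", "des_rk_queue1"));
        System.out.println("destination.myJndiDestQueueConsumer1 = "
                + forConsuming("amq.direct", "des_rk_queue1", "des_queue1"));
    }
}
```

Keeping the two shapes side by side makes the difference visible: only the consumer URL carries a queue name, which is why the queue should already be declared and bound on the RabbitMQ side.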