qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nathan Kunkee <nkun...@genome.wustl.edu>
Subject Re: can not create MessageConsumer to RabbitMQ queue using qpid-client-0.30
Date Thu, 18 Dec 2014 17:52:15 GMT
On 12/18/2014 10:07 AM, Wayna Runa wrote:
> Hi everybody !
>
> I am using Hello.java sample of qpid-client-0.30 to publish and consume
> messages from a RabbitMQ queue, but i get this error:
>
> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> javax.jms.JMSException: Error registering consumer:
> org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED -
> active=false [error code 540: not implemented]
>      at
> org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
>      at
> org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
>      at
> org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
>      at
> org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
>      at
> org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
>      at
> org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
>      at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
>      at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
> Caused by: org.apache.qpid.AMQConnectionClosedException: Error:
> NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>      at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>      at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>      at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>      at
> org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
>      at
> org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
>      at
> org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
>      at
> org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
>      at
> org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
>      at
> org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
>      at
> org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
>      at
> org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
>      at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
>      ... 8 more
> Caused by: org.apache.qpid.AMQConnectionClosedException: Error:
> NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>      at
> org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
>      at
> org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
>      at
> org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
>      at
> org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
>      at
> org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
>      at
> org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
>      at
> org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
>      at
> org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
>      at
> org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
>      at
> org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
>      at java.lang.Thread.run(Thread.java:744)
> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>
> Seems there is an issue about this here:
> https://issues.apache.org/jira/browse/QPID-5906
>
> My JNDI config is:
>
> connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid
> /DES_DEV?brokerlist='tcp://10.105.135.53:5672'
> # queueName: des_queue1
> destination.myJndiDestQueue1 =
> BURL:direct://amq.direct//?routingkey='des_rk_queue1'
>
> But if I add the next line in Hello.java
> (System.setProperty("IMMEDIATE_PREFETCH", "true")) I can publish and
> consume the message but with collateral effects:
>
> 1) In the RabbitMQ side will be created a TemporalQueue every time that i
> run Hello.java.
> 2) The message "read" in the original queue is not removed.
>
> My question is: Is there any workaround to create just a MessageConsumer
> without IMMEDIATE_PREFETCH=true ?

Hi,

No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ as 
described in the ticket (+1 for getting it addressed!). Or, convince 
RabbitMQ to implement or provide a plugin for 'active=false' to work as 
expected.... To get things working for me, I also had to use the 
following patch:

Index: src/main/java/org/apache/qpid/client/AMQSession.java
===================================================================
--- src/main/java/org/apache/qpid/client/AMQSession.java	(revision 1640219)
+++ src/main/java/org/apache/qpid/client/AMQSession.java	(working copy)
@@ -3241,6 +3241,11 @@
      {
          synchronized (_suspensionLock)
          {
+            if(_immediatePrefetch)
+            {
+                _logger.warn("IMMEDIATE_PREFETCH is in force; no 
channel suspends processed");
+                return;
+            }
              try
              {
                  if (_logger.isDebugEnabled())


I haven't submitted the patch as it is a blunt force attempt to avoid 
the issue rather than fix it.

1) Your binding URL for consuming messages does not name a queue, so 
RabbitMQ is helpfully making one. I found for Qpid that I ended up with 
a set of JNDI resources for consuming (direct://exchg/route/q) and a set 
for sending (direct://exchg/route/). Note that consuming lists a queue 
name, explicitly from that queue, while sending does not, letting the 
routing key determine the consumers. Consuming worked best with a queue 
pre-declared and pre-bound on the RabbitMQ side.

2) There is an exception being thrown during your onMessage method, so 
the framework is not sending an ACK for the message. That is the 
mechanism of createSession(false, Session.AUTO_ACK), that the framework 
ACK's a message once the onMessage method returns without error.

Taking into account those two things I think you'll find you can send 
and consume messages just fine.

Hope that helps,
Nathan

>
> If I can not create a MessageConsumer for RabbitMQ queue, I should try
> change RabbitMQ for other as Qpid    :(
>
> Any idea will be welcome.
>
> Regards.
>
> -- wr
>


____
This email message is a private communication. The information transmitted, including attachments,
is intended only for the person or entity to which it is addressed and may contain confidential,
privileged, and/or proprietary material. Any review, duplication, retransmission, distribution,
or other use of, or taking of any action in reliance upon, this information by persons or
entities other than the intended recipient is unauthorized by the sender and is prohibited.
If you have received this message in error, please contact the sender immediately by return
email and delete the original message from all computer systems. Thank you.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Mime
View raw message