qpid-users mailing list archives

From Wayna Runa <waynar...@gmail.com>
Subject Re: can not create MessageConsumer to RabbitMQ queue using qpid-client-0.30
Date Mon, 22 Dec 2014 17:01:09 GMT
Hi there!

I would like to share my configuration and my updated Hello.java with you.
I hope this helps someone.

---------[hello.properties]----------
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory

# RabbitMQ v3.3.5
connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
destination.myJndiDestQueuePublisher1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
destination.myJndiDestQueueConsumer1 = BURL:direct://amq.direct/des_rk_queue1/des_queue1
-------------------------------------------
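
The two destination entries above differ only in whether a queue is named: the publisher entry carries just a routing key, while the consumer entry spells out exchange, routing key and queue. As a rough sketch of that pattern, the helper below rebuilds both strings from their parts. The class and method names are hypothetical and not part of Qpid; it only reproduces the string shapes used in this post and does not validate them against the broker.

```java
/** Hypothetical helper (not a Qpid API): rebuilds the two destination strings from hello.properties. */
final class BindingUrls {
    private BindingUrls() {}

    // Publisher form used in the listing: no queue named, routing key passed as an option.
    static String forPublisher(String exchange, String routingKey) {
        return "BURL:direct://" + exchange + "//?routingkey='" + routingKey + "'";
    }

    // Consumer form used in the listing: exchange / routing key / queue name.
    static String forConsumer(String exchange, String routingKey, String queue) {
        return "BURL:direct://" + exchange + "/" + routingKey + "/" + queue;
    }

    public static void main(String[] args) {
        // Print the two lines as they would appear in a JNDI properties file.
        System.out.println("destination.myJndiDestQueuePublisher1 = "
                + forPublisher("amq.direct", "des_rk_queue1"));
        System.out.println("destination.myJndiDestQueueConsumer1 = "
                + forConsumer("amq.direct", "des_rk_queue1", "des_queue1"));
    }
}
```

Keeping the queue name out of the publisher entry leaves routing to the exchange, while naming it in the consumer entry means the client reads from the existing, pre-bound queue.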

-----[HelloRabbitMQ.java]-----
import java.io.InputStream;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloRabbitMQ
{

    public HelloRabbitMQ()
    {
    }

    public static void main(String[] args)
    {
        HelloRabbitMQ hello = new HelloRabbitMQ();
        hello.runTest();
    }

    private void runTest()
    {
        try (InputStream resourceAsStream = this.getClass().getResourceAsStream("hello.properties"))
        {
            //System.setProperty("qpid.amqp.version", "0-91");
            //System.setProperty("qpid.dest_syntax", "BURL");
            System.setProperty("IMMEDIATE_PREFETCH", "true");
            System.setProperty("qpid.declare_exchanges", "false");
            System.setProperty("qpid.declare_queues", "false");

            Properties properties = new Properties();
            properties.load(resourceAsStream);

            Context context = new InitialContext(properties);

            ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myRabbitMQConnectionFactory1");
            //Connection connection = connectionFactory.createConnection("admin","JRq7b1Ct");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destinationPublisher = (Destination) context.lookup("myJndiDestQueuePublisher1");
            Destination destinationConsumer = (Destination) context.lookup("myJndiDestQueueConsumer1");

            MessageProducer messageProducer = session.createProducer(destinationPublisher);
            MessageConsumer messageConsumer = session.createConsumer(destinationConsumer);

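            TextMessage message = session.createTextMessage("Hello RabbitMQ 3.3.5 !!");
            // setDeliveryMode expects a DeliveryMode constant; NON_PERSISTENT has the
            // same value (1) as the Session.AUTO_ACKNOWLEDGE originally passed here.
            messageProducer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);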
            messageProducer.send(message);
            messageProducer.close();

            Thread.sleep(10000);  // 10 secs

            message = (TextMessage)messageConsumer.receive();
            System.out.println(message.getText());

            session.close();
            connection.close();
            context.close();
        }
        catch (Exception exp)
        {
            exp.printStackTrace();
        }
    }
}
----------------------------------------
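
A note for anyone copying hello.properties out of a mail archive: the listing is read with java.util.Properties.load, which treats every physical line as a separate entry unless the line ends with a backslash. If a mail client has wrapped a long value onto the next line, the key ends up with an empty value. The sketch below only illustrates that parsing rule; the class name WrappedPropertiesCheck is made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical demo class: shows how Properties.load treats wrapped lines. */
final class WrappedPropertiesCheck {
    static final String KEY = "java.naming.factory.initial";
    static final String FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";

    private WrappedPropertiesCheck() {}

    // Parse properties text held in a string instead of a file.
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Value wrapped onto the next line with no backslash: the key gets an empty value.
        String wrapped = KEY + " =\n" + FACTORY + "\n";
        // Same wrap, but with a trailing backslash marking a continuation line.
        String continued = KEY + " = \\\n    " + FACTORY + "\n";

        System.out.println("wrapped:   [" + parse(wrapped).getProperty(KEY) + "]");
        System.out.println("continued: [" + parse(continued).getProperty(KEY) + "]");
    }
}
```

Either keep each entry on one line or end every wrapped line with a backslash; both forms load to the same value.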


Special thanks to Nathan Kunkee for his support.

Kind regards.

- Wayna Runa



On 19 December 2014 at 08:55, Wayna Runa <waynaruna@gmail.com> wrote:
>
> Great, Nathan!
>
> This is the best Christmas gift for me!
>
> Regards.
>
>
>
> On 18 December 2014 at 17:52, Nathan Kunkee <nkunkee@genome.wustl.edu>
> wrote:
>>
>> On 12/18/2014 10:07 AM, Wayna Runa wrote:
>>
>>> Hi everybody !
>>>
>>> I am using the Hello.java sample from qpid-client-0.30 to publish and
>>> consume messages from a RabbitMQ queue, but I get this error:
>>>
>>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>>> javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>      at org.apache.qpid.client.AMQSession.toJMSException(AMQSession.java:3719)
>>>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2169)
>>>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2121)
>>>      at org.apache.qpid.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:341)
>>>      at org.apache.qpid.client.AMQConnection.executeRetrySupport(AMQConnection.java:652)
>>>      at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:96)
>>>      at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:2119)
>>>      at org.apache.qpid.client.AMQSession.createConsumer(AMQSession.java:1067)
>>>      at info.intix.rabbitmq.samples.Hello.runTest(Hello.java:53)
>>>      at info.intix.rabbitmq.samples.Hello.main(Hello.java:28)
>>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>      at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>      at org.apache.qpid.AMQException.cloneForCurrentThread(AMQException.java:126)
>>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:693)
>>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:730)
>>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:724)
>>>      at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:539)
>>>      at org.apache.qpid.client.AMQSession_0_8.sendConsume(AMQSession_0_8.java:71)
>>>      at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2716)
>>>      at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:3143)
>>>      at org.apache.qpid.client.AMQSession.access$400(AMQSession.java:97)
>>>      at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:2146)
>>>      ... 8 more
>>> Caused by: org.apache.qpid.AMQConnectionClosedException: Error: NOT_IMPLEMENTED - active=false [error code 540: not implemented]
>>>      at org.apache.qpid.client.handler.ConnectionCloseMethodHandler.methodReceived(ConnectionCloseMethodHandler.java:92)
>>>      at org.apache.qpid.client.handler.ClientMethodDispatcherImpl.dispatchConnectionClose(ClientMethodDispatcherImpl.java:197)
>>>      at org.apache.qpid.framing.amqp_0_91.ConnectionCloseBodyImpl.execute(ConnectionCloseBodyImpl.java:127)
>>>      at org.apache.qpid.client.state.AMQStateManager.methodReceived(AMQStateManager.java:116)
>>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.methodBodyReceived(AMQProtocolHandler.java:533)
>>>      at org.apache.qpid.client.protocol.AMQProtocolSession.methodFrameReceived(AMQProtocolSession.java:467)
>>>      at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
>>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:490)
>>>      at org.apache.qpid.client.protocol.AMQProtocolHandler.received(AMQProtocolHandler.java:118)
>>>      at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
>>>      at java.lang.Thread.run(Thread.java:744)
>>> --.--.--.--.--.--.--.--.--.--.--.--.--.--.--.--
>>>
>>> There seems to be an issue about this here:
>>> https://issues.apache.org/jira/browse/QPID-5906
>>>
>>> My JNDI config is:
>>>
>>> connectionfactory.myRabbitMQConnectionFactory1 = amqp://admin:xxx@clientid/DES_DEV?brokerlist='tcp://10.105.135.53:5672'
>>> # queueName: des_queue1
>>> destination.myJndiDestQueue1 = BURL:direct://amq.direct//?routingkey='des_rk_queue1'
>>>
>>> But if I add the following line to Hello.java
>>> (System.setProperty("IMMEDIATE_PREFETCH", "true")), I can publish and
>>> consume the message, but with side effects:
>>>
>>> 1) On the RabbitMQ side, a temporary queue is created every time I
>>> run Hello.java.
>>> 2) The message "read" from the original queue is not removed.
>>>
>>> My question is: is there any workaround to create just a MessageConsumer
>>> without IMMEDIATE_PREFETCH=true?
>>>
>>
>> Hi,
>>
>> No, IMMEDIATE_PREFETCH is required to interoperate with RabbitMQ as
>> described in the ticket (+1 for getting it addressed!). Or, convince
>> RabbitMQ to implement or provide a plugin for 'active=false' to work as
>> expected.... To get things working for me, I also had to use the following
>> patch:
>>
>> Index: src/main/java/org/apache/qpid/client/AMQSession.java
>> ===================================================================
>> --- src/main/java/org/apache/qpid/client/AMQSession.java        (revision 1640219)
>> +++ src/main/java/org/apache/qpid/client/AMQSession.java        (working copy)
>> @@ -3241,6 +3241,11 @@
>>      {
>>          synchronized (_suspensionLock)
>>          {
>> +            if(_immediatePrefetch)
>> +            {
>> +                _logger.warn("IMMEDIATE_PREFETCH is in force; no channel suspends processed");
>> +                return;
>> +            }
>>              try
>>              {
>>                  if (_logger.isDebugEnabled())
>>
>>
>> I haven't submitted the patch as it is a blunt force attempt to avoid the
>> issue rather than fix it.
>>
>> 1) Your binding URL for consuming messages does not name a queue, so
>> RabbitMQ is helpfully making one. I found for Qpid that I ended up with a
>> set of JNDI resources for consuming (direct://exchg/route/q) and a set for
>> sending (direct://exchg/route/). Note that consuming lists a queue name,
>> explicitly from that queue, while sending does not, letting the routing key
>> determine the consumers. Consuming worked best with a queue pre-declared
>> and pre-bound on the RabbitMQ side.
>>
>> 2) There is an exception being thrown during your onMessage method, so
>> the framework is not sending an ACK for the message. That is the mechanism
>> of createSession(false, Session.AUTO_ACKNOWLEDGE): the framework ACKs a
>> message once the onMessage method returns without error.
>>
>> Taking into account those two things I think you'll find you can send and
>> consume messages just fine.
>>
>> Hope that helps,
>> Nathan
>>
>>
>>> If I cannot create a MessageConsumer for a RabbitMQ queue, I will have to
>>> replace RabbitMQ with another broker such as Qpid    :(
>>>
>>> Any idea will be welcome.
>>>
>>> Regards.
>>>
>>> -- wr
>>>
>>>
>>
>
> --
>
> --
> Wayna Runa
>
> +34 668872813 | Skype waynaruna2
> waynaruna@gmail.com | Barcelona, Spain
>


-- 

--
Wayna Runa

+34 668872813 | Skype waynaruna2
waynaruna@gmail.com | Barcelona, Spain
