camel-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Willem Jiang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CAMEL-7031) RabbitMQ Producer not able to use the default exchange
Date Thu, 30 Apr 2015 06:27:06 GMT

     [ https://issues.apache.org/jira/browse/CAMEL-7031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Willem Jiang updated CAMEL-7031:
--------------------------------
    Description: 
In RabbitMQ, the default exchange is a direct exchange with no name (empty string) and is
pre-declared by the broker. It has one special property that makes it very useful for simple
applications: every queue that is created is automatically bound to it with a routing key
which is the same as the queue name.  This is especially useful in RPC style messaging when
the producer specifies a REPLY_TO queue name that was created "exclusive"  Since RabbitMQ
binds that queue onto the default exchange, it makes RPC much simpler.

However, the camel rabbitmq producer throws an IllegalArgumentException if the exchange name
is empty, which prevents this simple RPC exchange.  The fix for this is simple, just don't
throw that IllegalArgumentException if the exchange name was set to empty string.

The same problem may exists with the Consumer as well.

This python script will send an rpc request (from RabbitMQ in Action)
{code}
import time, json, pika

creds_broker = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost",
                                         virtual_host = "/",
                                         credentials = creds_broker)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

msg = json.dumps({"client_name": "RPC Client 1.0",
                  "time" : time.time()})
result = channel.queue_declare(exclusive=True, auto_delete=True)
msg_props = pika.BasicProperties()
msg_props.reply_to = result.method.queue
msg_props.content_type = "application/json"
msg_props.correlation_id = "1"
msg_props.delivery_mode = 2

channel.basic_publish(body=msg,
                      exchange="rpc",
                      properties=msg_props,
                      routing_key="ping")

print "Sent 'Ping' RPC call.  Waiting for reply..."

def reply_callback(channel, method, header, body):
     """Receives RPC server replies."""
     print "RPC Reply --- " + body
     channel.stop_consuming()

channel.basic_consume(reply_callback,
                      queue=result.method.queue,
                      consumer_tag=result.method.queue)

channel.start_consuming()
{code}

This route would be what I would want to do when consuming from Rabbit the rpc call and sending
back a response:
{code}
        from("rabbitmq://192.168.213.130/rpc?queue=ping&routingKey=ping&durable=True&autoDelete=False&autoAck=False&username=guest&password=guest")
            .log("Incoming Headers: ${headers}")
            .setHeader("rabbitmq.ROUTING_KEY", header("rabbitmq.REPLY_TO"))
            .removeHeader("rabbitmq.REPLY_TO")
            .removeHeader("rabbitmq.EXCHANGE_NAME")
            .setBody(simple("Pong!"))
            .to("rabbitmq://192.168.213.130/?username=guest&password=guest");

{code}
If I remove the illegalargumentexception, the code works as expected.

  was:
In RabbitMQ, the default exchange is a direct exchange with no name (empty string) and is
pre-declared by the broker. It has one special property that makes it very useful for simple
applications: every queue that is created is automatically bound to it with a routing key
which is the same as the queue name.  This is especially useful in RPC style messaging when
the producer specifies a REPLY_TO queue name that was created "exclusive"  Since RabbitMQ
binds that queue onto the default exchange, it makes RPC much simpler.

However, the camel rabbitmq producer throws an IllegalArgumentException if the exchange name
is empty, which prevents this simple RPC exchange.  The fix for this is simple, just don't
throw that IllegalArgumentException if the exchange name was set to empty string.

The same problem may exists with the Consumer as well.

This python script will send an rpc request (from RabbitMQ in Action)
import time, json, pika

creds_broker = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters("localhost",
                                         virtual_host = "/",
                                         credentials = creds_broker)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

msg = json.dumps({"client_name": "RPC Client 1.0",
                  "time" : time.time()})
result = channel.queue_declare(exclusive=True, auto_delete=True)
msg_props = pika.BasicProperties()
msg_props.reply_to = result.method.queue
msg_props.content_type = "application/json"
msg_props.correlation_id = "1"
msg_props.delivery_mode = 2

channel.basic_publish(body=msg,
                      exchange="rpc",
                      properties=msg_props,
                      routing_key="ping")

print "Sent 'Ping' RPC call.  Waiting for reply..."

def reply_callback(channel, method, header, body):
     """Receives RPC server replies."""
     print "RPC Reply --- " + body
     channel.stop_consuming()

channel.basic_consume(reply_callback,
                      queue=result.method.queue,
                      consumer_tag=result.method.queue)

channel.start_consuming()


This route would be what I would want to do when consuming from Rabbit the rpc call and sending
back a response:

        from("rabbitmq://192.168.213.130/rpc?queue=ping&routingKey=ping&durable=True&autoDelete=False&autoAck=False&username=guest&password=guest")
            .log("Incoming Headers: ${headers}")
            .setHeader("rabbitmq.ROUTING_KEY", header("rabbitmq.REPLY_TO"))
            .removeHeader("rabbitmq.REPLY_TO")
            .removeHeader("rabbitmq.EXCHANGE_NAME")
            .setBody(simple("Pong!"))
            .to("rabbitmq://192.168.213.130/?username=guest&password=guest");


If I remove the illegalargumentexception, the code works as expected.


> RabbitMQ Producer not able to use the default exchange
> ------------------------------------------------------
>
>                 Key: CAMEL-7031
>                 URL: https://issues.apache.org/jira/browse/CAMEL-7031
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-rabbitmq
>    Affects Versions: 2.12.2
>            Reporter: Jason Foster
>            Assignee: Willem Jiang
>             Fix For: 2.12.3
>
>
> In RabbitMQ, the default exchange is a direct exchange with no name (empty string) and
is pre-declared by the broker. It has one special property that makes it very useful for simple
applications: every queue that is created is automatically bound to it with a routing key
which is the same as the queue name.  This is especially useful in RPC style messaging when
the producer specifies a REPLY_TO queue name that was created "exclusive"  Since RabbitMQ
binds that queue onto the default exchange, it makes RPC much simpler.
> However, the camel rabbitmq producer throws an IllegalArgumentException if the exchange
name is empty, which prevents this simple RPC exchange.  The fix for this is simple, just
don't throw that IllegalArgumentException if the exchange name was set to empty string.
> The same problem may exists with the Consumer as well.
> This python script will send an rpc request (from RabbitMQ in Action)
> {code}
> import time, json, pika
> creds_broker = pika.PlainCredentials("guest", "guest")
> conn_params = pika.ConnectionParameters("localhost",
>                                          virtual_host = "/",
>                                          credentials = creds_broker)
> conn_broker = pika.BlockingConnection(conn_params)
> channel = conn_broker.channel()
> msg = json.dumps({"client_name": "RPC Client 1.0",
>                   "time" : time.time()})
> result = channel.queue_declare(exclusive=True, auto_delete=True)
> msg_props = pika.BasicProperties()
> msg_props.reply_to = result.method.queue
> msg_props.content_type = "application/json"
> msg_props.correlation_id = "1"
> msg_props.delivery_mode = 2
> channel.basic_publish(body=msg,
>                       exchange="rpc",
>                       properties=msg_props,
>                       routing_key="ping")
> print "Sent 'Ping' RPC call.  Waiting for reply..."
> def reply_callback(channel, method, header, body):
>      """Receives RPC server replies."""
>      print "RPC Reply --- " + body
>      channel.stop_consuming()
> channel.basic_consume(reply_callback,
>                       queue=result.method.queue,
>                       consumer_tag=result.method.queue)
> channel.start_consuming()
> {code}
> This route would be what I would want to do when consuming from Rabbit the rpc call and
sending back a response:
> {code}
>         from("rabbitmq://192.168.213.130/rpc?queue=ping&routingKey=ping&durable=True&autoDelete=False&autoAck=False&username=guest&password=guest")
>             .log("Incoming Headers: ${headers}")
>             .setHeader("rabbitmq.ROUTING_KEY", header("rabbitmq.REPLY_TO"))
>             .removeHeader("rabbitmq.REPLY_TO")
>             .removeHeader("rabbitmq.EXCHANGE_NAME")
>             .setBody(simple("Pong!"))
>             .to("rabbitmq://192.168.213.130/?username=guest&password=guest");
> {code}
> If I remove the illegalargumentexception, the code works as expected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message