apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mohit Jotwani <mo...@datatorrent.com>
Subject Re: Improvement suggestions for AbstractRabbitMQInputOperator
Date Mon, 12 Jun 2017 08:17:43 GMT
+1 - The queue_name, host, port should be sufficient to start reading the
messages.

Regards,
Mohit



On Fri, Jun 9, 2017 at 5:47 PM, Shubham Pathak <shubham@datatorrent.com>
wrote:

> +1 for the improvements
> This way the it will become easier for a new user to try out the operator
> as well.
>
> Thanks,
> Shubham Pathak
>
> On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <patilvikram@gmail.com>
> wrote:
>
> > Hello All,
> >
> > While helping one of the Apex User, I found that current
> > AbstractRabbitMQInputOperator can be made simpler to use . After
> > investigation, I would like to suggest some improvements as below. If
> there
> > are more improvements needed, please suggest and I will incorporate those
> > suggestions to create Jira ticket.
> >
> > In current code for AbstractRabbitMQInputOperator exchange, exchangeType
> > are made NotNull which doesn't allow app to be launched without
> specifying
> > these values. For an input Operator, we are actually trying to create
> > queues and exchanges with specified values. But it leads to conflict in
> > some scenarios when default exchange is used for the queue as well when
> > queue type is transient. To consume from rabbitmq, operator need to use
> > only QueueName, host and port of rabbitmq.  Similar to KafkaInputOperator
> > we can let the operator fail if QueueName is not specified and let
> > developer correct an application or specified it from configuration.
> >
> > *Suggested Improvements:*
> >
> > 1) Drop requirements to specify exchange and its type .
> > 2) We should not be attempting to create queue in Input Operator. For
> > consumer only queue name is sufficient to start consuming data from
> queue.
> > 3) Currently queue name is optional, we should make it mandatory instead
> of
> > creating queue.
> >
> > I tried out following scenarios to test existing operator .
> >
> > *Scenarios with default exchange:*
> > *1) Queue is already created as non-durable with default exchange*
> > *Setup:*
> > rabbitMQInputOperator.setQueueName("test_2");
> > rabbitMQInputOperator.setExchangeType("fanout");
> > rabbitMQInputOperator.setExchange("");
> > rabbitMQInputOperator.setHost("localhost");
> > rabbitMQInputOperator.setPort(5672);
> >
> > *Exception:*
> > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> > protocol method: #method<channel.close>(reply-code=403,
> > reply-text=ACCESS_REFUSED - operation not permitted on the default
> > exchange, class-id=40, method-id=10)
> > at com.rabbitmq.utility.ValueOrException.getValue(
> > ValueOrException.java:66)
> > at
> > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> > BlockingValueOrException.java:36)
> > at
> > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> > getReply(AMQChannel.java:398)
> > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
> > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(
> AMQChannel.java:128
> >
> > *Reason:*
> > Default exchange is specified in the code as “” . Though we are trying to
> > consume from specified and already created queue, operator crashes as
> > exchangeDeclare() call fails .
> >
> > *2)Queue is not created before launching an app   *
> >
> > *Setup:*
> > rabbitMQInputOperator.setQueueName("test");
> > rabbitMQInputOperator.setExchangeType("fanout");
> > rabbitMQInputOperator.setExchange("");
> > rabbitMQInputOperator.setHost("localhost");
> > rabbitMQInputOperator.setPort(5672);
> >
> > *Exception:*
> > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> > protocol method: #method<channel.close>(reply-code=403,
> > reply-text=ACCESS_REFUSED - operation not permitted on the default
> > exchange, class-id=40, method-id=10)
> > at com.rabbitmq.utility.ValueOrException.getValue(
> > ValueOrException.java:66)
> > at
> > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> > BlockingValueOrException.java:36)
> > at
> > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> > getReply(AMQChannel.java:398)
> > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
> > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(
> AMQChannel.java:128
> >
> > *Reason:*
> > Current operator failed to create default exchange .
> >
> > *Scnenarios with Custom Exchanges:*
> >
> > *1)Queue “test_2” created as non-durable with exchange “new_exchange” in
> > rabbitmq*
> > *Setup:*
> > rabbitMQInputOperator.setQueueName("test_2");
> > rabbitMQInputOperator.setExchangeType("fanout");
> > rabbitMQInputOperator.setExchange("new_exchange");
> > rabbitMQInputOperator.setHost("localhost");
> > rabbitMQInputOperator.setPort(5672);
> >
> > *Exception:*
> > Exception Caused: due to mismatch in param while declaring queue as
> > “durable”
> > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> > protocol method: #method<channel.close>(reply-code=406,
> > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange
> > 'new_exchange' in vhost '/': received 'false' but current is 'true',
> > class-id=40, method-id=10)
> > *Reason:*
> > Existing queue was transient ( non-durable ) while code tries to create
> > durable queue with the same name.
> >
> > *2) When Queue and exchange are not created ( declared ) in rabbitmq*
> > *Setup:*
> > No queue and exchange are present in rabbitmq
> > App with following configuration ends up creating durable queue “test_2”
> >  in new exchange “new_exchange” as “fanout” type and routing key as “”.
> >
> > rabbitMQInputOperator.setQueueName("test_2");
> > rabbitMQInputOperator.setExchangeType("fanout");
> > rabbitMQInputOperator.setExchange("new_exchange");
> > rabbitMQInputOperator.setHost("localhost");
> > rabbitMQInputOperator.setPort(5672);
> >
> > *Result:*
> > Now external entities can pushed data to newly created exchange and queue
> > *3) Queue and exchanges are already created rabbitmq with queue as
> durable
> > and exchange and exchange Type as specified specified. *
> >
> > *Setup:*
> >
> > rabbitMQInputOperator.setQueueName("test");
> > rabbitMQInputOperator.setExchangeType("fanout");
> > rabbitMQInputOperator.setExchange("new_exchange");
> > rabbitMQInputOperator.setRoutingKey("test");
> > rabbitMQInputOperator.setHost("localhost");
> > rabbitMQInputOperator.setPort(5672);
> > *Result:*
> > Worked with no issues. But it demands that it durable queue has to be
> > created with proper exchange and routing key
> >
> > *4) Queuename not specified in an app but exchange and exchangeType
> > specified for Operator *
> >
> > *Setup:*
> > rabbitMQInputOperator.setExchangeType("fanout");
> > rabbitMQInputOperator.setExchange("new_exchange");
> > rabbitMQInputOperator.setHost("localhost");
> > rabbitMQInputOperator.setRoutingKey("test");
> > rabbitMQInputOperator.setPort(5672);
> >
> > *Result:*
> > Operator ended up creating queue with unique names such as
> > “amq.gen-dHDsywLO-8eV8qZM4Q4T_w”  which gets auto deleted when last
> > consumer stops consuming from it.
> >
> >
> > Thanks & Regards,
> > Vikram
> >
>



-- 

Regards,

___________________________________________________

*Mohit Jotwani*

Product Manager

E: mohit@datatorrent.com | M: +91 97699 62740

www.datatorrent.com  |  apex.apache.org

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message