apex-dev mailing list archives

From Shubham Pathak <shub...@datatorrent.com>
Subject Re: Improvement suggestions for AbstractRabbitMQInputOperator
Date Fri, 09 Jun 2017 12:17:47 GMT
+1 for the improvements
This way it will become easier for a new user to try out the operator
as well.

Thanks,
Shubham Pathak

On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <patilvikram@gmail.com> wrote:

> Hello All,
>
> While helping one of the Apex users, I found that the current
> AbstractRabbitMQInputOperator can be made simpler to use. After
> investigating, I would like to suggest the improvements below. If more
> improvements are needed, please suggest them and I will incorporate them
> when creating the Jira ticket.
>
> In the current code for AbstractRabbitMQInputOperator, exchange and
> exchangeType are marked NotNull, so an app cannot be launched without
> specifying these values. The input operator then tries to create queues
> and exchanges with the specified values. This leads to conflicts in some
> scenarios: when the default exchange is used for the queue, and when the
> queue type is transient. To consume from RabbitMQ, the operator needs only
> the queue name and the host and port of RabbitMQ. Similar to
> KafkaInputOperator, we can let the operator fail if the queue name is not
> specified and let the developer correct the application or specify it from
> configuration.
>
> *Suggested Improvements:*
>
> 1) Drop the requirement to specify the exchange and its type.
> 2) We should not attempt to create the queue in the input operator. For a
> consumer, the queue name alone is sufficient to start consuming data.
> 3) Currently the queue name is optional; we should make it mandatory
> instead of creating a queue.
>
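
A minimal sketch of improvements 1) and 3), under stated assumptions: the class `ConsumerSettings` and its `validate()` method are hypothetical names used only for illustration and are not part of the operator. It treats the queue name, host and port as required and leaves the exchange optional.

```java
// Hypothetical settings holder, for illustration only; not the operator's actual class.
final class ConsumerSettings {
    final String host;
    final int port;
    final String queueName;
    final String exchange; // optional: null or "" means no exchange is declared

    ConsumerSettings(String host, int port, String queueName, String exchange) {
        this.host = host;
        this.port = port;
        this.queueName = queueName;
        this.exchange = exchange;
    }

    // Fail fast when a required value is missing, instead of creating a queue.
    void validate() {
        if (isBlank(queueName)) {
            throw new IllegalStateException("queueName must be specified");
        }
        if (isBlank(host)) {
            throw new IllegalStateException("host must be specified");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalStateException("port out of range: " + port);
        }
        // The exchange is deliberately not checked: a consumer does not need it.
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

With this shape, `new ConsumerSettings("localhost", 5672, "test_2", null).validate()` passes, while omitting the queue name fails at validation time rather than producing a server-named queue.
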
> I tried out the following scenarios to test the existing operator.
>
> *Scenarios with default exchange:*
> *1) Queue is already created as non-durable with default exchange*
> *Setup:*
> rabbitMQInputOperator.setQueueName("test_2");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Exception:*
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=403,
> reply-text=ACCESS_REFUSED - operation not permitted on the default
> exchange, class-id=40, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:66)
> at
> com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:36)
> at
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:398)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128
>
> *Reason:*
> The default exchange is specified in the code as “”. Though we are trying
> to consume from a specified, already created queue, the operator crashes
> because the exchangeDeclare() call fails.
>
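
One way to avoid this failure can be sketched as a guard that skips the declaration when the exchange name is empty. The interface `ExchangeOps` and the class `ExchangeSetup` below are hypothetical stand-ins so the sketch runs without a broker; with the RabbitMQ Java client the corresponding calls would be `Channel.exchangeDeclare` and `Channel.queueBind`, which additionally throw `IOException`.

```java
// Hypothetical stand-in for the two broker calls used in this sketch.
interface ExchangeOps {
    void exchangeDeclare(String exchange, String type);
    void queueBind(String queue, String exchange, String routingKey);
}

final class ExchangeSetup {
    // Declares and binds only when a named exchange is given.
    // Returns true if the exchange was declared, false if the step was skipped.
    static boolean declareIfNamed(ExchangeOps ops, String queue, String exchange,
                                  String type, String routingKey) {
        if (exchange == null || exchange.isEmpty()) {
            // "" is the default exchange. The broker refuses declare operations
            // on it (the 403 ACCESS_REFUSED above), and every queue is already
            // reachable through it using the queue name as the routing key.
            return false;
        }
        ops.exchangeDeclare(exchange, type);
        ops.queueBind(queue, exchange, routingKey == null ? "" : routingKey);
        return true;
    }
}
```

With the setup from scenario 1 (exchange set to ""), this guard makes no declare call at all, so consuming from the existing queue "test_2" would not hit the error.
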
> *2) Queue is not created before launching an app*
>
> *Setup:*
> rabbitMQInputOperator.setQueueName("test");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Exception:*
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=403,
> reply-text=ACCESS_REFUSED - operation not permitted on the default
> exchange, class-id=40, method-id=10)
> at com.rabbitmq.utility.ValueOrException.getValue(
> ValueOrException.java:66)
> at
> com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> BlockingValueOrException.java:36)
> at
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> getReply(AMQChannel.java:398)
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128
>
> *Reason:*
> The current operator fails when it tries to create the default exchange.
>
> *Scenarios with Custom Exchanges:*
>
> *1) Queue “test_2” created as non-durable with exchange “new_exchange” in
> RabbitMQ*
> *Setup:*
> rabbitMQInputOperator.setQueueName("test_2");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Exception:*
> Caused by a mismatch in the “durable” parameter during declaration:
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for exchange
> 'new_exchange' in vhost '/': received 'false' but current is 'true',
> class-id=40, method-id=10)
> *Reason:*
> The existing queue was transient (non-durable), while the code tries to
> create a durable queue with the same name.
>
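
The mismatch can be illustrated with a toy in-memory model of how a broker treats redeclaration. `ToyBroker` is purely hypothetical and only mimics the equivalence check on the "durable" flag; it is not RabbitMQ code. The RabbitMQ Java client does offer passive variants (`Channel.queueDeclarePassive` and `Channel.exchangeDeclarePassive`) that check existence without creating anything, which is closer to what improvement 2) asks for.

```java
// Toy model for illustration only: tracks the "durable" flag per entity name.
final class ToyBroker {
    private final java.util.Map<String, Boolean> durableByName = new java.util.HashMap<>();

    // Active declare: creates the entity if missing; otherwise the arguments
    // must be equivalent to the existing ones, or the declare is rejected.
    void declare(String name, boolean durable) {
        Boolean existing = durableByName.putIfAbsent(name, durable);
        if (existing != null && existing != durable) {
            throw new IllegalStateException(
                "PRECONDITION_FAILED - inequivalent arg 'durable' for '" + name
                + "': received '" + durable + "' but current is '" + existing + "'");
        }
    }

    // Passive check: never creates and never compares arguments.
    // (A real passive declare signals a missing entity with a channel error
    // rather than a boolean; the boolean here is a simplification.)
    boolean exists(String name) {
        return durableByName.containsKey(name);
    }
}
```

In this model, declaring "test_2" as non-durable and then as durable is rejected, while `exists("test_2")` succeeds regardless of how the queue was created.
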
> *2) When Queue and exchange are not created ( declared ) in rabbitmq*
> *Setup:*
> No queue or exchange is present in RabbitMQ.
> An app with the following configuration ends up creating a durable queue
> “test_2” on a new exchange “new_exchange” of type “fanout”, with routing
> key “”.
>
> rabbitMQInputOperator.setQueueName("test_2");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
>
> *Result:*
> Now external entities can push data to the newly created exchange and queue.
>
> *3) Queue and exchange are already created in RabbitMQ, with the queue
> durable and the exchange and exchange type as specified*
>
> *Setup:*
>
> rabbitMQInputOperator.setQueueName("test");
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setRoutingKey("test");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setPort(5672);
> *Result:*
> Worked with no issues, but it requires that a durable queue already be
> created with the proper exchange and routing key.
>
> *4) Queue name not specified in the app, but exchange and exchangeType
> specified for the operator*
>
> *Setup:*
> rabbitMQInputOperator.setExchangeType("fanout");
> rabbitMQInputOperator.setExchange("new_exchange");
> rabbitMQInputOperator.setHost("localhost");
> rabbitMQInputOperator.setRoutingKey("test");
> rabbitMQInputOperator.setPort(5672);
>
> *Result:*
> The operator ended up creating a queue with a generated unique name such as
> “amq.gen-dHDsywLO-8eV8qZM4Q4T_w”, which gets auto-deleted when the last
> consumer stops consuming from it.
>
>
> Thanks & Regards,
> Vikram
>
