apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vikram patil <patilvik...@gmail.com>
Subject Re: Improvement suggestions for AbstractRabbitMQInputOperator
Date Wed, 14 Jun 2017 08:58:12 GMT
I have created Jira for this improvement .
https://issues.apache.org/jira/browse/APEXMALHAR-2509

-Vikram

On Mon, Jun 12, 2017 at 1:47 PM, Mohit Jotwani <mohit@datatorrent.com>
wrote:

> +1 - The queue_name, host, port should be sufficient to start reading the
> messages.
>
> Regards,
> Mohit
>
>
>
> On Fri, Jun 9, 2017 at 5:47 PM, Shubham Pathak <shubham@datatorrent.com>
> wrote:
>
> > +1 for the improvements
> > This way the it will become easier for a new user to try out the operator
> > as well.
> >
> > Thanks,
> > Shubham Pathak
> >
> > On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <patilvikram@gmail.com>
> > wrote:
> >
> > > Hello All,
> > >
> > > While helping one of the Apex User, I found that current
> > > AbstractRabbitMQInputOperator can be made simpler to use . After
> > > investigation, I would like to suggest some improvements as below. If
> > there
> > > are more improvements needed, please suggest and I will incorporate
> those
> > > suggestions to create Jira ticket.
> > >
> > > In current code for AbstractRabbitMQInputOperator exchange,
> exchangeType
> > > are made NotNull which doesn't allow app to be launched without
> > specifying
> > > these values. For an input Operator, we are actually trying to create
> > > queues and exchanges with specified values. But it leads to conflict in
> > > some scenarios when default exchange is used for the queue as well when
> > > queue type is transient. To consume from rabbitmq, operator need to use
> > > only QueueName, host and port of rabbitmq.  Similar to
> KafkaInputOperator
> > > we can let the operator fail if QueueName is not specified and let
> > > developer correct an application or specified it from configuration.
> > >
> > > *Suggested Improvements:*
> > >
> > > 1) Drop requirements to specify exchange and its type .
> > > 2) We should not be attempting to create queue in Input Operator. For
> > > consumer only queue name is sufficient to start consuming data from
> > queue.
> > > 3) Currently queue name is optional, we should make it mandatory
> instead
> > of
> > > creating queue.
> > >
> > > I tried out following scenarios to test existing operator .
> > >
> > > *Scenarios with default exchange:*
> > > *1) Queue is already created as non-durable with default exchange*
> > > *Setup:*
> > > rabbitMQInputOperator.setQueueName("test_2");
> > > rabbitMQInputOperator.setExchangeType("fanout");
> > > rabbitMQInputOperator.setExchange("");
> > > rabbitMQInputOperator.setHost("localhost");
> > > rabbitMQInputOperator.setPort(5672);
> > >
> > > *Exception:*
> > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> > > protocol method: #method<channel.close>(reply-code=403,
> > > reply-text=ACCESS_REFUSED - operation not permitted on the default
> > > exchange, class-id=40, method-id=10)
> > > at com.rabbitmq.utility.ValueOrException.getValue(
> > > ValueOrException.java:66)
> > > at
> > > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> > > BlockingValueOrException.java:36)
> > > at
> > > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> > > getReply(AMQChannel.java:398)
> > > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
> > > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(
> > AMQChannel.java:128
> > >
> > > *Reason:*
> > > Default exchange is specified in the code as “” . Though we are trying
> to
> > > consume from specified and already created queue, operator crashes as
> > > exchangeDeclare() call fails .
> > >
> > > *2)Queue is not created before launching an app   *
> > >
> > > *Setup:*
> > > rabbitMQInputOperator.setQueueName("test");
> > > rabbitMQInputOperator.setExchangeType("fanout");
> > > rabbitMQInputOperator.setExchange("");
> > > rabbitMQInputOperator.setHost("localhost");
> > > rabbitMQInputOperator.setPort(5672);
> > >
> > > *Exception:*
> > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> > > protocol method: #method<channel.close>(reply-code=403,
> > > reply-text=ACCESS_REFUSED - operation not permitted on the default
> > > exchange, class-id=40, method-id=10)
> > > at com.rabbitmq.utility.ValueOrException.getValue(
> > > ValueOrException.java:66)
> > > at
> > > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(
> > > BlockingValueOrException.java:36)
> > > at
> > > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.
> > > getReply(AMQChannel.java:398)
> > > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244)
> > > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(
> > AMQChannel.java:128
> > >
> > > *Reason:*
> > > Current operator failed to create default exchange .
> > >
> > > *Scnenarios with Custom Exchanges:*
> > >
> > > *1)Queue “test_2” created as non-durable with exchange “new_exchange”
> in
> > > rabbitmq*
> > > *Setup:*
> > > rabbitMQInputOperator.setQueueName("test_2");
> > > rabbitMQInputOperator.setExchangeType("fanout");
> > > rabbitMQInputOperator.setExchange("new_exchange");
> > > rabbitMQInputOperator.setHost("localhost");
> > > rabbitMQInputOperator.setPort(5672);
> > >
> > > *Exception:*
> > > Exception Caused: due to mismatch in param while declaring queue as
> > > “durable”
> > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> > > protocol method: #method<channel.close>(reply-code=406,
> > > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for
> exchange
> > > 'new_exchange' in vhost '/': received 'false' but current is 'true',
> > > class-id=40, method-id=10)
> > > *Reason:*
> > > Existing queue was transient ( non-durable ) while code tries to create
> > > durable queue with the same name.
> > >
> > > *2) When Queue and exchange are not created ( declared ) in rabbitmq*
> > > *Setup:*
> > > No queue and exchange are present in rabbitmq
> > > App with following configuration ends up creating durable queue
> “test_2”
> > >  in new exchange “new_exchange” as “fanout” type and routing key as
“”.
> > >
> > > rabbitMQInputOperator.setQueueName("test_2");
> > > rabbitMQInputOperator.setExchangeType("fanout");
> > > rabbitMQInputOperator.setExchange("new_exchange");
> > > rabbitMQInputOperator.setHost("localhost");
> > > rabbitMQInputOperator.setPort(5672);
> > >
> > > *Result:*
> > > Now external entities can pushed data to newly created exchange and
> queue
> > > *3) Queue and exchanges are already created rabbitmq with queue as
> > durable
> > > and exchange and exchange Type as specified specified. *
> > >
> > > *Setup:*
> > >
> > > rabbitMQInputOperator.setQueueName("test");
> > > rabbitMQInputOperator.setExchangeType("fanout");
> > > rabbitMQInputOperator.setExchange("new_exchange");
> > > rabbitMQInputOperator.setRoutingKey("test");
> > > rabbitMQInputOperator.setHost("localhost");
> > > rabbitMQInputOperator.setPort(5672);
> > > *Result:*
> > > Worked with no issues. But it demands that it durable queue has to be
> > > created with proper exchange and routing key
> > >
> > > *4) Queuename not specified in an app but exchange and exchangeType
> > > specified for Operator *
> > >
> > > *Setup:*
> > > rabbitMQInputOperator.setExchangeType("fanout");
> > > rabbitMQInputOperator.setExchange("new_exchange");
> > > rabbitMQInputOperator.setHost("localhost");
> > > rabbitMQInputOperator.setRoutingKey("test");
> > > rabbitMQInputOperator.setPort(5672);
> > >
> > > *Result:*
> > > Operator ended up creating queue with unique names such as
> > > “amq.gen-dHDsywLO-8eV8qZM4Q4T_w”  which gets auto deleted when last
> > > consumer stops consuming from it.
> > >
> > >
> > > Thanks & Regards,
> > > Vikram
> > >
> >
>
>
>
> --
>
> Regards,
>
> ___________________________________________________
>
> *Mohit Jotwani*
>
> Product Manager
>
> E: mohit@datatorrent.com | M: +91 97699 62740
>
> www.datatorrent.com  |  apex.apache.org
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message