nifi-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Oleg Zhurakousky <>
Subject Re: Nothing in rabbitMQ queue from Publish AMQP processor
Date Thu, 08 Dec 2016 20:19:59 GMT

You're welcome! And here is the new ticket


On Dec 8, 2016, at 3:13 PM, James McMahon <<>>

Outstanding work, Oleg. Thank you. I'll watch for those JIRA tickets and in the meanwhile
will battle to bring our rMQ out of the dark ages. Thank you very much once again for your
help investigating this behavior Oleg.

On Thu, Dec 8, 2016 at 3:08 PM, Oleg Zhurakousky <<>>

I got to the bottom of the problem and will raise a few bugs against it, but those will be
more about providing more information as to what is the cause of the problem, but  the problem
is most definitely on your end with regard to exchange, routingKey and queue setup.

Yes, what you’re describing does make some sense and that is exactly what I’ll be filing
for improvement. So here is the short version of it.
- When sending to a non-existing exchange is fatal and the channel will be automatically closed.
Do first improvement is to check for state of the channel right after basicPublish().
- If exchange is valid but routingKey is not and ‘mandatory’ flag set to true (which is
always true in our case) there is a ReturnListener that could be registered with the channel
that will let you know pretty much instantaneously if routing failed.
So for your current case I am almost positive that due to some old versions of Rabbit and
AMQP client version used by us there is some kind of incompatibility that results in either
of those condition to be presented to a client (even though I believe that you may have configured
everything correctly, but on the older version of Rabbit).

So, anyway, I’ll file these two bugs while your will try to convince the powerful men you’re
working for to upgrade RabbitMQ ;)

I’ll follow up with JIRA numbers so you can track


On Dec 8, 2016, at 2:58 PM, James McMahon <<>>

Glad to hear that you also saw this behavior Oleg. Excellent work - thank you. I think I've
proven that this is a problem caused by the version of rabbitMQ and Erlang I am running. The
one that breaks: rMQ 3.15, Erlang R14B04.

To prove my theory a co-worker exposed for my test use a dev instance of rMQ that he happened
to have handy (rMQ 3.65, Erlang 19.1) on another server and port. And my test messages started
popping up in the same exchange / queue / binding key / routing key configuration over there
like minions. I think I need to get approval to upgrade rMQ and E. While I won't be able to
upgrade my NiFi baseline because of other organizational dependencies, I can certainly eliminate
the problems due to the rMQ and Erlang versions.

Does this seem like a reasonable approach in your opinion?

On Thu, Dec 8, 2016 at 1:45 PM, Oleg Zhurakousky <<>>

So, first good news. I was able to reproduce what you’re seeing. That is always good.
Basically I am sending messages to a bogus exchange with a bogus routing key. The first send
results in success from Java API standpoint (no exception is thrown), however, the second
send fails due to the fact that the channel is closed and then when I try to stop the processor
it gives me the same error as you see during the stop call.
That said, one thing is clear is that your message does not reach Rabbit with any type of
success and that is most likely due to some misconfiguration between exchange, binding and
routing key.
Let me investigate a bit further and i’ll be in touch

Meanwhile if you look in your app logs can you confirm that you can find the stack trace message
that looks similar to what’s below?

13:37:30,449 ERROR Timer-Driven Process Thread-10 processors.PublishAMQP:268 -
java.lang.IllegalStateException: This instance of AMQPPublisher is invalid since its publishingChannel
is closed
at org.apache.nifi.amqp.processors.AMQPPublisher.publish(
at org.apache.nifi.amqp.processors.PublishAMQP.rendezvousWithAmqp(

On Dec 8, 2016, at 1:04 PM, Oleg Zhurakousky <<>>

Got it, let me dig in, i got time now

On Dec 8, 2016, at 12:46 PM, James McMahon <<>>

No problem Oleg. My fault for not being clear. The test works demonstrating the message delivery
from RMQ exchange to RMQ queue. I was trying to be systematic, and eliminate points of possible
failure, the binding of the exchange to the queue in RMQ being one.

It does not work, NiFi to RMQ. I see that a connection is created by RMQ but that connection
is blocking. I do not believe the content ever actually gets handed to the RMQ exchange by

On Thu, Dec 8, 2016 at 12:41 PM, Oleg Zhurakousky <<>>

My apology , but I am not sure I am clear on the second part of your explanation. It appears
you are saying that when you bound the Exchange to a legit routing key all works as expected
(in NiFi), but then what?
Could you please clarify?

Sorry about that.

On Dec 8, 2016, at 11:41 AM, James McMahon <<>>

Two quick updates. I generated a test message in the RMQ exchange using the RabbitMQ Management
Console. My goal was to see if the Idle and blocking were problems or expected, and whether
those would change when I sent a test message to the queue. It's also a good validation that
I've bound the exchange and queue with a legit routing key. This worked - the test message
was delivered successfully.

I noticed that for each AMQP message nifi attempts to send to RMQ, RabbitMQ Management Console
creates a Connection that is shown with State of blocking. and that is how it remains indefinitely.
Nothing shows up in the exchange or in the queue.


On Thu, Dec 8, 2016 at 8:26 AM, Oleg Zhurakousky <<>>
Fair enough

On Dec 8, 2016, at 8:16 AM, James McMahon <<>>

I'm a captive audience in this respect. I do not have the latitude to change that, as it is
dictated by a parent organization much broader than my level.

On Thu, Dec 8, 2016 at 8:10 AM, Oleg Zhurakousky <<>>

Could you also see if you can upgrade to a newer version of NiFi? Current release is 1.1.0,
so 0.6.1 is about 3 releases behind and there were quite a few bug fixes and improvements
since then that could shed some more light into your issue.
Meanwhile I’ll see what I can dig up on my end.


On Dec 8, 2016, at 8:00 AM, James McMahon <<>>

Yes absolutely Oleg. I will try to figure out how to make my rabbitMQ queues explicitly active.
I'll need to dig into that because it is not plainly obvious to me how / if that needs to
be done. I'll dig for that.

I am using NiFi 0.6.1.

Cheers and thanks again,


On Thu, Dec 8, 2016 at 7:46 AM, Oleg Zhurakousky <<>>
James I have not used AMQP in a while (although I am the one responsible for developing the
functionality you’re having problem with ;)), so I need to brush up a bit on some of the
administrative actions you describe (idle to active). Do i’ll get back to you, but as a
joint debug step, can you please try to make your queens explicitly active and see if behavior

Also, what version of NiFi you are using?
Cheers when

On Dec 8, 2016, at 7:37 AM, James McMahon <<>>

Good morning and thank you Oleg. No, no other stack traces or errors are indicated. I've got
my Publish AMQP configuration set to DEBUG for Bulletin Level.

I do get an error at the UI when I try to Start the Publish AMQP step after I have Stopped
it. It tells me "PublishAMQP[id=47f88.....744] cannot be started because it is not stopped.
Current state is STOPPING". I suspect this is a reflection of the WARN log message.

In your experience must I take some action at the RabbitMQ Management Console to change my
Exchange and Queue from Idle to Active? I believe that would happen automatically when AMQP
messages starting hitting the exchange, but I thought it best to ask. I see nothing on the
management console that alludes to enabling the exchange or the queue.


On Thu, Dec 8, 2016 at 7:24 AM, Oleg Zhurakousky <<>>

Aside from “closing” failure message, do you see any other errors or stack traces in the
Meanwhile I’ll try to dig more with what I have?


On Dec 8, 2016, at 7:13 AM, James McMahon <<>>

Hello I am trying to use a Publish AMQP step in my NiFi workflow. I generate a flow file with
random text content using Generate Flow File, and then send to a rabbitMQ Exchange named nifiAbcX
using Publish AMQP. My exchange is bound to queue AbcQ by key *.Abc.*, and my virtual host
is nifiVhost. I have established my exchange as a Topic type.

My NiFi instance and my RabbitMQ instance are co-located for development test purposes on
the same server.

It appears that my Publish AMQP does indeed generate output which it sends down the "success"
output flow path, but at the RabbitMQ Management Console I find that my Exchange and my Queue
are Idle and show no messages. My nifi log shows this Warning message: Failure while closing
target resource AMQPublisher:amqp://test@<http://test@>,
EXCHANGE:nifiAbcX,  What is this telling me? Why am I seeing no messages
posted to my rabbitMQ queue through the exchange? Thank you in advance.

View raw message