nifi-dev mailing list archives

From Andy LoPresto <>
Subject Re: NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.
Date Wed, 24 Jun 2020 23:06:37 GMT
Hi Ryan,

Thanks for writing such a detailed report. What version of NiFi are you observing this behavior
against? I know there were some issues in older versions with queue swapping, but since you
linked to current code, I’m assuming you’re experiencing this on 1.11.4?

Andy LoPresto
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Jun 24, 2020, at 11:01 AM, Ryan Hendrickson <>
> Hello,
>   We've noticed that the PriorityAttributePrioritizer doesn't prioritize
> all items in a queue.
>   We'll get a queue of 50,000 items, then a 50,001st item will be
> added with a #1 priority.  This item won't be prioritized above lower
> priority items in the queue.
>   We did some research into this and wanted to confirm what we found...
>    For any Relationship there are 2 underlying queues:
>       (1) An Active Queue (java.util.PriorityQueue) for the first 20,000
> items, defined by the nifi.queue.swap.threshold in
>       (2) A Swap Queue (java.util.ArrayList) for the rest of the
> queue's items.
>    If the Active Queue is full, every new item, regardless of priority, is
> placed on the Swap Queue.  No item on the Swap Queue will be re-prioritized
> until the entire Active Queue is empty (Line 284
> <>).
> Thus, the PriorityAttributePrioritizer only "actively" sorts the first
> 20,000 items on the relationship.  Once the Active Queue is empty, it
> re-sorts the rest and moves the top 20,000 over.  Then the problem repeats.
>    I haven't seen this documented anywhere.
> Even the admin guide's definition of nifi.queue.swap.threshold
> doesn't mention that it affects how the PriorityQueue orders items.  We just
> raised it from 20,000 to 200,000, but we're all assuming that's going to have
> detrimental effects elsewhere.
>   If you could confirm our suspicions here, we'd appreciate it.
> We'd also welcome any suggestions on what to do, and on how an increased
> threshold negatively impacts the JVM.
> Call stack we followed to figure this out:
> doPoll()
> poll()
> queue.poll()
> flowFileQueue.poll()
> conn.poll()
> session.get()
> Thanks,
> Ryan
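Assuming the two-tier behavior described in the report above is accurate, a simplified in-memory Java model reproduces the effect with a tiny threshold. This is a sketch, not NiFi's implementation: the class `SwapQueueModel`, its methods, the "overflow only when the active queue is full" rule, and the "sort and migrate only when the active queue drains" rule are assumptions taken from the report. The real queue also swaps FlowFiles out to disk, which is not modeled. It also assumes a lower number means higher priority, matching the "#1 priority" wording.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of the active/swap split described in the report.
// NOT NiFi's code: names and the overflow/refill rules are assumptions.
public class SwapQueueModel {
    // Assumption: lower number = higher priority.
    private static final Comparator<Integer> BY_PRIORITY = Comparator.naturalOrder();

    private final int swapThreshold;
    private final PriorityQueue<Integer> active = new PriorityQueue<>(BY_PRIORITY);
    private final List<Integer> swap = new ArrayList<>(); // unsorted overflow

    public SwapQueueModel(int swapThreshold) {
        this.swapThreshold = swapThreshold;
    }

    public void offer(int priority) {
        if (active.size() >= swapThreshold) {
            swap.add(priority); // active queue full: priority is ignored at this point
        } else {
            active.add(priority);
        }
    }

    public Integer poll() {
        if (active.isEmpty() && !swap.isEmpty()) {
            // Only when the active queue drains is the overflow sorted and the top items migrated.
            swap.sort(BY_PRIORITY);
            List<Integer> top = swap.subList(0, Math.min(swapThreshold, swap.size()));
            active.addAll(top);
            top.clear(); // removes the migrated items from the swap list
        }
        return active.poll(); // null when both tiers are empty
    }

    public static void main(String[] args) {
        SwapQueueModel q = new SwapQueueModel(3);
        for (int p : new int[] {5, 6, 7, 8, 9}) {
            q.offer(p); // 5, 6, 7 fill the active queue; 8 and 9 overflow
        }
        q.offer(1); // highest priority, but the active queue is already full
        StringBuilder order = new StringBuilder();
        for (Integer p = q.poll(); p != null; p = q.poll()) {
            order.append(p).append(' ');
        }
        System.out.println(order.toString().trim()); // prints "5 6 7 1 8 9"
    }
}
```

With a threshold of 3, the priority-1 item is polled fourth, after the three items already in the active queue. Under this model, raising the threshold only enlarges the window that is fully sorted; it does not remove the effect once the queue grows past the new threshold.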
