nifi-dev mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: NiFi PriorityAttributePrioritizer doesn't prioritize all items in a queue.
Date Thu, 25 Jun 2020 16:04:58 GMT
Ryan,

Yes, your analysis is correct. NiFi will prioritize things in the Active Queue. Once swapping
starts happening, it can’t constantly rebalance all of the FlowFiles, as doing so would
mean constantly re-reading the swap data from disk and re-writing all of the swap data, which
would be prohibitively expensive.

I went to find the section of the User Guide that explains this, but apparently this isn’t
explained in the user guide like I thought it was :( Sorry about that. I created a Jira [1]
to get it documented.

So yes, if you increase the swap threshold, that means that you’ll have a larger active
queue and as a result you’ll be able to handle the prioritization better. But it means that
you’ll be holding more FlowFiles in memory and as a result can encounter OutOfMemoryError
easily if your heap is not large enough.
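
For reference, the threshold in question is the nifi.queue.swap.threshold entry in nifi.properties. A fragment using the raised value from the original message quoted below might look like the following. It is an illustration only, not a recommendation; a workable number depends on heap size and on how many attributes each FlowFile carries.

```properties
# Number of FlowFiles a connection holds in memory before swapping to disk begins.
# 20000 is the value this thread started from; 200000 is the raised value it mentions.
nifi.queue.swap.threshold=200000
```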

Thanks
-Mark

[1] https://issues.apache.org/jira/browse/NIFI-7583


On Jun 24, 2020, at 2:01 PM, Ryan Hendrickson <ryan.andrew.hendrickson@gmail.com> wrote:

Hello,
  We've noticed that the PriorityAttributePrioritizer doesn't prioritize
all items in a queue.

  We'll get a queue with 50,000 items in it, then a 50,001st item will be
added with a #1 priority.  This item won't be prioritized above lower
priority items in the queue.

  We did some research into this and wanted to confirm what we found...

   For any Relationship there are 2 underlying queues:
      (1) An Active Queue (java.util.PriorityQueue) for the first 20,000
items, defined by the nifi.queue.swap.threshold in nifi.properties.
      (2) A Swap Queue (java.util.ArrayList) for the rest of the
queue's items.

   If the Active Queue is full, every new item, regardless of priority, is
placed on the Swap Queue.  No item on the Swap Queue will be re-prioritized
until the entire Active Queue is empty (Line 284
<https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java#L284>).
Thus, the PriorityAttributePrioritizer only "actively" sorts the first
20,000 items on the relationship.  Once the queue is empty, it'll re-sort
the rest and move the top 20,000 over.  Then the problem repeats.
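
The behavior described above can be sketched with a toy model. This is not NiFi's SwappablePriorityQueue; the class name TwoTierQueueSketch, its methods, and the use of plain integers as priorities (lower number = higher priority) are all made up for illustration, and the real implementation swaps to disk rather than holding an in-memory list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical toy model of the two-tier queue described in this thread.
class TwoTierQueueSketch {
    private final int swapThreshold;                        // stands in for nifi.queue.swap.threshold
    private final PriorityQueue<Integer> active =
            new PriorityQueue<>(Comparator.naturalOrder()); // always ordered by priority
    private final List<Integer> swap = new ArrayList<>();   // arrival order until swapped in

    TwoTierQueueSketch(int swapThreshold) {
        this.swapThreshold = swapThreshold;
    }

    // A new item joins the active queue only while there is room;
    // otherwise it goes to the swap list regardless of its priority.
    void put(int priority) {
        if (active.size() < swapThreshold) {
            active.add(priority);
        } else {
            swap.add(priority);
        }
    }

    // Swapped items are sorted and moved over only once the active queue is empty.
    Integer poll() {
        if (active.isEmpty() && !swap.isEmpty()) {
            swap.sort(Comparator.naturalOrder());
            List<Integer> head = swap.subList(0, Math.min(swapThreshold, swap.size()));
            active.addAll(head);
            head.clear(); // clearing the sub-list view removes those items from swap
        }
        return active.poll(); // null when both tiers are empty
    }

    // Polls until empty and returns the items in the order they came out.
    List<Integer> drain() {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        TwoTierQueueSketch q = new TwoTierQueueSketch(3); // tiny threshold for the demo
        q.put(5);
        q.put(5);
        q.put(5);
        q.put(1); // highest priority, but the active queue is already full
        System.out.println(q.drain()); // prints [5, 5, 5, 1]
    }
}
```

With a threshold of 3, the late priority-1 item comes out last, which mirrors the 50,001st-item case above: it waits in the swap tier until the active tier has fully drained.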

   I hadn't read anywhere in the documentation that this is the case.
Even in the admin guide definition of the nifi.queue.swap.threshold it
doesn't mention that it affects the PriorityQueue system.  We just raised
it from 20,000 to 200,000 but we're all assuming that's going to have
detrimental effects elsewhere.

  If you guys could confirm our suspicions here, we'd appreciate that.
We'd also welcome any suggestions on what to do here, and on how an increased
threshold size negatively impacts the JVM.


Stack we followed to figure this out:

SwappablePriorityQueue.java: doPoll()
SwappablePriorityQueue.java: poll()
StandardFlowFileQueue.java: queue.poll()
StandardConnection.java: flowFileQueue.poll()
StandardProcessSession.java: conn.poll()
InvokeHttp.java: session.get()

Thanks,
Ryan
