nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jon Kessler (Jira)" <>
Subject [jira] [Commented] (NIFI-6831) Create a flowfile queue implementation with global data priority awareness
Date Tue, 12 Nov 2019 14:57:00 GMT


Jon Kessler commented on NIFI-6831:

I've attached a template and companion sample ruleset to give a basic demonstration of this
new capability. I suggest allowing flowfiles to accumulate in the queues before the last set
of UpdateAttribute processors for a while (get 20kish in there) before starting them. You
will see flowfiles travel through that last set of processors at different rates based on
their priorities.

Prior to starting nifi you must enable this feature in[1]. I also suggest
increasing the swap threshold for queues to avoid swapping (swap is implemented, it just obviously
slows things down). Lastly you must add priority rules via the context menu at the top right
of the UI before any staggering will occur. See the attached sample for what I used for this

[1] nifi.controller.flowfilequeue.buckets=true

> Create a flowfile queue implementation with global data priority awareness
> --------------------------------------------------------------------------
>                 Key: NIFI-6831
>                 URL:
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.11.0
>            Reporter: Jon Kessler
>            Assignee: Jon Kessler
>            Priority: Major
>         Attachments: Priority Rules for demo.jpg, Priority_Demo.xml
>          Time Spent: 10m
>  Remaining Estimate: 0h
> There is currently no way to process data in order by priority on a flow-wide or global
scale. There are several issues with the way sorting by priority attribute is currently done
in the framework that I believe we can address with a new flowfile queue implementation. Those
shortcomings are:
>  * Scheduling: No consideration is given to data priority when determining which component
is given the next available thread with which to work
>  * Constant sorting: Because all flowfiles in a given connection share the same PriorityQueue
they must be sorted every time they move. While this sort is efficient it can add up as queues
grow deep.
>  * Administration: There is a costly human element to managing the value used as a priority
ranking as priorities change. You must also ensure every
>  connection in the appropriate flow has the proper prioritizer assigned to it to make
use of the property.
> The design goals of this new priority mechanism and flowfile queue implementation are:
>  * Instead of using the value of a FlowFile attribute as a ranking, maintain a set of
expression language rules to define your priorities. The highest ranked rule that a given
FlowFile satisfies will be that FlowFile's priority
>  * Because we have a finite set of priority rules we can utilize a bucket sort in our
connections. One bucket per priority rule. The bucket/rule with which a FlowFile is associated
with will be maintained so that as it moves through the system we do not have to re-evaluate
that Flowfile against our ruleset unless we have reason to do so.
>  * Control where in your flow FlowFiles are evaluated against the ruleset with a new
Prioritizer implementation: BucketPrioritizer.
>  * When this queue implementation is polled it will be able to check state to see if
any data of a higher priority than what it currently contains recently (within 5s) moved elsewhere
in the system. If higher priority data has recently moved elsewhere, the connection will only
provide a FlowFile X% of the time where X is defined along with the rule. This allows higher
priority data to have more frequent access to threads without thread-starving lower priority
>  * Rules will be managed via a menu option for the flow and changes to them take effect
instantly. This allows you to change your priorities without stopping/editing/restarting various
components on the graph.
> Additional design considerations:
> The sorting function here takes place on insertion into any connection on which a BucketPrioritizer
is set. Once a FlowFile has been sorted into a bucket we maintain that state so that each
time it moves into a new connection we already know in which bucket it should be placed without
needing to have a BucketPrioritizer set on that connection. Each bucket in a connection is
just a FIFO queue so no additional sorting is done. You should only have to configure connections
to use the BucketPrioritizer at points in your flow where you believe you'll have enough information
to accurately determine priority but not beyond that point unless you want to re-evaluate
downstream for some reason. There is administration involved in setting these BucketPrioritizers
on some connections but it should be minimal per flow (sometimes as few as one).
> When you delete a rule the next time each FlowFile moves that was already associated
with that rule it will be re-evaluated against the ruleset when it enters the next connection
regardless of whether or not a BucketPrioritizer was set on that connection. Also FlowFiles
that have yet to be evaluated (have yet to encounter a BucketPrioritizer) will not be staggered.
This was a design decision that if we don't know what a priority is for a given FlowFile we
should get it to that point in the flow as soon as possible. This decision was a result of
empirical evidence that when we did stagger unevaluated data an incoming flow of high priority
data slowed its own upstream processing down once it was identified and processed as high

This message was sent by Atlassian Jira

View raw message