nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Oleg Zhurakousky (JIRA)" <>
Subject [jira] [Commented] (NIFI-2735) Add processor to perform simple aggregations
Date Tue, 06 Sep 2016 19:02:21 GMT


Oleg Zhurakousky commented on NIFI-2735:

[~mattyb149] This is indeed an interesting problem to solve but it is also one of the most
complex EIP problems to solve. . .
I have not started looking at the PR yet, but will raise a few questions/comments primarily
for documentation purposes for this JIRA.
1. Aggregation implies collection of multiple values. Typically such multiplication is limited
by a number that is derived from the previous Split operation. This creates a dilemma for
partial aggregation (not all splits came in). Have we thought on the approach for that (e.g.,
some expiration)?
2. I like what you are proposing with regards to simple aggregation, however, do you think
that such simplicity could be easily achieved with allowing user to provide an expression,
thus only limiting what can be done to the capabilities of the expression language itself?
3. Another dilemma is NiFi restarts or processor restarts before aggregation is complete.
Basically I need to be able to stop the processor in the middle of aggregation (especially
that it may actually be performing several aggregations) and i need to be able to resume right
where it left off once it is restarted. So we need some kind of a state store. Thoughts?

Anyway, some of it may have already been addressed, but these 3 are the key IMHO, so I wanted
to make sure they are at least documented. 

> Add processor to perform simple aggregations
> --------------------------------------------
>                 Key: NIFI-2735
>                 URL:
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
> This is a proposal for a new processor (AggregateValues, for example) that can perform
simple aggregation operations such as count, sum, average, min, max, and concatenate, over
a set of "related" flow files. For example, when a JSON file is split on an array (using the
SplitJson processor), the total count of the splits, the index of each split, and the unique
indentifier (shared by each split) are stored as attributes in each flow file sent to the
"splits" relationship:
> These attributes are the "fragment.*" attributes in the documentation for SplitText,
SplitXml, and SplitJson, for example.
> Such a processor could perform these operations for each flow file split from the original
document, and when all documents from a split have been processed, a flow file could be transferred
to an "aggregate" relationship containing attributes for the operation, aggregate value, etc.
> An interesting application of this (besides the actual aggregation operations) is that
you can use the "aggregate" relationship as an event trigger. For example if you need to wait
until all files from a group are processed, you can use AggregateValues and the "aggregate"
relationship to indicate downstream that the entire group has been processed. If there is
not a Split processor upstream, then the attributes (fragment.*) would have to be manipulated
by the data flow designer, but this can be accomplished with other processors (including the
scripting processors if necessary). 

This message was sent by Atlassian JIRA

View raw message