nifi-users mailing list archives

From Thierry Hanot <>
Subject RE: MergeContent Inquiry
Date Thu, 20 Jul 2017 16:10:40 GMT
Hello Mark
Thanks for your quick answer.
When I say "merge content based on a temporal window", it can be seen as two cases:
                1 : Merge on a 5-minute (or any other) period, aligned to a boundary (seconds aligned with the beginning of the minute, minutes with the beginning of the hour, hours with the beginning of the day ...). What I want is to merge the incoming data in bulks of 5 minutes. The only relation between the data and the merge is the arrival date, so this use case is simple, as the FlowFiles arrive in order. It looks like an evolution of the MergeContent processor: merge by 5-minute windows, with the windows aligned by some rule definition.
                2 : Same as use case 1, but here the date to be used comes from the data, and potential out-of-order arrivals have to be handled.

I have thought about using the Correlation Attribute Name (but this won't handle the out-of-order case). It may work for the first case by adding an adjusted timestamp as an attribute. With this I'm sure to output FlowFiles whose content belongs to the same period, but I still don't know how to guarantee one and only one FlowFile per period (which is a key requirement for doing the aggregation with QueryRecord) without inducing latency. The only way I've found is to set a very large number for the min/max entries and a Max Bin Age larger than the wanted period, but this induces latency if I want to be safe. Do you have any other approach to handle that?
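To make the adjusted-timestamp idea concrete, here is a minimal sketch of the window-alignment arithmetic (plain Python, not NiFi code; the 5-minute window and the ISO key format are illustrative choices):

```python
from datetime import datetime, timezone

WINDOW_SECONDS = 300  # 5-minute windows, aligned to the epoch


def window_key(epoch_seconds: float, window: int = WINDOW_SECONDS) -> str:
    """Floor a timestamp to the start of its window. The result can be
    stored as a FlowFile attribute and used as the correlation key, so
    that all FlowFiles of the same window land in the same bin."""
    start = int(epoch_seconds) // window * window
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# Two events 90 seconds apart inside the same 5-minute window share a key:
print(window_key(1500566400))
print(window_key(1500566490))  # same window, same key
```

The same value could in principle be computed in an UpdateAttribute processor with the Expression Language (something along the lines of subtracting `mod(300000)` from the epoch milliseconds) and then referenced as MergeContent's Correlation Attribute Name.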

For the schema inference, I totally follow you; it does not seem realistic to infer schemas on a real-time stream. I was thinking more of a tool to help develop flows quickly. It's nice to know you have that in mind, even at a very low priority.


From: Mark Payne []
Sent: 20 July 2017 15:00
Subject: Re: MergeContent Inquiry


I'm not sure that I understand what you mean when you say "is there a way to merge content
based on temporal window."
Are you wanting to merge based on a rolling window, or a timestamp in the data? Can you explain
a bit more about what
you want to do in terms of determining which data should go together?

re: QueryRecord, it is not based on Apache Drill. It is based on Apache Calcite. I do believe
that Apache Calcite powers
Drill's SQL engine as well, but Calcite is just the SQL engine and does not do any sort of
schema inference. At present,
you need to provide a schema for the data. If your data is in Avro, you can simply use the
schema embedded in the data.
If the data is in CSV, you can derive the schema automatically from the header line (and assume
that all fields are Strings).
Otherwise, you'll probably need to use the Schema Registry.
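As an illustration of the CSV case described above, a sketch of deriving an all-strings Avro schema from a header line (plain Python using only the standard library; the record name is an arbitrary choice):

```python
import csv
import io
import json


def schema_from_csv_header(csv_text: str, name: str = "record") -> dict:
    """Build an Avro record schema from the first (header) line of a CSV
    document, treating every field as a string -- the same simplification
    the CSV reader applies when deriving a schema from the header."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return {
        "type": "record",
        "name": name,
        "fields": [{"name": col, "type": "string"} for col in header],
    }


print(json.dumps(schema_from_csv_header("device,metric,value\n"), indent=2))
```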

I have considered implementing some sort of schema inference processor, but I've not put any
sort of priority on it, simply
because in my experience schema inference is convenient when it works, but almost always some
data will come in that
doesn't adhere properly to the inferred schema and the incorrect inference ends up costing
more time than it would have
taken to simply create the schema in the first place. Additionally, the schema would have
to be inferred for every FlowFile,
meaning quite a lot of overhead and inefficiency in doing that. That said, I do understand
how it would be convenient in some cases,
but I've personally just not been able to prioritize getting something like that done. Certainly
others in the community are
welcome to look into that.
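To illustrate why per-record inference is fragile, a naive single-record sketch (hypothetical field names; a real implementation would have to reconcile types across many records, and would run once per FlowFile):

```python
import json

AVRO_TYPES = {bool: "boolean", int: "long", float: "double", str: "string"}


def infer_schema(record: dict, name: str = "inferred") -> dict:
    """Naively map each top-level JSON field to an Avro primitive type.
    A later record with a null, a float where an int was seen, or a new
    field would no longer match this schema -- exactly the failure mode
    described above."""
    fields = [
        {"name": key, "type": AVRO_TYPES.get(type(value), "string")}
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}


first = json.loads('{"device": "a1", "value": 3}')
later = json.loads('{"device": "a1", "value": 3.5}')
print(infer_schema(first)["fields"])  # value inferred as "long"
print(infer_schema(later)["fields"])  # value now "double": the schemas disagree
```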


On Jul 20, 2017, at 8:37 AM, Thierry Hanot <<>> wrote:

Hello All
Additional question on this subject: is there a way to merge content based on a temporal window? AttributeRollingWindow does not help here.
In my context this would allow me to build an aggregation layer (it's for telemetry data, which arrive at different rates, and I need to normalize/aggregate them). The flow may look like this:
                Receive telemetry data
                Merge content based on the type of data and a temporal window
                Aggregate the bulk of data using QueryRecord: this should be efficient as it's done per bulk
                Then stream the result out ( backend / MOM ... )

Of course, all the aggregation should be dynamic, merging and generating the query based on attributes that qualify the type of the data and the aggregation that needs to be done.
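For reference, a QueryRecord aggregation over such a merged bulk might look like the following (the column names are hypothetical; QueryRecord exposes the FlowFile content as a table named FLOWFILE, and each query is added as a dynamic property whose name becomes an outbound relationship):

```sql
-- Hypothetical telemetry columns; one query per data type.
SELECT device_id,
       COUNT(*)   AS sample_count,
       AVG(value) AS avg_value,
       MAX(value) AS max_value
FROM FLOWFILE
GROUP BY device_id
```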
Additional question: if I understand correctly, QueryRecord is based on Drill, and Drill can automatically infer the schema from a JSON file. Is there a way to use this feature without going through the Schema Registry?

Thanks in advance.

Thierry Hanot

From: James McMahon []
Sent: 20 July 2017 14:04
Subject: Re: MergeContent Inquiry

Outstanding. Thank you very much Joe.

On Thu, Jul 20, 2017 at 8:00 AM, Joe Witt <<>> wrote:
Yep.  Very common.  Set the desired size or number of object targets
and set the 'Max Bin Age' so that it will kick out whatever you've got
by that time.
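In other words, a MergeContent configuration along these lines (illustrative values, not defaults):

```
Minimum Number of Entries : 1
Maximum Number of Entries : 10000
Minimum Group Size        : 5 MB
Maximum Group Size        : 10 MB
Max Bin Age               : 5 min   <- bins are flushed after this,
                                       even if still under the minimums
```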

On Thu, Jul 20, 2017 at 7:38 AM, James McMahon <<>> wrote:
> Good morning. I have a situation where I have a staging directory into which
> may be dropped a small number or a large multitude of files. My customer
> wants me to package these up - but in a size range. I see that MergeContent
> allows me to set a MinimumGroupSize and a MaximumGroupSize.
> If all the files total less than the MinimumGroupSize in MB, would
> MergeContent take no action until enough files arrived to cross the minimum
> threshold - ie, would it just sit and wait? Is it possible to combine the
> size thresholds with a time parameter so that if X time passes and no new
> files appear, the package is created despite falling short of the minimum
> size threshold?
> Thanks in advance once again for any insights. -Jim
