nifi-dev mailing list archives

From Joe Witt <>
Subject Re: MergeContent to group related files
Date Fri, 17 Jun 2016 16:53:51 GMT
Sure thing Sumo.  Also if you're interested there are some really well
thought out and articulated 'enterprise integration patterns' that
much of this stuff aligns to.  This correlation concept I'm referring
to relates to this


On Fri, Jun 17, 2016 at 12:28 PM, Sumanth Chinthagunta
<> wrote:
> Hi Joe,
> Thanks a lot for helping with the solution. I didn’t understand before how correlation-identifier works.
> I guess MergeContent may be pulling flowFiles with the same correlation-identifier from the queue in a batch.
> I don’t really need 1_N in the file name, so this solution should work for my case. I will try it and let you know.
> Thanks
> -Sumo
>> On Jun 17, 2016, at 6:27 AM, Joe Witt <> wrote:
>> Sumo,
>> Should be doable.  The only part that may be tricky is the filename showing 1_N if
that means the whole thing has to retain sequential ordering from source through destination.
>> When merging flowfiles together you need to decide 'how should content be merged'
and 'how should attributes be merged'.  The properties to control that are 'Merge Strategy'
and 'Attribute Strategy' respectively.  For merge strategy you'll want to do binary merge.
 For the attribute strategy the default of keeping only common attributes should likely be
sufficient.  The reason it should be is the information that you'll need for writing to HDFS
then is the common databaseName, tableName, and action.  When merging you'll merge by all
three of these attributes combined.  You can do this by creating an attribute that combines
those three things right after your extract attributes processor.
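The "keep only common attributes" behavior described above can be sketched in Python. This is a minimal illustration, not NiFi's actual implementation; the attribute maps (including the `kafka.offset` key) are hypothetical:

```python
# Hypothetical attribute maps from three flowfiles landing in the same bin
attrs = [
    {"databaseName": "db1", "tableName": "customer", "action": "insert", "kafka.offset": "101"},
    {"databaseName": "db1", "tableName": "customer", "action": "insert", "kafka.offset": "102"},
    {"databaseName": "db1", "tableName": "customer", "action": "insert", "kafka.offset": "103"},
]

# "Keep Only Common Attributes": retain only the key/value pairs that are
# present with an identical value on every flowfile in the bundle
common = {
    k: v
    for k, v in attrs[0].items()
    if all(a.get(k) == v for a in attrs[1:])
}
print(common)
```

Because `databaseName`, `tableName`, and `action` are identical across the bin, they survive the merge, which is exactly why they remain usable for the HDFS path later; the per-message `kafka.offset` is dropped.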
>> Let's say your extract attributes processor pulls out 'databaseName', 'tableName' and 'action'. If so, put an UpdateAttribute processor between your extract attributes and MergeContent (or you could use HashAttribute as well). In it, create an attribute called 'correlation-identifier' and give it a value of ${databaseName}-${tableName}-${action}
>> Then in merge content use that correlation-identifier attribute in the 'Correlation
Attribute Name' property.
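The binning that the correlation attribute drives can be sketched in Python. The key mirrors the expression ${databaseName}-${tableName}-${action} from the thread; the flowfile tuples are hypothetical:

```python
from collections import defaultdict

def correlation_id(attrs):
    # Mirrors the expression ${databaseName}-${tableName}-${action}
    return f"{attrs['databaseName']}-{attrs['tableName']}-{attrs['action']}"

# Hypothetical flowfiles: (attributes, JSON payload)
flowfiles = [
    ({"databaseName": "db1", "tableName": "customer", "action": "insert"}, '{"id": 1}'),
    ({"databaseName": "db1", "tableName": "customer", "action": "insert"}, '{"id": 2}'),
    ({"databaseName": "db2", "tableName": "address",  "action": "update"}, '{"id": 3}'),
]

# MergeContent bins together flowfiles whose correlation attribute values match
bins = defaultdict(list)
for attrs, payload in flowfiles:
    bins[correlation_id(attrs)].append(payload)

for key, payloads in bins.items():
    print(key, len(payloads))
```

Each bin then becomes one merged output, so everything written to a given file shares the same database/table/action triple.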
>> Now, given that you'll be smashing JSON documents together, keep in mind the resulting merged content would not itself be valid JSON. You'd either need to make sure the merged output is valid JSON, which you can do using MergeContent's header/footer/demarcator feature, or you need the thing that reads these merged JSON documents to be able to demarcate them for you.
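A minimal sketch of the header/footer/demarcator approach, assuming a header of `[`, a demarcator of `,`, and a footer of `]` so the bundle parses as a JSON array (the payloads are hypothetical):

```python
import json

docs = ['{"id": 1}', '{"id": 2}', '{"id": 3}']  # hypothetical JSON payloads

# Equivalent of MergeContent's Header / Demarcator / Footer properties
header, demarcator, footer = "[", ",", "]"
merged = header + demarcator.join(docs) + footer

parsed = json.loads(merged)  # the merged output is itself valid JSON
print(len(parsed))
```

A newline demarcator with no header/footer is the other common choice: the result is not one valid JSON document, but newline-delimited JSON that downstream readers can split line by line.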
>> If you want to end up with roughly 64MB bundles and these objects can be quite small
(between say 1 and 10KB) then you'd be bundling around 6000-10000 objects each time and that
is not factoring in compression.  I'd recommend a two phase merge with a GZIP compression
step in between then.  GZIP is nice as it compresses quite fast and it can be safely concatenated.
 So the 'merge step' would really be:
>> - First Merge
>> - GZIP Compress
>> - Final Merge
>> In first merge do bundles of at least 800 objects but no more than 1000 and set an
age kick-out of say 1 minute or whatever is appropriate in your case
>> In GZIP compress set level 1
>> In final merge do bundles of at least 55MB but no more than 64MB with an age kick-out
of say 5 minutes or whatever is appropriate in your case
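The reason the final merge can stay cheap is the property mentioned above: independently compressed GZIP outputs concatenate into a valid multi-member gzip stream. A small Python demonstration (the payloads are hypothetical):

```python
import gzip

# Two "first merge" bundles, compressed independently at level 1 (fast)
bundle_a = gzip.compress(b'{"id": 1}\n{"id": 2}\n', compresslevel=1)
bundle_b = gzip.compress(b'{"id": 3}\n', compresslevel=1)

# The "final merge" can simply concatenate the compressed bundles:
# a multi-member gzip stream decompresses to the concatenated payloads
final = bundle_a + bundle_b

print(gzip.decompress(final))
```

So the final merge is pure byte concatenation of already-compressed bundles; no decompress/recompress cycle is needed.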
>> Since the common attributes you need will be retained in this model you will be able to write to HDFS using a path of something like '/${databaseName}/${tableName}/${action}/${uuid}.whatever'.
>> Now that I got here I just noticed you set 'tar', so presumably you are using the tar merge strategy, and most likely this is to address how to keep these objects separate and avoid the need for header/footer/demarcator, etc. Good choice as well.
>> There are a lot of ways to slice this up.
>> Thanks
>> Joe
>> On Wed, Jun 15, 2016 at 6:04 PM, Sumanth Chinthagunta <> wrote:
>> Hi,
>> I have following flow that receives JSON data from Kafka and writes to HDFS.
>> Each flowFile received from Kafka has following attributes and JSON payload.
>> 1.      databaseName = db1 or db2 etc
>> 2.      tableName = customer or address etc
>> 3.      action = [insert, update, delete]
>> My goal is to merge 1000 flowFiles into a single file and write it to HDFS (because writing large files into HDFS is more efficient than writing many small JSON files).
>> I also want to write into HDFS folder structure   like:
>> /<databaseName>/<tableName>/<action>/1_1000.tar
>> /<databaseName>/<tableName>/<action>/1000_2000.tar
>> With the default MergeContent configuration, I am losing individual flowFiles’ attributes and cannot organize the binned files into a directory structure. Is it possible to accomplish my goal with MergeContent?
>> Thanks
>> -Sumo
