nifi-users mailing list archives

From Scott Wagner <swag...@beenverified.com>
Subject Re: Options for increasing performance?
Date Fri, 07 Apr 2017 16:57:32 GMT
Jim,

     Here's the full script with unnecessary business logic removed:

# session.get(10) returns a list of up to 10 FlowFiles (empty if the queue is empty)
flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    s3_bucket = flowFile.getAttribute('job.s3_bucket')
    s3_path = flowFile.getAttribute('job.s3_path')
    # More stuff here....
    errors = []
    # More stuff here... (the removed logic fills errors and sets matches and total_size)
    # Every FlowFile pulled above must end up transferred (or removed)
    if len(errors) > 0:
        flowFile = session.putAttribute(flowFile, 'job.error', ';'.join(errors))
        session.transfer(flowFile, REL_FAILURE)
    else:
        flowFile = session.putAttribute(flowFile, 'job.number_csv_files', str(len(matches)))
        flowFile = session.putAttribute(flowFile, 'job.total_file_size', str(total_size))
        session.transfer(flowFile, REL_SUCCESS)

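     I'm not calling session.commit anywhere: ExecuteScript commits the
session itself after the script returns. By then every FlowFile the
script pulled with session.get() must have been transferred or removed;
if one is left over (for example, when commit is called inside the loop
before the rest of the batch has been routed), the commit fails with
"transfer relationship not specified".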
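To see why committing inside the loop trips the "transfer relationship not specified" check, here is a toy model in plain Python. FakeSession is hypothetical and is not NiFi's ProcessSession; it models only one rule, that a commit fails while any FlowFile handed out by get() has been neither transferred nor removed. The function names and the string "FlowFiles" are illustrative assumptions.

```python
class FakeSession(object):
    """Hypothetical stand-in for NiFi's ProcessSession (illustration only).

    Models a single rule: commit() fails while any FlowFile pulled with
    get() has been neither transferred nor removed.
    """

    def __init__(self, queued):
        self.queue = list(queued)
        self.pending = set()  # FlowFiles handed out but not yet routed

    def get(self, max_results):
        # Like session.get(10): returns up to max_results items, maybe none
        batch = self.queue[:max_results]
        self.queue = self.queue[max_results:]
        self.pending.update(batch)
        return batch

    def transfer(self, flow_file, relationship):
        self.pending.discard(flow_file)

    def remove(self, flow_file):
        self.pending.discard(flow_file)

    def commit(self):
        if self.pending:
            raise RuntimeError("transfer relationship not specified")


def commit_inside_loop(session):
    for flow_file in session.get(10):
        session.transfer(flow_file, "success")
        session.commit()  # fails: the rest of the batch is still pending


def commit_after_loop(session):
    for flow_file in session.get(10):
        session.transfer(flow_file, "success")
    session.commit()  # in ExecuteScript the processor makes this call for you
```

With three queued FlowFiles, commit_inside_loop raises on its first commit because two FlowFiles are still pending, while commit_after_loop succeeds. In a real ExecuteScript script the simplest fix is to drop the commit call entirely.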

     Here's another script (this one is the full file; no business 
secrets in here!) that creates N FlowFiles from each input FlowFile, 
based on attributes that define a numeric range:

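flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    # Read the numeric range from the incoming FlowFile's attributes
    start = int(flowFile.getAttribute('range.start'))
    stop = int(flowFile.getAttribute('range.stop'))
    increment = int(flowFile.getAttribute('range.increment'))
    # stop + 1 makes range.stop inclusive (assumes a positive increment)
    for x in range(start, stop + 1, increment):
        # Each clone carries the original content plus a 'current' attribute
        newFlowFile = session.clone(flowFile)
        newFlowFile = session.putAttribute(newFlowFile, 'current', str(x))
        session.transfer(newFlowFile, REL_SUCCESS)
    # The original must also be accounted for, so remove it
    session.remove(flowFile)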
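The attribute arithmetic in this script can be checked outside NiFi. The sketch below pulls the same loop into a plain Python function; the name range_values and the dict standing in for the FlowFile's attributes are hypothetical, and the positive-increment check is an addition the original script does not have.

```python
def range_values(attributes):
    """Return the 'current' attribute values the range script would emit.

    attributes: a plain dict standing in for the FlowFile's attributes
    (hypothetical; in NiFi these come from flowFile.getAttribute).
    """
    start = int(attributes['range.start'])
    stop = int(attributes['range.stop'])
    increment = int(attributes['range.increment'])
    if increment <= 0:
        # range() rejects 0, and stop + 1 is the wrong bound for negative steps
        raise ValueError("range.increment must be positive")
    # stop + 1 makes range.stop inclusive
    return [str(x) for x in range(start, stop + 1, increment)]
```

For example, range.start=1, range.stop=10, range.increment=3 yields the values '1', '4', '7' and '10', so the script would emit four clones.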

     I hope these examples are helpful.

- Scott
> James McMahon <mailto:jsmcmahon3@gmail.com>
> Friday, April 7, 2017 11:22 AM
> Scott, how did you refine your session.transfer and session.commit 
> when you introduced the for loop?
>
> I am getting a "transfer relationship not specified" error when I move 
> my transfer and my commit into the "for flowFile" loop. Can you show how 
> your # Do stuff here block ends? Thank you sir.
>
> Jim
>
>
> Scott Wagner <mailto:swagner@beenverified.com>
> Wednesday, April 5, 2017 3:26 PM
> One thing I've found when using ExecuteScript with Python is that a 
> script that handles a single FlowFile per run is very inefficient 
> when you have multiple FlowFiles in the input queue, even when you set 
> the run schedule to 0 sec.
>
> Instead, I have the following in all of my Python scripts:
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     # Do stuff here
>
> That seems to improve the throughput of the ExecuteScript processor 
> dramatically.
>
> YMMV
>
> - Scott
>
> James McMahon <mailto:jsmcmahon3@gmail.com>
> Wednesday, April 5, 2017 12:48 PM
> I am receiving POSTs from a Pentaho process, delivering files to the 
> HandleHttpRequest processor in my NiFi 0.7.x workflow. That processor 
> hands the flowfile off to an ExecuteScript processor that runs a Python 
> script. This script is very, very simple: it takes an incoming JSON 
> object and loads it into a Python dictionary, and verifies the 
> presence of required fields using simple has_key checks on the 
> dictionary. There are only eight fields in the incoming JSON object.
>
> The throughput for these two processors does not exceed 100-150 files 
> in five minutes. That seems very slow given how little processing 
> happens in these two steps.
>
> I notice that some configuration options seem related to performance. 
> "Concurrent tasks", for example, defaults to 1 for each processor.
>
> What performance optimizations at the processor level do users 
> recommend? Is it advisable to crank up the concurrent tasks for a 
> processor, and is there an optimal performance point beyond which you 
> should not crank up that value? Are there trade-offs?
>
> I am particularly interested in optimizations for HandleHttpRequest 
> and ExecuteScript processors.
>
> Thanks in advance for your thoughts.
>
> cheers,
>
> Jim
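Jim's validation step (parse the JSON body, check that the required fields are present) can be sketched and tested outside NiFi as plain Python. This is a sketch under assumptions: the field names are hypothetical, since the message does not list the eight fields, and inside ExecuteScript the text would come from the FlowFile content (for example, read through an InputStreamCallback), which is omitted here.

```python
import json

# Hypothetical field names: the message says there are eight required
# fields but does not name them.
REQUIRED_FIELDS = ('id', 'filename', 'timestamp')


def missing_fields(text, required=REQUIRED_FIELDS):
    """Parse a JSON document and return the required keys it lacks."""
    record = json.loads(text)
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    # 'in' works on Jython 2.7 and Python 3; dict.has_key is Python 2 only
    return [field for field in required if field not in record]
```

An empty result means the record is valid; a non-empty list could be joined into an error attribute and routed to failure, the same way Scott's first script handles its errors list.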

