nifi-users mailing list archives

From Matt Burgess <mattyb...@apache.org>
Subject Re: A bag of groovy questions regarding the ExecuteScript processor
Date Thu, 05 Oct 2017 19:02:29 GMT
Giovanni,

For the session.get(N).each{}, a rollback will occur if an exception is
not caught. ExecuteScript catches any exception that escapes the
script and performs the rollback. If you catch your own
exceptions, you have much more control over how the flow files (and
the overall session) are handled.

For ScriptTester, it depends on how you use it. If you are running from a
Gradle build, I would've thought having just the Bintray repo would
work, but perhaps not.  On a different note, I'm adding the ability to
specify an attributes file (actually a path to a Java .properties
file) on the ScriptTester command line, and I upgraded all the
components to NiFi 1.4.0.  I'll be doing some more testing (I do these
things at lunch and after everyone else goes to bed), but I hope to
have a new version up soon.

For the fat JAR, you're definitely better off putting all the
non-changing stuff into a directory and/or its own fat JAR, and just
rebuilding your code that changes into its own JAR.

For the reloading, I have a couple of trivial examples of
InvokeScriptedProcessor on my blog. One is more of a transitional
template: it lets you paste your ExecuteScript code into an
InvokeScriptedProcessor script body (using the provided boilerplate),
so you can experiment with the API from there [1].  Basically
ExecuteScript is a wrapper around the onTrigger() method, whereas
InvokeScriptedProcessor gives you more access to the processor
lifecycle and API.
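
As a rough illustration of the URLClassLoader idea mentioned earlier in
this thread (one loader with the fat JAR as its only URL and the current
class loader as its parent), here is a minimal Java sketch. The class
name JarReloader and its methods are hypothetical and not part of NiFi;
how it would be wired into an InvokeScriptedProcessor, and whether
further classloading work is needed there, is left open.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

// Hypothetical helper, not a NiFi class: swaps in a fresh class loader
// each time the fat JAR is rebuilt.
public class JarReloader {
    private URLClassLoader current;

    // Build a loader whose only URL is the fat JAR, delegating to the given parent.
    public static URLClassLoader loaderFor(Path fatJar, ClassLoader parent) throws IOException {
        URL[] urls = { fatJar.toUri().toURL() };
        return new URLClassLoader(urls, parent);
    }

    // Create a new loader for the JAR, make it the calling thread's context
    // class loader, and close the previous one. Only the calling thread is
    // affected; other threads keep whatever context loader they already had.
    public synchronized ClassLoader reload(Path fatJar) throws IOException {
        // Reuse the original parent so reloads do not chain loaders together.
        ClassLoader parent = (current != null)
                ? current.getParent()
                : Thread.currentThread().getContextClassLoader();
        URLClassLoader fresh = loaderFor(fatJar, parent);
        URLClassLoader old = current;
        current = fresh;
        Thread.currentThread().setContextClassLoader(fresh);
        if (old != null) {
            old.close(); // releases the handle on the old JAR file
        }
        return fresh;
    }
}
```

Classes already loaded through the old loader stay alive as long as
something references them, so anything the script caches from the JAR
would have to be looked up again through the new loader after a reload.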

Regards,
Matt

[1] http://funnifi.blogspot.in/2017/06/invokescriptedprocessor-template-faster.html


On Wed, Oct 4, 2017 at 5:36 PM, Giovanni Lanzani
<giovannilanzani@godatadriven.com> wrote:
> Hi Matt,
>
> Thanks for the answers.
>
> session.get(N).each) Good to know, I thought a roll-back was inevitable with
> uncaught exceptions;
>
> ScriptTester) Since you're here: I could only get the script to download
> when adding this to the repositories in the .build
>
>     maven {
>         url 'http://dl.bintray.com/mattyb149/maven/'
>     }
>
> Is that how it's supposed to work?
>
> fatJar) I actually saw that with Gradle you can easily do something like
> this
>
> shadowJar {
>    dependencies {
>       exclude(dependency('org.codehaus.groovy:.*'))
>       exclude(dependency('commons-.*:.*'))
>    }
> }
>
> That way the fat jar will be much smaller but still executable by NiFi.
> Without that a 15 KB jar ends up being an 8 MB fat jar.
>
> on-the-fly-reload) I'd rather hack the API than do that :) Are there any
> pointers/examples for this InvokeScriptedProcessor? It seems all rather new
> and esoteric by looking at its docs.
>
> Cheers,
>
> Giovanni
>
> On 4 Oct 2017, at 18:33, Matt Burgess wrote:
>
> Giovanni,
>
> I second all of Andy's answers, they are spot-on. The each()
> construct is "safe" in the sense that you will be working with
> one flow file at a time, but remember that there is only one
> "session". If you throw an Exception from inside the each(), then it
> will be caught by ExecuteScript (if not caught by your script), and
> the entire session will be rolled back. You are probably better off
> with the approach you outlined where you wrap the logic in the
> try/catch and route to success/failure accordingly... unless an error
> indicates a "retry all", then a rollback is likely what you want.
>
> For the ScriptTester, I haven't yet added support for setting
> attributes on incoming flow file(s), I am trying to think of a clean
> way to allow them for arbitrary flow files such as when the --input
> switch is specified. Suggestions are welcome :) For the first go-round
> I might allow something such that attributes would be added to all
> flow files, or at least for one coming in via STDIN.
>
> For the single fat/shaded JAR, you can certainly do things that way,
> but if you are using Groovy, Clojure, or Javascript/Nashorn, you can
> put all the JARs in a single directory (not nested!) and just add the
> directory to your Module Directory property. That might save you a
> build/package step. Doesn't help with reloading though.
>
> For the on-the-fly reload of an updated fat JAR, you could (at the
> expense of performance) have the script load the JAR. At that point
> you'd probably be better served with InvokeScriptedProcessor so you
> could add a FileWatcher at startup, and reload the JAR from a separate
> thread when changes are detected. In either case I believe you'd be
> looking at creating a URLClassLoader with your fat JAR as the only
> URL, and the current ClassLoader as its parent. Then you can set the
> Thread's context classloader to the new one, and/or you may need to do
> some more classloading voodoo.
>
> Not sure if I covered all your questions/comments, but if not please
> let me know and I will try again :)
>
> Regards,
> Matt
>
>
> On Wed, Oct 4, 2017 at 3:18 AM, Giovanni Lanzani
> <giovannilanzani@godatadriven.com> wrote:
>
> Hi Andy,
>
> That's very helpful, thanks! Inline my comments, waiting for Matt to come
> home :)
>
> On 3 Oct 2017, at 22:44, Andy LoPresto wrote:
>
> Giovanni,
>
> A lot of great questions here. I’ll try to go through them but I hope Matt
> weighs in as well (he is on vacation for the next few days though).
>
> * The only time I am aware the Jars are reloaded is at processor restart (I
> believe this is the same for the script content if defined by a referenced
> file as well). The scriptingComponentHelper setup*() methods execute inside
> ExecuteScript#setup(), which has @OnScheduled annotation [1].
>
> Is there anyone that has written some sort of script (I don't know if it is
> possible) to query the NiFi API for all the (Groovy ExecuteScript)
> processors using a particular module directory (we plan to use a single one
> for everything), so that I could add a new step, after the shadowJar
> deployment, that restarts all of them?
>
> I imagine this would be a fairly common use case. Where I'm currently
> working we have the following workflow:
>
> Have a single jar with all the code that the groovy scripts will need;
> The groovy scripts will use that code with minimal boilerplate around it, so
> all the (non-NiFi) related code is in the jar. This makes it very easy to
> test the logic in the jar. We added some extra code to ensure the functions
> that the groovy scripts will call are "NiFi compatible" (right now it's just
> .getBytes(StandardCharsets.UTF_8)). We don't use Matt's framework because we
> need incoming flowFile to have attributes, and I couldn't figure out how to
> do it :)
> NiFi has a flow to fetch new master updates on the repo and compile the
> (fat) jar as a result. However we would need to restart the ExecuteScript
> processors by hand and... no/no? :) A script would help greatly here (if
> nobody has one, I will dig into the API to see what's possible. I might just
> parse the whole xml file if there's no way to do so via the API);
>
> * I’m not sure how other users bundle their dependencies, but shadow Jars
> would be fine for this use case, and Matt has referenced using them in his
> script-tester article [2].
> * Yes, while there are small idiosyncrasies with each language flavor, the
> NiFi-related domain is fairly consistent. In this case, iterating over a
> number of flowfiles for processing in a single Groovy script is fine.
> Session.get(int) [3] is delegated to ProcessSession and returns
> List<FlowFile>, so you can use any of the Groovy collections methods over
> it.
>
> So what happens in this case
>
> def n = 0
> session.get(N).each { flowFile ->
>     if (n == 0) {
>         // do something
>     } else {
>         throw new Exception()
>     }
>     session.transfer(flowFile, REL_SUCCESS)
>     n += 1
> }
>
> Will the first flowFile be successfully transferred or will a rollback
> happen? (Note: I usually wrap the logic in try/catch and then, based on the
> result, transfer the file to REL_SUCCESS/REL_FAILURE.)
>
> Thanks again,
>
> Giovanni
>
> Hopefully this helps you and if Matt or anyone else sees a mistake, they
> correct it and add their thoughts. Thanks.
>
> [1]
> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#onscheduled
> [2]
> https://funnifi.blogspot.com/2016/06/testing-executescript-processor-scripts.html
> [3]
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L1520
>
>
>
> Andy LoPresto
> alopresto@apache.org
> alopresto.apache@gmail.com
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>
> On Oct 3, 2017, at 1:09 PM, Giovanni Lanzani
> <giovannilanzani@godatadriven.com> wrote:
>
> I apologize if this is specified elsewhere, but I couldn't find it.
>
> I was wondering when the jars, used by a particular Groovy script (in the
> ExecuteScript processor), are reloaded. I.e. if one jar is updated, when
> will the script pick up the new version? I know that upon restarting the
> processor, the updated jar is considered, but I was wondering in which other
> occasions that happens;
> Do people tend to use fat (shadow) jars for the jars referenced by
> groovy scripts? I don't think it makes sense to keep track of all the
> dependencies manually otherwise;
> When using the {P,J}ython processor, I read Matt's advice to use the following
> construct in the script:
>
> for flowFile in session.get(N):
>     if flowFile:
>         # do your thing here
>
> Does the same hold for Groovy, i.e. should someone do
>
> session.get(N).each { flowFile ->
>     // do your thing here
>     if (condition) {
>         session.transfer(flowFile, REL_SUCCESS)
>     } else {
>         session.transfer(flowFile, REL_FAILURE)
>     }
> }
>
> Is this approach safe in Groovy inside an each? Or is this approach not
> needed at all in Groovy, while it is needed in {P,J}ython?
>
> Thanks in advance!
>
> Giovanni
