nifi-dev mailing list archives

From Jason Hamilton <ja...@homecarepulse.com>
Subject RE: InvokeScriptedProcessor
Date Tue, 11 Oct 2016 23:37:34 GMT
Well, I solved it myself - the script was fine, but apparently after some earlier failed attempts
that threw exceptions, the DBCPService was hosed.

SOLUTION: I had to disable and then re-enable the DBCPService

In summary, there are 2 buggy behaviors that have been huge gotchas (a.k.a. hours wasted for
nothing) for me with InvokeScriptedProcessor:

1) After correcting syntax errors in my property definitions, I had to completely delete
and recreate my InvokeScriptedProcessor instance instead of just updating the text in the
processor - otherwise the error still occurs even after fixing the code

2) After throwing an exception while using a DBCPService, I had to disable/re-enable the
service - otherwise the Processor still crashes even after fixing the code

Not sure if there's even a way to prevent those from happening, but man is it misleading when
you (correctly) fix your code, start things up again, and the error is still there! Perhaps
some missing cleanup? Maybe it's just the nature of the beast. Anyway, sorry for the bother,
just figured I'd post this solution for completeness. Cheers!

[photo]<http://homecarepulse.com/?utm_source=logoLink&utm_medium=Jason&utm_campaign=EmailSignature>

Jason Hamilton
Senior Software Developer
Home Care Pulse, LLC
www.homecarepulse.com<http://homecarepulse.com?utm_source=websiteLink&utm_medium=Jason&utm_campaign=EmailSignature>
208.228.0895 (Direct)<tel:2082280895>
877.307.8573 (Office)<tel:8773078573>


[Linnked In]<http://www.linkedin.com/company/home-care-pulse>[Facebook]<http://facebook.com/homecarepulse>[Twitter]<http://twitter.com/homecarepulse>[Google+]<http://plus.google.com/+Homecarepulse/posts>

Satisfaction Management<http://www.homecarepulse.com/program-details?utm_source=bottomLinks&utm_medium=Jason&utm_campaign=EmailSignature>
| Benchmarking<http://benchmarking.homecarepulse.com?utm_source=bottomLinks&utm_medium=Jason&utm_campaign=EmailSignature>
| BestofHomeCare.com<http://www.bestofhomecare.com?utm_source=bottomLinks&utm_medium=Jason&utm_campaign=EmailSignature>
| Blog<http://www.homecarepulse.com/blog?utm_source=bottomLinks&utm_medium=Jason&utm_campaign=EmailSignature>



From: Jason Hamilton
Sent: Tuesday, October 11, 2016 5:10 PM
To: 'dev@nifi.apache.org' <dev@nifi.apache.org>
Subject: RE: InvokeScriptedProcessor

Sorry, it looks like the mailing list hates attachments.  The script is:

import org.apache.nifi.dbcp.DBCPService
import groovy.sql.Sql
import groovy.json.JsonBuilder

class MySQLToJSON implements Processor {

    def REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("The flowfile with the specified query results was successfully transferred.")
            .build()

    // TODO: figure out how to get exception messages to show in the bulletins
    // or go out to a failure relationship
    def REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("An error occurred while running the specified query.")
            .build()

    def DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name('Database Connection Pooling Service')
            .description("The Controller Service that is used to obtain a connection to the database.")
            .required(true)
            .identifiesControllerService(DBCPService.class)
            .build()

    def DB_QUERY = new PropertyDescriptor.Builder()
            .name('SQL Query')
            .description('SQL query to be executed.')
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(Validator.VALID)
            .build()

    def FILENAME = new PropertyDescriptor.Builder()
            .name('File Name')
            .description('Sets the filename attribute of the flowfile (do not include the extension).')
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(Validator.VALID)
            .build()

    ComponentLog log

    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS, REL_FAILURE] as Set
    }

    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Declared before the try block so the catch and finally blocks can see them
        def session = sessionFactory.createSession()
        def flowFile = null
        def conn = null
        try {
//            flowFile = session.get() // use existing flowfile if one is there
            flowFile = session.create() // create new flowfile if not
            log.info("Flowfile created")

            def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
            log.info("Filename set")

            // TODO: Evaluate expression language for getProperty calls
            def dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class)
            log.info("dbcpService set")
            conn = dbcpService.getConnection()
            log.info("getConnection() called")
            def sql = new Sql(conn)
            log.info("Sql(conn) called set")
            def dbResults = sql.rows(context.getProperty(DB_QUERY).value)
            log.info("dbResults set")
            def jsonResults = new JsonBuilder(dbResults).toPrettyString()
            log.info("jsonResults set")
            flowFile = session.write(flowFile, { outputStream ->
                outputStream.write(jsonResults.getBytes('UTF-8'))
                log.info("outputStream written")
            } as OutputStreamCallback)

            //flowFile = session.putAttribute(flowFile, "executesql.row.count", dbResults.size())
            flowFile = session.putAttribute(flowFile, "mime.type", "application/json")
            log.info("mime.type attribute set")
            flowFile = session.putAttribute(flowFile, "filename", "test")
            log.info("filename attribute set")

            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            log.info("session.transfer called")
            session.commit()
            log.info("session.commit called")
        }
        catch (e) {
            log.error("Processor error: ${e.getMessage()}")
            // flowFile is still null if session.create() itself failed
            if (flowFile != null) {
                session.transfer(flowFile, REL_FAILURE)
            }
            session.commit()
        }
        finally {
            // Hand the connection back to the pool whether or not the query succeeded
            conn?.close()
        }
    }

    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return null }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            // Must match the name given to DBCP_SERVICE above
            case 'Database Connection Pooling Service': return DBCP_SERVICE
            case 'SQL Query': return DB_QUERY
            case 'File Name': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        return [DBCP_SERVICE, DB_QUERY, FILENAME] as List
    }

    @Override
    String getIdentifier() { return 'MySQLToJSON-InvokeScriptedProcessor' }
}

processor = new MySQLToJSON()
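
For reference, here is a minimal standalone sketch of the cleanup pattern that matters whenever a script borrows a pooled connection: declare the handle before the try block so catch and finally can see it, and return it in finally even when the query throws. FakePool, FakeConnection and runOnce are hypothetical stand-ins invented for illustration, not NiFi or DBCP classes. Nothing in this thread confirms that unreturned connections were what left the DBCPService unusable, so treat this as one plausible thing to check rather than the diagnosis.

```groovy
// Hypothetical stand-in for a connection pool; tracks how many connections are out.
class FakePool {
    int borrowed = 0
    FakeConnection getConnection() {
        borrowed++
        return new FakeConnection(pool: this)
    }
}

// Hypothetical stand-in for a pooled connection whose query always fails.
class FakeConnection implements Closeable {
    FakePool pool
    boolean closed = false

    void close() {
        if (!closed) {
            closed = true
            pool.borrowed--   // returning the connection frees a pool slot
        }
    }

    List query(String sql) {
        throw new IllegalStateException('simulated query failure')
    }
}

String runOnce(FakePool pool) {
    // Declared before try so the catch and finally blocks can reference them.
    FakeConnection conn = null
    String outcome = 'success'
    try {
        conn = pool.getConnection()
        conn.query('SELECT 1')
    } catch (Exception e) {
        outcome = "failure: ${e.message}"
    } finally {
        conn?.close()   // runs on both the success and the failure path
    }
    return outcome
}

def pool = new FakePool()
def results = (1..3).collect { runOnce(pool) }
println results
println "connections still borrowed: ${pool.borrowed}"
```

Without the finally block, each failed run in this sketch would leave one more connection checked out, and a bounded pool would eventually have nothing left to hand out.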


From: Jason Hamilton [mailto:jason@homecarepulse.com]
Sent: Tuesday, October 11, 2016 5:02 PM
To: dev@nifi.apache.org<mailto:dev@nifi.apache.org>
Subject: InvokeScriptedProcessor

Hello everyone,

I have been struggling all day trying to make a simple MySQL to JSON processor (avoids the
nasty issues with type casting MySQL numerics to Avro just to go to JSON anyway) in Groovy
(see attached file). There are more interesting processors I need to make but this is my
starting point. I have carefully used the following resources:
http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html
http://funnifi.blogspot.com/2016_02_01_archive.html

I am using NiFi 1.0.0, downloaded as a binary from the website, on a clean install of CentOS 7
x64 with Oracle JDK 8.

I have verified the database pool works by running the same test query from an ExecuteSQL
Processor and it works.  From what I can tell the connection is even established from the
script, but for the life of me I can't figure why it is now getting the following useless
error:

---
InvokeScriptedProcessor[id=b5e567cf-0157-1000-cea6-4dc6cbaed0e3] InvokeScriptedProcessor[id=b5e567cf-0157-1000-cea6-4dc6cbaed0e3]
failed to process session due to java.lang.reflect.UndeclaredThrowableException: java.lang.reflect.UndeclaredThrowableException
---

I have debugging log.info lines on every call to see where it breaks down, and the results
seem inconsistent - I've seen it log "filename attribute set", which is nearly the last call,
but no data! What silly thing am I missing here?
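
On the error text itself: java.lang.reflect.UndeclaredThrowableException is what a java.lang.reflect.Proxy instance throws when its invocation handler raises a checked exception that the proxied interface method does not declare. The wrapper typically carries no message of its own, so logging only e.getMessage() hides the real failure. Below is a minimal standalone Groovy sketch of unwrapping it to the underlying cause. The Service interface, FailingHandler class and rootCause helper are hypothetical names for illustration, not NiFi APIs, and the SQLException is simulated. This thread does not establish which exception was actually being wrapped.

```groovy
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.lang.reflect.Proxy
import java.lang.reflect.UndeclaredThrowableException
import java.sql.SQLException

// Hypothetical interface; fetch() declares no checked exceptions.
interface Service {
    String fetch()
}

// Hypothetical handler that throws a checked exception the interface does not declare.
class FailingHandler implements InvocationHandler {
    Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        throw new SQLException('simulated failure')
    }
}

// Walk the cause chain down to the innermost throwable.
Throwable rootCause(Throwable t) {
    while (t.cause != null && !t.cause.is(t)) {
        t = t.cause
    }
    return t
}

Service service = (Service) Proxy.newProxyInstance(
        Service.classLoader, [Service] as Class[], new FailingHandler())

String wrapperMessage = 'unset'
String rootMessage = null
try {
    service.fetch()
} catch (UndeclaredThrowableException e) {
    wrapperMessage = e.message            // the wrapper's own message
    def root = rootCause(e)
    rootMessage = "${root.class.simpleName}: ${root.message}"
}
println "wrapper message: ${wrapperMessage}"
println "root cause: ${rootMessage}"
```

In a processor script, the same idea would mean logging the root cause (or passing the exception object to the logger) instead of only the top-level message.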
