nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-210) Provide an ExecuteScript processor
Date Mon, 01 Feb 2016 22:00:40 GMT

    [ https://issues.apache.org/jira/browse/NIFI-210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127132#comment-15127132
] 

ASF GitHub Bot commented on NIFI-210:
-------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/185#discussion_r51488029
  
    --- Diff: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java
---
    @@ -0,0 +1,606 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.script;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.ControllerServiceLookup;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Processor;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
    +import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
    +
    +import javax.script.Invocable;
    +import java.io.FileInputStream;
    +import java.nio.file.Paths;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +@Tags({"script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript",
"js", "lua", "luaj"})
    +@CapabilityDescription("Experimental - Invokes a script engine for a Processor defined
in the given script. The script must define "
    +        + "a valid class that implements the Processor interface, and it must set a variable
'processor' to an instance of "
    +        + "the class. Processor methods such as onTrigger() will be delegated to the
scripted Processor instance. Also any "
    +        + "Relationships or PropertyDescriptors defined by the scripted processor will
be added to the configuration dialog.  "
    +        + "Experimental: Impact of sustained usage not yet verified.")
    +@DynamicProperty(name = "A script engine property to update", value = "The value to set
it to", supportsExpressionLanguage = true,
    +        description = "Updates a script engine property specified by the Dynamic Property's
key with the value specified by the Dynamic Property's value")
    +@SeeAlso({ExecuteScript.class})
    +public class InvokeScriptedProcessor extends AbstractScriptProcessor {
    +
    +    private final AtomicReference<Processor> processor = new AtomicReference<>();
    +    private final AtomicReference<Collection<ValidationResult>> validationResults
=
    +            new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());
    +
    +    private final Lock lock = new ReentrantLock();
    +    private SynchronousFileWatcher scriptWatcher;
    +
    +    private ScheduledExecutorService reloadService = null;
    +
    +    /**
    +     * Creates the resources needed by this processor. An attempt is made to also initialize
the scripted processor,
    +     * but unless the properties (such as script engine name and script file path) have
already been specified, the
    +     * script will not yet have been evaluated, so the script's initialize() method will
not be called.
    +     */
    +    protected void createResources() {
    +
    +        // Set up script file reloader service. This checks to see if the script file
has changed, and if so, tries
    +        // to reload it
    +        if (reloadService == null) {
    +            reloadService = Executors.newScheduledThreadPool(1);
    +
    +            // monitor the script if configured for changes
    +            reloadService.scheduleWithFixedDelay(new Runnable() {
    +                @Override
    +                public void run() {
    +                    try {
    +                        final boolean hasLock = lock.tryLock();
    +
    +                        // if a property is changing we don't need to reload this iteration
    +                        if (hasLock) {
    +                            try {
    +                                if (scriptWatcher != null && scriptWatcher.checkAndReset())
{
    +                                    if (isFile(scriptPath)) {
    +                                        // reload the actual script
    +                                        final boolean reloaded = reloadScriptFile(scriptPath);
    +
    +                                        // log the script was reloaded
    +                                        if (reloaded) {
    +                                            getLogger().info("The configured script has
been successfully reloaded.");
    +                                        }
    +                                    }
    +                                }
    +                            } finally {
    +                                lock.unlock();
    +                            }
    +                        }
    +                    } catch (final Throwable t) {
    +                        final ProcessorLog logger = getLogger();
    +                        final String message = "Unable to reload configured script Processor:
" + t;
    +
    +                        logger.error(message);
    +                        if (logger.isDebugEnabled()) {
    +                            logger.error(message, t);
    +                        }
    +                    }
    +                }
    +            }, 30, 10, TimeUnit.SECONDS);
    +        }
    +
    +        super.createResources();
    +    }
    +
    +    /**
    +     * Returns the valid relationships for this processor. SUCCESS and FAILURE are always
returned, and if the script
    +     * processor has defined additional relationships, those will be added as well.
    +     *
    +     * @return a Set of Relationships supported by this processor
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        final Processor instance = processor.get();
    +        if (instance != null) {
    +            try {
    +                relationships.addAll(instance.getRelationships());
    +            } catch (final Throwable t) {
    +                final ProcessorLog logger = getLogger();
    +                final String message = "Unable to get relationships from scripted Processor:
" + t;
    +
    +                logger.error(message);
    +                if (logger.isDebugEnabled()) {
    +                    logger.error(message, t);
    +                }
    +            }
    +        } else {
    +            // Return defaults for now
    +            relationships.add(REL_SUCCESS);
    +            relationships.add(REL_FAILURE);
    +        }
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    /**
    +     * Returns a list of property descriptors supported by this processor. The list always
includes properties such as
    +     * script engine name, script file name, script body name, script arguments, and
an external module path. If the
    +     * scripted processor also defines supported properties, those are added to the list
as well.
    +     *
    +     * @return a List of PropertyDescriptor objects supported by this processor
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +
    +        synchronized (isInitialized) {
    +            if (!isInitialized.get()) {
    +                createResources();
    +            }
    +        }
    +        List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
    +        supportedPropertyDescriptors.addAll(descriptors);
    +
    +        final Processor instance = processor.get();
    +        if (instance != null) {
    +            try {
    +                final List<PropertyDescriptor> instanceDescriptors = instance.getPropertyDescriptors();
    +                if (instanceDescriptors != null) {
    +                    supportedPropertyDescriptors.addAll(instanceDescriptors);
    +                }
    +            } catch (final Throwable t) {
    +                final ProcessorLog logger = getLogger();
    +                final String message = "Unable to get property descriptors from Processor:
" + t;
    +
    +                logger.error(message);
    +                if (logger.isDebugEnabled()) {
    +                    logger.error(message, t);
    +                }
    +            }
    +        }
    +
    +        return Collections.unmodifiableList(supportedPropertyDescriptors);
    +    }
    +
    +    /**
    +     * Returns a PropertyDescriptor for the given name. This is for the user to be able
to define their own properties
    +     * which will be available as variables in the script
    +     *
    +     * @param propertyDescriptorName used to lookup if any property descriptors exist
for that name
    +     * @return a PropertyDescriptor object corresponding to the specified dynamic property
name
    +     */
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName)
{
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .required(false)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    /**
    +     * Performs setup operations when the processor is scheduled to run. This includes
evaluating the processor's
    +     * properties, as well as reloading the script (from file or the "Script Body" property)
    +     *
    +     * @param context the context in which to perform the setup operations
    +     */
    +    @OnScheduled
    +    @Override
    +    public void setup(final ProcessContext context) {
    +        super.setup(context);
    +
    +        if (processor.get() == null) {
    +            if (isFile(scriptPath)) {
    +                reloadScriptFile(scriptPath);
    +            } else {
    +                reloadScriptBody(scriptBody);
    +            }
    +        }
    +    }
    +
    +
    +    /**
    +     * Handles changes to this processor's properties. If changes are made to script-
or engine-related properties,
    +     * the script will be reloaded.
    +     *
    +     * @param descriptor of the modified property
    +     * @param oldValue   non-null property value (previous)
    +     * @param newValue   the new property value or if null indicates the property
    +     */
    +    @Override
    +    public void onPropertyModified(final PropertyDescriptor descriptor, final String
oldValue, final String newValue) {
    +        final ProcessorLog logger = getLogger();
    +        final Processor instance = processor.get();
    +
    +        if (SCRIPT_FILE.equals(descriptor)
    +                || SCRIPT_BODY.equals(descriptor)
    +                || MODULES.equals(descriptor)
    +                || SCRIPT_ENGINE.equals(descriptor)) {
    +            lock.lock();
    +            try {
    +                // if the script is changing we'll want to reload the instance
    +                if (SCRIPT_FILE.equals(descriptor)) {
    +                    if (isFile(newValue)) {
    +                        reloadScriptFile(newValue);
    +
    +                        // we're attempted to load the script so we need to watch for
updates
    +                        scriptWatcher = new SynchronousFileWatcher(Paths.get(newValue),
new LastModifiedMonitor());
    +                    } else {
    +                        // the doesn't appear to be a file
    +                        scriptWatcher = null;
    +                    }
    +
    +                    // always want to record the configured value
    +                    scriptPath = newValue;
    +                } else if (SCRIPT_BODY.equals(descriptor)) {
    +
    +                    if (reloadScriptBody(newValue)) {
    +                        // always want to record the configured value
    +                        scriptBody = newValue;
    +                    }
    +                } else if (MODULES.equals(descriptor)) {
    +
    +                    // temporarily set new value (will be restored to oldValue if something
goes wrong)
    +                    modulePath = newValue;
    +                    try {
    +                        setupEngine();
    +
    +                        boolean reloaded = false;
    +
    +                        // we only want to reload during a module change if the script
is already loaded
    +                        if (scriptPath != null || scriptBody != null) {
    +                            if (isFile(scriptPath)) {
    +                                // reload the script
    +                                reloaded = reloadScriptFile(scriptPath);
    +                            } else if (scriptBody != null) {
    +                                reloaded = reloadScriptBody(scriptBody);
    +                            }
    +                            // log the script was reloaded
    +                            if (reloaded) {
    +                                logger.info("The configured script has been successfully
reloaded.");
    +                            } else {
    +                                throw new ProcessException("The configured script could
not be reloaded");
    +                            }
    +                        }
    +                    } catch (Throwable t) {
    +                        modulePath = oldValue;
    +                        logger.error(t.getLocalizedMessage(), t);
    +                    }
    +
    +                } else if (SCRIPT_ENGINE.equals(descriptor)) {
    +                    // The script engine has changed, so we need to set up a new instance
for the selected
    +                    // engine name
    +                    scriptEngineName = newValue;
    +                    setupEngine();
    +                }
    +            } finally {
    +                lock.unlock();
    +            }
    +        } else if (instance != null) {
    +            // If the script provides a Processor, call its onPropertyModified() method
    +            try {
    +                instance.onPropertyModified(descriptor, oldValue, newValue);
    +            } catch (final Exception e) {
    +                final String message = "Unable to invoke onPropertyModified from script
Processor: " + e;
    +                logger.error(message, e);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Reloads the script located at the given path
    +     *
    +     * @param scriptPath the path to the script file to be loaded
    +     * @return true if the script was loaded successfully; false otherwise
    +     */
    +    private boolean reloadScriptFile(final String scriptPath) {
    +        final Collection<ValidationResult> results = new HashSet<>();
    +
    +        try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
    +            return reloadScript(IOUtils.toString(scriptStream));
    +
    +        } catch (final Exception e) {
    +            final ProcessorLog logger = getLogger();
    +            final String message = "Unable to load script: " + e;
    +
    +            // If the module path has not yet been set, then this script is likely being
loaded too early and depends
    +            // on modules the processor does not yet know about. If this is the case,
it will be reloaded later on
    +            // property change (modules) or when scheduled
    +            if (modulePath != null) {
    +                logger.error(message, e);
    +                results.add(new ValidationResult.Builder()
    +                        .subject("ScriptValidation")
    +                        .valid(false)
    +                        .explanation("Unable to load script due to " + e)
    +                        .input(scriptPath)
    +                        .build());
    +            }
    +        }
    +
    +        // store the updated validation results
    +        validationResults.set(results);
    +
    +        // return whether there was any issues loading the configured script
    +        return results.isEmpty();
    +    }
    +
    +    /**
    +     * Reloads the script defined by the given string
    +     *
    +     * @param scriptBody the contents of the script to be loaded
    +     * @return true if the script was loaded successfully; false otherwise
    +     */
    +    private boolean reloadScriptBody(final String scriptBody) {
    +        final Collection<ValidationResult> results = new HashSet<>();
    +        try {
    +            return reloadScript(scriptBody);
    +
    +        } catch (final Exception e) {
    +            final ProcessorLog logger = getLogger();
    +            final String message = "Unable to load script: " + e;
    +
    +            // If the module path has not yet been set, then this script is likely being
loaded too early and depends
    +            // on modules the processor does not yet know about. If this is the case,
it will be reloaded later on
    +            // property change (modules) or when scheduled
    +            if (modulePath != null) {
    --- End diff --
    
    seems like something a bit wonky is happening here.  So if one enters a valid script then
enters a buggy script the original value is set and not being replaced.  Then because if there
is an invalid script and no modulepath the error is being buried.  Definitely need the old
script/processor that was compiled to get wiped out and we need the errors to always make
it to the log/bulletin.


> Provide an ExecuteScript processor
> ----------------------------------
>
>                 Key: NIFI-210
>                 URL: https://issues.apache.org/jira/browse/NIFI-210
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.0.1
>            Reporter: A. Steven Anderson
>            Assignee: Matt Burgess
>              Labels: processor, scala
>             Fix For: 0.5.0
>
>         Attachments: 0001-NIFI-210-few-tweaks-to-drop-static-reference-and-fix.patch,
Fun_with_Scripting_Languages.xml, flow.xml.gz
>
>
> Add latest Scala version support for ExcecuteScript processor.
> Should also support Clojure as per discussion and request on mailing list http://mail-archives.apache.org/mod_mbox/nifi-dev/201506.mbox/%3CCAMpSqch4GK1gnw6M1u8tH6AN8e_miXZN5SNkAeMjBujXYGqJiw%40mail.gmail.com%3E
> UPDATE: The ScriptEngine for Clojure is not being maintained and is not currently available
via Maven Central or a public repository. Recommend adding Clojure as a separate Improvement
Jira case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message