nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-210) Provide an ExecuteScript processor
Date Tue, 26 Jan 2016 15:31:40 GMT

    [ https://issues.apache.org/jira/browse/NIFI-210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15117386#comment-15117386
] 

ASF GitHub Bot commented on NIFI-210:
-------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/185#discussion_r50849585
  
    --- Diff: nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
---
    @@ -0,0 +1,585 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.script;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.ByteArrayInputStream;
    +import org.apache.nifi.util.StopWatch;
    +import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
    +import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
    +
    +import javax.script.Bindings;
    +import javax.script.Compilable;
    +import javax.script.CompiledScript;
    +import javax.script.ScriptContext;
    +import javax.script.ScriptEngine;
    +import javax.script.ScriptEngineFactory;
    +import javax.script.ScriptEngineManager;
    +import javax.script.ScriptException;
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.Reader;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.ServiceLoader;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript",
"js", "lua", "luaj", "scala"})
    +@CapabilityDescription("Executes a script given the flow file and a process session.
 The script is responsible for "
    +        + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well
as any flow files created by "
    +        + "the script. If the handling is incomplete or incorrect, the session will be
rolled back.")
    +@DynamicProperty(
    +        name = "A script engine property to update",
    +        value = "The value to set it to",
    +        supportsExpressionLanguage = true,
    +        description = "Updates a script engine property specified by the Dynamic Property's
key with the value "
    +                + "specified by the Dynamic Property's value")
    +public class ExecuteScript extends AbstractProcessor {
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles that were successfully processed")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that were failed to process")
    +            .build();
    +
    +    public static PropertyDescriptor SCRIPT_ENGINE;
    +
    +    public static final PropertyDescriptor SCRIPT_FILE = new PropertyDescriptor.Builder()
    +            .name("Script File")
    +            .required(false)
    +            .description("Path to script file to execute. Use either file or body not
both")
    +            .addValidator(new StandardValidators.FileExistsValidator(true))
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor SCRIPT_BODY = new PropertyDescriptor.Builder()
    +            .name("Script Body")
    +            .required(false)
    +            .description("Body to script to execute. Use either file or body not both")
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor SCRIPT_ARGS = new PropertyDescriptor.Builder()
    +            .name("Arguments")
    +            .required(false)
    +            .description("Arguments to pass to scripting engine")
    +            .addValidator(Validator.VALID)
    +            .expressionLanguageSupported(true)
    +            .defaultValue("")
    +            .build();
    +
    +    public static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder()
    +            .name("Module Directory")
    +            .description("Path to a directory which contains modules required by the
script script.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(new StandardValidators.DirectoryExistsValidator(true, false))
    +            .build();
    +
    +    // A map from engine name to a custom configurator for that engine
    +    private final Map<String, ScriptEngineConfigurator> scriptEngineConfiguratorMap
= new ConcurrentHashMap<>();
    +
    +    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
    +    private final Lock lock = new ReentrantLock();
    +    private SynchronousFileWatcher scriptWatcher;
    +
    +    private Map<String, ScriptEngineFactory> scriptEngineFactoryMap;
    +    private ScriptEngine scriptEngine;
    +    private String scriptEngineName;
    +    private String scriptPath;
    +    private String scriptBody;
    +    private String modulePath;
    +    private CompiledScript compiledScript;
    +    private final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
    +    private ScheduledExecutorService reloadService;
    +    private List<PropertyDescriptor> descriptors;
    +
    +
    +    /**
    +     * Initializes this processor. A reload service is defined and scheduled, for the
purpose of watching for
    +     * script file changes, which indicates a reload is necessary
    +     *
    +     * @param context in which to perform initialization
    +     */
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +    }
    +
    +
    +    protected void createResources() {
    +
    +        // Set up script file reloader service. This checks to see if the script file
has changed, and if so, marks
    +        // the script file as needing a reload before evaluation
    +        if (reloadService == null) {
    +            reloadService = Executors.newScheduledThreadPool(1);
    +
    +            // monitor the script if configured for changes
    +            reloadService.scheduleWithFixedDelay(new Runnable() {
    +                @Override
    +                public void run() {
    +                    try {
    +                        final boolean hasLock = lock.tryLock();
    +
    +                        // if a property is changing we don't need to reload this iteration
    +                        if (hasLock) {
    +                            try {
    +                                if (scriptWatcher != null && scriptWatcher.checkAndReset())
{
    +                                    if (isFile(scriptPath)) {
    +                                        scriptNeedsReload.set(true);
    +                                    }
    +                                }
    +                            } finally {
    +                                lock.unlock();
    +                            }
    +                        }
    +                    } catch (final Throwable t) {
    +                        final ProcessorLog logger = getLogger();
    +                        final String message = "Unable to reload configured script Processor:
" + t;
    +
    +                        logger.error(message);
    +                        if (logger.isDebugEnabled()) {
    +                            logger.error(message, t);
    +                        }
    +                    }
    +                }
    +            }, 30, 10, TimeUnit.SECONDS);
    +        }
    +
    +        descriptors = new ArrayList<>();
    +
    +        // The following is required for JRuby, should be transparent to everything else.
    +        // Note this is not done in a ScriptEngineConfigurator, as it is too early in
the lifecycle. The
    +        // setting must be there before the factories/engines are loaded.
    +        System.setProperty("org.jruby.embed.localvariable.behavior", "persistent");
    +
    +        // Create list of available engines
    +        ScriptEngineManager scriptEngineManager = new ScriptEngineManager();
    +        List<ScriptEngineFactory> scriptEngineFactories = scriptEngineManager.getEngineFactories();
    +        if (scriptEngineFactories != null) {
    +            scriptEngineFactoryMap = new HashMap<>(scriptEngineFactories.size());
    +            List<AllowableValue> engineList = new LinkedList<>();
    +            for (ScriptEngineFactory factory : scriptEngineFactories) {
    +                engineList.add(new AllowableValue(factory.getLanguageName()));
    +                scriptEngineFactoryMap.put(factory.getLanguageName(), factory);
    +            }
    +
    +            // Sort the list by name so the list always looks the same.
    +            Collections.sort(engineList, new Comparator<AllowableValue>() {
    +                @Override
    +                public int compare(AllowableValue o1, AllowableValue o2) {
    +                    if (o1 == null) {
    +                        return o2 == null ? 0 : 1;
    +                    }
    +                    if (o2 == null) {
    +                        return -1;
    +                    }
    +                    return o1.getValue().compareTo(o2.getValue());
    +                }
    +            });
    +
    +            AllowableValue[] engines = engineList.toArray(new AllowableValue[engineList.size()]);
    +
    +            SCRIPT_ENGINE = new PropertyDescriptor.Builder()
    +                    .name("Script Engine")
    +                    .required(true)
    +                    .description("The engine to execute scripts")
    +                    .allowableValues(engines)
    +                    .defaultValue(engines[0].getValue())
    +                    .required(true)
    +                    .expressionLanguageSupported(false)
    +                    .build();
    +            descriptors.add(SCRIPT_ENGINE);
    +        }
    +
    +        descriptors.add(SCRIPT_FILE);
    +        descriptors.add(SCRIPT_BODY);
    +        descriptors.add(SCRIPT_ARGS);
    +        descriptors.add(MODULES);
    +
    +        isInitialized.set(true);
    +    }
    +
    +    /**
    +     * Returns the valid relationships for this processor.
    +     *
    +     * @return a Set of Relationships supported by this processor
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +    /**
    +     * Returns a list of property descriptors supported by this processor. The list always
includes properties such as
    +     * script engine name, script file name, script body name, script arguments, and
an external module path. If the
    +     * scripted processor also defines supported properties, those are added to the list
as well.
    +     *
    +     * @return a List of PropertyDescriptor objects supported by this processor
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        synchronized (isInitialized) {
    +            if (!isInitialized.get()) {
    +                createResources();
    +            }
    +        }
    +
    +        return Collections.unmodifiableList(descriptors);
    +    }
    +
    +    /**
    +     * Returns a PropertyDescriptor for the given name. This is for the user to be able
to define their own properties
    +     * which will be available as variables in the script
    +     *
    +     * @param propertyDescriptorName used to lookup if any property descriptors exist
for that name
    +     * @return a PropertyDescriptor object corresponding to the specified dynamic property
name
    +     */
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName)
{
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .required(false)
    +                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
true))
    +                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    /**
    +     * Determines whether the given path refers to a valid file
    +     *
    +     * @param path a path to a file
    +     * @return true if the path refers to a valid file, false otherwise
    +     */
    +    private boolean isFile(final String path) {
    +        return path != null && Files.isRegularFile(Paths.get(path));
    +    }
    +
    +    /**
    +     * Performs setup operations when the processor is scheduled to run. This includes
evaluating the processor's
    +     * properties, as well as reloading the script (from file or the "Script Body" property)
    +     *
    +     * @param context the context in which to perform the setup operations
    +     */
    +    @OnScheduled
    +    public void setup(final ProcessContext context) {
    +        scriptEngineName = context.getProperty(SCRIPT_ENGINE).getValue();
    +        scriptPath = context.getProperty(SCRIPT_FILE).getValue();
    +        scriptBody = context.getProperty(SCRIPT_BODY).getValue();
    +        modulePath = context.getProperty(MODULES).getValue();
    +        setupEngine();
    +    }
    +
    +    /**
    +     * Configures the specified script engine. First, the engine is loaded and instantiated
using the JSR-223
    +     * javax.script APIs. Then, if any script configurators have been defined for this
engine, their init() method is
    +     * called, and the configurator is saved for future calls.
    +     *
    +     * @see org.apache.nifi.processors.script.ScriptEngineConfigurator
    +     */
    +    private void setupEngine() {
    +        ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
    +        try {
    +            ProcessorLog log = getLogger();
    +
    +            // Need the right classloader when the engine is created. This ensures the
NAR's execution class loader
    +            // (plus the module path) becomes the parent for the script engine
    +            ClassLoader scriptEngineModuleClassLoader = createScriptEngineModuleClassLoader(modulePath);
    +            if (scriptEngineModuleClassLoader != null) {
    +                Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
    +            }
    +            scriptEngine = getScriptEngine();
    +            ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
    +                    ServiceLoader.load(ScriptEngineConfigurator.class);
    +            for (ScriptEngineConfigurator configurator : configuratorServiceLoader) {
    +                String configuratorScriptEngineName = configurator.getScriptEngineName();
    +                try {
    +                    if (configuratorScriptEngineName != null
    +                            && configuratorScriptEngineName.equals(scriptEngineName))
{
    +                        configurator.init(scriptEngine, modulePath);
    +                        scriptEngineConfiguratorMap.put(configurator.getScriptEngineName(),
configurator);
    +                    }
    +                } catch (ScriptException se) {
    +                    log.error("Error initializing script engine configurator {}",
    --- End diff --
    
    I recommend just passing the Exception to the logger here and removing the if (log.isDebugEnabled())
check. The ProcessorLog itself will handle this, so that the Exception is logged only when debug
is enabled.


> Provide an ExecuteScript processor
> ----------------------------------
>
>                 Key: NIFI-210
>                 URL: https://issues.apache.org/jira/browse/NIFI-210
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.0.1
>            Reporter: A. Steven Anderson
>            Assignee: Matt Burgess
>              Labels: processor, scala
>             Fix For: 0.5.0
>
>         Attachments: 0001-NIFI-210-few-tweaks-to-drop-static-reference-and-fix.patch
>
>
> Add latest Scala version support for ExecuteScript processor.
> Should also support Clojure as per discussion and request on mailing list http://mail-archives.apache.org/mod_mbox/nifi-dev/201506.mbox/%3CCAMpSqch4GK1gnw6M1u8tH6AN8e_miXZN5SNkAeMjBujXYGqJiw%40mail.gmail.com%3E
> UPDATE: The ScriptEngine for Clojure is not being maintained and is not currently available
via Maven Central or a public repository. Recommend adding Clojure as a separate Improvement
Jira case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message