nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-1280) Create FilterCSVColumns Processor
Date Wed, 11 May 2016 08:05:12 GMT

    [ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15279752#comment-15279752 ]

ASF GitHub Bot commented on NIFI-1280:
--------------------------------------

Github user ToivoAdams commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/420#discussion_r62806202
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import static java.sql.Types.CHAR;
    +import static java.sql.Types.LONGNVARCHAR;
    +import static java.sql.Types.LONGVARCHAR;
    +import static java.sql.Types.NCHAR;
    +import static java.sql.Types.NVARCHAR;
    +import static java.sql.Types.VARCHAR;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.io.Reader;
    +import java.nio.charset.StandardCharsets;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.calcite.adapter.csv.CsvSchemaFactory2;
    +import org.apache.calcite.jdbc.CalciteConnection;
    +import org.apache.calcite.schema.Schema;
    +import org.apache.calcite.schema.SchemaPlus;
    +import org.apache.commons.lang3.StringEscapeUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.BufferedInputStream;
    +import org.apache.nifi.util.StopWatch;
    +
    +import com.google.common.collect.ImmutableMap;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"xml", "xslt", "transform"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Filter out specific columns from CSV data. Some other transformations are also supported."
    +        + "Columns can be renamed, simple calculations performed, aggregations, etc."
    +        + "SQL select statement is used to specify how CSV data should be transformed."
    +        + "SQL statement follows standard SQL, some restrictions may apply."
    +        + "Successfully transformed CSV data is routed to the 'success' relationship."
    +        + "If transform fails, the original FlowFile is routed to the 'failure' relationship")
    +public class FilterCSVColumns  extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder()
    +            .name("SQL select statement")
    +            .description("SQL select statement specifies how CSV data should be transformed. "
    +                       + "Sql select should select from CSV.A table")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("The FlowFile with transformed content will be routed to this relationship")
    +            .build();
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in CSV), it will be routed to this relationship")
    +            .build();
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(SQL_SELECT);
    +        this.properties = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    +        final FlowFile original = session.get();
    +        if (original == null) {
    +            return;
    +        }
    +
    +        final ProcessorLog logger = getLogger();
    +        final StopWatch stopWatch = new StopWatch(true);
    +
    +        try {
    +            FlowFile transformed = session.write(original, new StreamCallback() {
    +                @Override
    +                public void process(final InputStream rawIn, final OutputStream out) throws IOException {
    +                    try (final InputStream in = new BufferedInputStream(rawIn)) {
    +
    +                        String sql = context.getProperty(SQL_SELECT).getValue();
    +                        final ResultSet resultSet = transform(rawIn, sql);
    +                        convertToCSV(resultSet, out);
    +
    +                    } catch (final Exception e) {
    +                        throw new IOException(e);
    +                    }
    +                }
    +            });
    +            session.transfer(transformed, REL_SUCCESS);
    +            session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +            logger.info("Transformed {}", new Object[]{original});
    --- End diff --
    
    Yes


> Create FilterCSVColumns Processor
> ---------------------------------
>
>                 Key: NIFI-1280
>                 URL: https://issues.apache.org/jira/browse/NIFI-1280
>             Project: Apache NiFi
>          Issue Type: Task
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Toivo Adams
>
> We should have a Processor that allows users to easily filter out specific columns from CSV data. For instance, a user would configure two different properties: "Columns of Interest" (a comma-separated list of column indexes) and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it would be with this Processor, as the user has to use Regular Expressions, etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a Sample CSV and choose which columns from there, similar to the way that Excel works when importing CSV by dragging and selecting the desired columns? That would certainly be a larger undertaking and would not need to be done for an initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message