nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-3970) Add CSVRecordLookupService
Date Fri, 08 Sep 2017 18:11:01 GMT

    [ https://issues.apache.org/jira/browse/NIFI-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159039#comment-16159039
] 

ASF GitHub Bot commented on NIFI-3970:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2130#discussion_r137845553
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
---
    @@ -0,0 +1,232 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.lookup;
    +
    +import org.apache.commons.csv.CSVFormat;
    +import org.apache.commons.csv.CSVParser;
    +import org.apache.commons.csv.CSVRecord;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.ControllerServiceInitializationContext;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
    +import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
    +
    +import java.io.FileNotFoundException;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.nio.file.Paths;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"})
    +@CapabilityDescription("A reloadable CSV file-based lookup service. When the lookup key
is found in the CSV file, the remaining columns are returned as a Record.")
    +public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService
{
    +
    +    private static final String KEY = "key";
    +
    +    private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
    +
    +    public static final PropertyDescriptor CSV_FILE =
    +            new PropertyDescriptor.Builder()
    +                    .name("csv-file")
    +                    .displayName("CSV File")
    +                    .description("A CSV file.")
    +                    .required(true)
    +                    .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +                    .expressionLanguageSupported(true)
    +                    .build();
    +
    +    static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
    +            .name("CSV Format")
    +            .description("Specifies which \"format\" the CSV data is in, or specifies
if custom formatting should be used.")
    +            .expressionLanguageSupported(false)
    +            .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e
-> e.toString()).collect(Collectors.toSet()))
    +            .defaultValue(CSVFormat.Predefined.Default.toString())
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
    +            new PropertyDescriptor.Builder()
    +                    .name("lookup-key-column")
    +                    .displayName("Lookup Key Column")
    +                    .description("Lookup key column.")
    +                    .required(true)
    +                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                    .expressionLanguageSupported(true)
    +                    .build();
    +
    +    public static final PropertyDescriptor IGNORE_DUPLICATES =
    +            new PropertyDescriptor.Builder()
    +                    .name("ignore-duplicates")
    +                    .displayName("Ignore Duplicates")
    +                    .description("Ignore duplicate keys for records in the CSV file.")
    +                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +                    .allowableValues("true", "false")
    +                    .defaultValue("true")
    +                    .required(true)
    +                    .build();
    +
    +    private List<PropertyDescriptor> properties;
    +
    +    private volatile ConcurrentMap<String, Map<String, Object>> cache;
    +
    +    private volatile String csvFile;
    +
    +    private volatile CSVFormat csvFormat;
    +
    +    private volatile String lookupKeyColumn;
    +
    +    private volatile boolean ignoreDuplicates;
    +
    +    private volatile SynchronousFileWatcher watcher;
    +
    +    private final ReentrantLock lock = new ReentrantLock();
    +
    +    private void loadCache() throws IllegalStateException, IOException {
    +        if (lock.tryLock()) {
    +            try {
    +                final ComponentLog logger = getLogger();
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug("Loading lookup table from file: " + csvFile);
    +                }
    +
    +                final FileReader reader = new FileReader(csvFile);
    +                final CSVParser records = csvFormat.withFirstRecordAsHeader().parse(reader);
    +                this.cache = new ConcurrentHashMap<>();
    +                for (final CSVRecord record : records) {
    +                    final String key = record.get(lookupKeyColumn);
    +
    +                    if (StringUtils.isBlank(key)) {
    +                        throw new IllegalStateException("Empty lookup key encountered
in: " + csvFile);
    +                    } else if (!ignoreDuplicates && this.cache.containsKey(key))
{
    +                        throw new IllegalStateException("Duplicate lookup key encountered:
" + key + " in " + csvFile);
    +                    } else if (ignoreDuplicates && this.cache.containsKey(key))
{
    +                        logger.warn("Duplicate lookup key encountered: {} in {}", new
Object[]{key, csvFile});
    +                    }
    +
    +                    // Put each key/value pair (except the lookup) into the properties
    +                    final Map<String, Object> properties = new HashMap<>();
    +                    record.toMap().forEach((k, v) -> {
    +                        if (!lookupKeyColumn.equals(k)) {
    +                            properties.put(k, v);
    +                        }
    +                    });
    +                    cache.put(key, properties);
    --- End diff --
    
    We should probably store a Record in this map instead of a Map<String, Object>,
so that we avoid creating a new Record object on every call to 'lookup'. This would
also allow us to create the RecordSchema object once and reuse it for every Record object, rather
than having to create a separate RecordSchema object per record.


> Add CSVRecordLookupService
> --------------------------
>
>                 Key: NIFI-3970
>                 URL: https://issues.apache.org/jira/browse/NIFI-3970
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joey Frazee
>            Assignee: Matt Burgess
>
> PR [#1830|https://github.com/apache/nifi/pull/1830] provides a SimpleCsvFileLookupService.
Since CSV data is tabular, a counterpart CSVRecordLookupService would be useful for
multi-criteria lookups and enrichments in LookupRecord and LookupAttribute.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message