[https://issues.apache.org/jira/browse/NIFI-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159045#comment-16159045]
ASF GitHub Bot commented on NIFI-3970:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2130#discussion_r137853778
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"})
+@CapabilityDescription("A reloadable CSV file-based lookup service. When the lookup key
is found in the CSV file, the remaining columns are returned as a Record.")
+public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService
{
+
+ private static final String KEY = "key";
+
+ private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+    /** Path of the CSV file that backs the lookup table; validated to exist. */
+    public static final PropertyDescriptor CSV_FILE = new PropertyDescriptor.Builder()
+            .name("csv-file")
+            .displayName("CSV File")
+            .description("A CSV file.")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .required(true)
+            .build();
+
+    /**
+     * The predefined commons-csv format used to parse the file. The value is turned back into a
+     * format with {@code CSVFormat.Predefined.valueOf}, so the allowable values are the enum names.
+     * Public for consistency with the other descriptors of this service.
+     */
+    public static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
+            .name("CSV Format")
+            .displayName("CSV Format")
+            // Only predefined formats are offered, so do not advertise a custom-format option.
+            .description("Specifies which \"format\" the CSV data is in.")
+            .expressionLanguageSupported(false)
+            // The varargs overload keeps the enum's declaration order; collecting into a HashSet
+            // gave an arbitrary order. name() is what Predefined.valueOf() expects.
+            .allowableValues(Arrays.stream(CSVFormat.Predefined.values())
+                    .map(CSVFormat.Predefined::name)
+                    .toArray(String[]::new))
+            .defaultValue(CSVFormat.Predefined.Default.name())
+            .required(true)
+            .build();
+
+    /** Name of the header column whose value is used as each record's lookup key. */
+    public static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder()
+            .name("lookup-key-column")
+            .displayName("Lookup Key Column")
+            .description("Lookup key column.")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    /**
+     * Whether a repeated lookup key is tolerated while loading (logged as a warning) or causes the
+     * load to fail.
+     */
+    public static final PropertyDescriptor IGNORE_DUPLICATES = new PropertyDescriptor.Builder()
+            .name("ignore-duplicates")
+            .displayName("Ignore Duplicates")
+            .description("Ignore duplicate keys for records in the CSV file.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Supported property descriptors; assigned in init. */
+    private List<PropertyDescriptor> properties;
+
+    // Configuration captured in onEnabled.
+    private volatile String csvFile;
+    private volatile CSVFormat csvFormat;
+    private volatile String lookupKeyColumn;
+    private volatile boolean ignoreDuplicates;
+
+    /** Lookup key -> the other column values of that record; a new map is assigned on each load. */
+    private volatile ConcurrentMap<String, Map<String, Object>> cache;
+
+    /** Watches csvFile for modification; created in onEnabled and polled by lookup. */
+    private volatile SynchronousFileWatcher watcher;
+
+    /** Guards loadCache so only one thread loads the table at a time. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+ private void loadCache() throws IllegalStateException, IOException {
+ if (lock.tryLock()) {
+ try {
+ final ComponentLog logger = getLogger();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Loading lookup table from file: " + csvFile);
+ }
+
+ final FileReader reader = new FileReader(csvFile);
+ final CSVParser records = csvFormat.withFirstRecordAsHeader().parse(reader);
+ this.cache = new ConcurrentHashMap<>();
+ for (final CSVRecord record : records) {
+ final String key = record.get(lookupKeyColumn);
+
+ if (StringUtils.isBlank(key)) {
+ throw new IllegalStateException("Empty lookup key encountered
in: " + csvFile);
+ } else if (!ignoreDuplicates && this.cache.containsKey(key))
{
+ throw new IllegalStateException("Duplicate lookup key encountered:
" + key + " in " + csvFile);
+ } else if (ignoreDuplicates && this.cache.containsKey(key))
{
+ logger.warn("Duplicate lookup key encountered: {} in {}", new
Object[]{key, csvFile});
+ }
+
+ // Put each key/value pair (except the lookup) into the properties
+ final Map<String, Object> properties = new HashMap<>();
+ record.toMap().forEach((k, v) -> {
+ if (!lookupKeyColumn.equals(k)) {
+ properties.put(k, v);
+ }
+ });
+ cache.put(key, properties);
+ }
+
+ if (cache.isEmpty()) {
+ logger.warn("Lookup table is empty after reading file: " + csvFile);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+    /**
+     * @return the unmodifiable descriptor list assembled in {@code init}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.properties;
+    }
+
+    /**
+     * Builds the unmodifiable list of supported property descriptors, in declaration order.
+     *
+     * @param context the initialization context (not used)
+     */
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
+        final List<PropertyDescriptor> descriptors =
+                new ArrayList<>(Arrays.asList(CSV_FILE, CSV_FORMAT, LOOKUP_KEY_COLUMN, IGNORE_DUPLICATES));
+        this.properties = Collections.unmodifiableList(descriptors);
+    }
+
+    /**
+     * Captures the configuration, creates the file watcher and performs the initial load.
+     *
+     * @param context the configuration of this service
+     * @throws InitializationException if the CSV file contains invalid lookup data
+     * @throws IOException             if the CSV file cannot be read
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException {
+        // Both properties declare expressionLanguageSupported(true); without evaluation,
+        // getValue() returns the raw expression text instead of its result.
+        this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
+        this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
+        // NOTE(review): 30000L is presumably the minimum interval (ms) between modification
+        // checks; confirm against SynchronousFileWatcher.
+        this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
+        try {
+            loadCache();
+        } catch (final IllegalStateException e) {
+            throw new InitializationException(e.getMessage(), e);
+        }
+    }
+
+ @Override
+ public Optional<Record> lookup(final Map<String, String> coordinates)
throws LookupFailureException {
+ if (coordinates == null) {
+ return Optional.empty();
+ }
+
+ final String key = coordinates.get(KEY);
+ if (StringUtils.isBlank(key)) {
+ return Optional.empty();
+ }
+
+ try {
+ if (watcher != null && watcher.checkAndReset()) {
--- End diff --
'watcher' should never be null here. It is initialized in the onEnabled method, and the framework
guarantees that onEnabled is called before lookup can be invoked, so the null check can be removed.
> Add CSVRecordLookupService
> --------------------------
>
> Key: NIFI-3970
> URL: https://issues.apache.org/jira/browse/NIFI-3970
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Joey Frazee
> Assignee: Matt Burgess
>
> PR [#1830|https://github.com/apache/nifi/pull/1830] provides a SimpleCsvFileLookupService.
> Since CSV data is tabular, a counterpart CSVRecordLookupService would be useful for
> multi-criteria lookups and enrichments in LookupRecord and LookupAttribute.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
|