Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4FEF7200D1D for ; Fri, 8 Sep 2017 20:11:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4F06A1613B0; Fri, 8 Sep 2017 18:11:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 62474161194 for ; Fri, 8 Sep 2017 20:11:10 +0200 (CEST) Received: (qmail 53248 invoked by uid 500); 8 Sep 2017 18:11:09 -0000 Mailing-List: contact issues-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list issues@nifi.apache.org Received: (qmail 52819 invoked by uid 99); 8 Sep 2017 18:11:08 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Sep 2017 18:11:08 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 915461A5451 for ; Fri, 8 Sep 2017 18:11:07 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -100.002 X-Spam-Level: X-Spam-Status: No, score=-100.002 tagged_above=-999 required=6.31 tests=[RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id Vnts5XewRh8t for ; Fri, 8 Sep 2017 18:11:04 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id B60C55FBB1 for 
; Fri, 8 Sep 2017 18:11:03 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id ADF47E0ED9 for ; Fri, 8 Sep 2017 18:11:02 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id E2F6524157 for ; Fri, 8 Sep 2017 18:11:01 +0000 (UTC) Date: Fri, 8 Sep 2017 18:11:01 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@nifi.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (NIFI-3970) Add CSVRecordLookupService MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Fri, 08 Sep 2017 18:11:11 -0000 [ https://issues.apache.org/jira/browse/NIFI-3970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16159039#comment-16159039 ] ASF GitHub Bot commented on NIFI-3970: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2130#discussion_r137845553 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java --- @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.lookup; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.file.monitor.LastModifiedMonitor; +import org.apache.nifi.util.file.monitor.SynchronousFileWatcher; + +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import 
java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"}) +@CapabilityDescription("A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, the remaining columns are returned as a Record.") +public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService { + + private static final String KEY = "key"; + + private static final Set REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet())); + + public static final PropertyDescriptor CSV_FILE = + new PropertyDescriptor.Builder() + .name("csv-file") + .displayName("CSV File") + .description("A CSV file.") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder() + .name("CSV Format") + .description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.") + .expressionLanguageSupported(false) + .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet())) + .defaultValue(CSVFormat.Predefined.Default.toString()) + .required(true) + .build(); + + public static final PropertyDescriptor LOOKUP_KEY_COLUMN = + new PropertyDescriptor.Builder() + .name("lookup-key-column") + .displayName("Lookup Key Column") + .description("Lookup key column.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor IGNORE_DUPLICATES = + new PropertyDescriptor.Builder() + .name("ignore-duplicates") + 
.displayName("Ignore Duplicates") + .description("Ignore duplicate keys for records in the CSV file.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + private List properties; + + private volatile ConcurrentMap> cache; + + private volatile String csvFile; + + private volatile CSVFormat csvFormat; + + private volatile String lookupKeyColumn; + + private volatile boolean ignoreDuplicates; + + private volatile SynchronousFileWatcher watcher; + + private final ReentrantLock lock = new ReentrantLock(); + + private void loadCache() throws IllegalStateException, IOException { + if (lock.tryLock()) { + try { + final ComponentLog logger = getLogger(); + if (logger.isDebugEnabled()) { + logger.debug("Loading lookup table from file: " + csvFile); + } + + final FileReader reader = new FileReader(csvFile); + final CSVParser records = csvFormat.withFirstRecordAsHeader().parse(reader); + this.cache = new ConcurrentHashMap<>(); + for (final CSVRecord record : records) { + final String key = record.get(lookupKeyColumn); + + if (StringUtils.isBlank(key)) { + throw new IllegalStateException("Empty lookup key encountered in: " + csvFile); + } else if (!ignoreDuplicates && this.cache.containsKey(key)) { + throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + csvFile); + } else if (ignoreDuplicates && this.cache.containsKey(key)) { + logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, csvFile}); + } + + // Put each key/value pair (except the lookup) into the properties + final Map properties = new HashMap<>(); + record.toMap().forEach((k, v) -> { + if (!lookupKeyColumn.equals(k)) { + properties.put(k, v); + } + }); + cache.put(key, properties); --- End diff -- We should probably be putting a Record in this map instead of a Map so that we can avoid creating that Record object every time that we call 'lookup'. 
This would also allow us to create the RecordSchema object once and use it for every Record object, rather than having to create a separate RecordSchema object per record. > Add CSVRecordLookupService > -------------------------- > > Key: NIFI-3970 > URL: https://issues.apache.org/jira/browse/NIFI-3970 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Joey Frazee > Assignee: Matt Burgess > > PR [#1830|https://github.com/apache/nifi/pull/1830] provides a SimpleCsvFileLookupService. Since CSV data is tabular, a counterpart CSVRecordLookupService would be useful for multi-criteria lookups and enrichments in LookupRecord and LookupAttribute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)