nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject nifi git commit: NIFI-4496: Added JacksonCSVRecordReader to allow choice of CSV parser. This closes #2245.
Date Fri, 22 Dec 2017 13:57:06 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 62e388aa4 -> 14d2291db


NIFI-4496: Added JacksonCSVRecordReader to allow choice of CSV parser. This closes #2245.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/14d2291d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/14d2291d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/14d2291d

Branch: refs/heads/master
Commit: 14d2291db87d8ea160f538c10de31ac69fc996ae
Parents: 62e388a
Author: Matthew Burgess <mattyb149@apache.org>
Authored: Wed Nov 1 11:50:06 2017 -0400
Committer: Mark Payne <markap14@hotmail.com>
Committed: Fri Dec 22 08:56:29 2017 -0500

----------------------------------------------------------------------
 .../nifi-record-serialization-services/pom.xml  |   5 +
 .../java/org/apache/nifi/csv/CSVReader.java     |  30 +-
 .../apache/nifi/csv/JacksonCSVRecordReader.java | 251 +++++++++++++
 .../nifi/csv/ITApacheCSVRecordReader.java       |  74 ++++
 .../nifi/csv/ITJacksonCSVRecordReader.java      |  74 ++++
 .../nifi/csv/TestJacksonCSVRecordReader.java    | 371 +++++++++++++++++++
 6 files changed, 804 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/14d2291d/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 30f73c1..cfc07bb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -63,6 +63,11 @@
             <version>1.4</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-csv</artifactId>
+            <version>2.9.2</version>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/14d2291d/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index a9b98f5..9f133a6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -54,6 +54,26 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
        "The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
            + "column names in the header and assuming that all columns are of type String.");
 
+    // CSV parsers
+    public static final AllowableValue APACHE_COMMONS_CSV = new AllowableValue("commons-csv", "Apache Commons CSV",
+            "The CSV parser implementation from the Apache Commons CSV library.");
+
+    public static final AllowableValue JACKSON_CSV = new AllowableValue("jackson-csv", "Jackson CSV",
+            "The CSV parser implementation from the Jackson Dataformats library.");
+
+
+    public static final PropertyDescriptor CSV_PARSER = new PropertyDescriptor.Builder()
+            .name("csv-reader-csv-parser")
+            .displayName("CSV Parser")
+            .description("Specifies which parser to use to read CSV records. NOTE: Different parsers may support different subsets of functionality "
+                    + "and may also exhibit different levels of performance.")
+            .expressionLanguageSupported(false)
+            .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV)
+            .defaultValue(APACHE_COMMONS_CSV.getValue())
+            .required(true)
+            .build();
+
+    private volatile String csvParser;
     private volatile CSVFormat csvFormat;
     private volatile String dateFormat;
     private volatile String timeFormat;
@@ -65,6 +85,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(CSV_PARSER);
         properties.add(DateTimeUtils.DATE_FORMAT);
         properties.add(DateTimeUtils.TIME_FORMAT);
         properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
@@ -83,6 +104,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
 
     @OnEnabled
     public void storeCsvFormat(final ConfigurationContext context) {
+        this.csvParser = context.getProperty(CSV_PARSER).getValue();
         this.csvFormat = CSVUtils.createCSVFormat(context);
         this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
         this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
@@ -108,7 +130,13 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null);
         bufferedIn.reset();
 
-        return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+        if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
+            return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+        } else if(JACKSON_CSV.getValue().equals(csvParser)) {
+            return new JacksonCSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+        } else {
+            throw new IOException("Parser not supported");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/14d2291d/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
new file mode 100644
index 0000000..a273d0c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.io.input.BOMInputStream;
+import org.apache.commons.lang3.CharUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+
+public class JacksonCSVRecordReader implements RecordReader {
+    private final RecordSchema schema;
+
+    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+
+    private final ComponentLog logger;
+    private final boolean hasHeader;
+    private final boolean ignoreHeader;
+    private final MappingIterator<String[]> recordStream;
+    private List<String> rawFieldNames = null;
+
+    private volatile static CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+    public JacksonCSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
+                                  final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
+
+        this.schema = schema;
+        this.logger = logger;
+        this.hasHeader = hasHeader;
+        this.ignoreHeader = ignoreHeader;
+        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
+
+        LAZY_DATE_FORMAT = () -> df;
+        LAZY_TIME_FORMAT = () -> tf;
+        LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
+        final Reader reader = new InputStreamReader(new BOMInputStream(in));
+
+        CsvSchema.Builder csvSchemaBuilder = CsvSchema.builder()
+                .setColumnSeparator(csvFormat.getDelimiter())
+                .setLineSeparator(csvFormat.getRecordSeparator())
+                // Can only use comments in Jackson CSV if the correct marker is set
+                .setAllowComments("#" .equals(CharUtils.toString(csvFormat.getCommentMarker())))
+                // The call to setUseHeader(false) in all code paths is due to the way Jackson does data binding/mapping. Missing or extra columns may not
+                // be handled correctly when using the header for mapping.
+                .setUseHeader(false);
+
+        csvSchemaBuilder = (csvFormat.getQuoteCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setQuoteChar(csvFormat.getQuoteCharacter());
+        csvSchemaBuilder = (csvFormat.getEscapeCharacter() == null) ? csvSchemaBuilder : csvSchemaBuilder.setEscapeChar(csvFormat.getEscapeCharacter());
+
+        if (hasHeader) {
+            if (ignoreHeader) {
+                csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true);
+            }
+        }
+
+        CsvSchema csvSchema = csvSchemaBuilder.build();
+
+        // Add remaining config options to the mapper
+        List<CsvParser.Feature> features = new ArrayList<>(3);
+        features.add(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS);
+        if (csvFormat.getIgnoreEmptyLines()) {
+            features.add(CsvParser.Feature.SKIP_EMPTY_LINES);
+        }
+        if (csvFormat.getTrim()) {
+            features.add(CsvParser.Feature.TRIM_SPACES);
+        }
+
+        ObjectReader objReader = mapper.readerFor(String[].class)
+                .with(csvSchema)
+                .withFeatures(features.toArray(new CsvParser.Feature[3]));
+
+        recordStream = objReader.readValues(reader);
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        final RecordSchema schema = getSchema();
+
+        if (recordStream.hasNext()) {
+            String[] csvRecord = recordStream.next();
+
+            // If the first record is the header names (and we're using them), store those off for use in creating the value map on the next iterations
+            if (rawFieldNames == null) {
+                if (!hasHeader || ignoreHeader) {
+                    rawFieldNames = schema.getFieldNames();
+                } else {
+                    rawFieldNames = Arrays.asList(csvRecord);
+
+                    // Advance the stream to keep the record count correct
+                    if (recordStream.hasNext()) {
+                        csvRecord = recordStream.next();
+                    } else {
+                        return null;
+                    }
+                }
+            }
+            // Check for empty lines and ignore them
+            boolean foundRecord = true;
+            if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
+                foundRecord = false;
+                while (recordStream.hasNext()) {
+                    csvRecord = recordStream.next();
+
+                    if (csvRecord != null && !(csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
+                        // This is a non-empty record/row, so continue processing
+                        foundRecord = true;
+                        break;
+                    }
+                }
+            }
+            // If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return
+            if (!foundRecord) {
+                return null;
+            }
+
+            final Map<String, Object> values = new LinkedHashMap<>();
+            final int numFieldNames = rawFieldNames.size();
+            for (int i = 0; i < csvRecord.length; i++) {
+                final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
+                String rawValue = (i >= csvRecord.length) ? null : csvRecord[i];
+
+                final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName);
+
+                if (!dataTypeOption.isPresent() && dropUnknownFields) {
+                    continue;
+                }
+
+                final Object value;
+                if (coerceTypes && dataTypeOption.isPresent()) {
+                    value = convert(rawValue, dataTypeOption.get(), rawFieldName);
+                } else if (dataTypeOption.isPresent()) {
+                    // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
+                    // dictate a field type. As a result, we will use the schema that we have to attempt to convert
+                    // the value into the desired type if it's a simple type.
+                    value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
+                } else {
+                    value = rawValue;
+                }
+
+                values.put(rawFieldName, value);
+            }
+
+            return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
+        }
+
+        return null;
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    protected Object convert(final String value, final DataType dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+
+        return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+    }
+
+    private Object convertSimpleIfPossible(final String value, final DataType dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+
+        switch (dataType.getFieldType()) {
+            case STRING:
+                return value;
+            case BOOLEAN:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BYTE:
+            case CHAR:
+            case SHORT:
+            case TIME:
+            case TIMESTAMP:
+            case DATE:
+                if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
+                    return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                } else {
+                    return value;
+                }
+        }
+
+        return value;
+    }
+
+    @Override
+    public void close() throws IOException {
+        recordStream.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14d2291d/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
new file mode 100644
index 0000000..30c05c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITApacheCSVRecordReader {
+
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testParserPerformance() throws IOException, MalformedRecordException {
+        // Generates about 130MB of data
+        final int NUM_LINES = 2500000;
+        StringBuilder sb = new StringBuilder("id,name,balance,address,city,state,zipCode,country\n");
+        for (int i = 0; i < NUM_LINES; i++) {
+            sb.append("1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n");
+        }
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes());
+             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            Record record;
+            int numRecords = 0;
+            while ((record = reader.nextRecord()) != null) {
+                assertNotNull(record);
+                numRecords++;
+            }
+            assertEquals(NUM_LINES, numRecords);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14d2291d/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
new file mode 100644
index 0000000..a3f1b37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITJacksonCSVRecordReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITJacksonCSVRecordReader {
+
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testParserPerformance() throws IOException, MalformedRecordException {
+        // Generates about 130MB of data
+        final int NUM_LINES = 2500000;
+        StringBuilder sb = new StringBuilder("id,name,balance,address,city,state,zipCode,country\n");
+        for (int i = 0; i < NUM_LINES; i++) {
+            sb.append("1,John Doe,4750.89D,123 My Street,My City,MS,11111,USA\n");
+        }
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes());
+             final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            Record record;
+            int numRecords = 0;
+            while ((record = reader.nextRecord()) != null) {
+                assertNotNull(record);
+                numRecords++;
+            }
+            assertEquals(NUM_LINES, numRecords);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/14d2291d/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
new file mode 100644
index 0000000..9e08594
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+
+public class TestJacksonCSVRecordReader {
+    private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[] {"id", "name", "balance", "address", "city",
"state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    private JacksonCSVRecordReader createReader(final InputStream in, final RecordSchema
schema, CSVFormat format) throws IOException {
+        return new JacksonCSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format,
true, false,
+            RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII");
+    }
+
+    @Test
+    public void testUTF8() throws IOException, MalformedRecordException {
+        final String text = "name\n黃凱揚";
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+             final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class),
schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            final Record record = reader.nextRecord();
+            final String name = (String)record.getValue("name");
+
+            assertEquals("黃凱揚", name);
+        }
+    }
+
+    @Test
+    public void testDate() throws IOException, MalformedRecordException {
+        final String text = "date\n11/30/1983";
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+             final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class),
schema, format, true, false,
+                     "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(),
"UTF-8")) {
+
+            final Record record = reader.nextRecord();
+            final Date date = (Date) record.getValue("date");
+            final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+            calendar.setTimeInMillis(date.getTime());
+
+            assertEquals(1983, calendar.get(Calendar.YEAR));
+            assertEquals(10, calendar.get(Calendar.MONTH));
+            assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+        }
+    }
+
+    @Test
+    public void testSimpleParse() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance",
doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
+            final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+            final Object[] record = reader.nextRecord().getValues();
+            final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123
My Street", "My City", "MS", "11111", "USA"};
+            Assert.assertArrayEquals(expectedValues, record);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testMultipleRecords() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance",
doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
+            final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D,
"123 My Street", "My City", "MS", "11111", "USA"};
+            Assert.assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D,
"321 Your Street", "Your City", "NY", "33333", "USA"};
+            Assert.assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testExtraWhiteSpace() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance",
doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
+            final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D,
"123 My Street", "My City", "MS", "11111", "USA"};
+            Assert.assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D,
"321 Your Street", "Your City", "NY", "33333", "USA"};
+            Assert.assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testMissingField() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance",
doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode, country";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals(40.8D, record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testReadRawWithDifferentFieldName() throws IOException, MalformedRecordException
{
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode, continent";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, North
America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord does not contain a 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertNull(record.getValue("continent"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // test nextRawRecord does contain 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertEquals("North America", record.getValue("continent"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+
+    @Test
+    public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordException
{
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            assertNull(record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // Create another Record Reader that indicates that the header line is present but
should be ignored. This should cause
+        // our schema to be the definitive list of what fields exist.
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = new JacksonCSVRecordReader(bais, Mockito.mock(ComponentLog.class),
schema, format, true, true,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(),
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            // But if we configure the reader to Ignore the header, then this will not occur!
+            assertEquals("USA", record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+
+    }
+
+    @Test
+    public void testExtraFieldNotInHeader() throws IOException, MalformedRecordException
{
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, zipCode, country";
+        final String inputRecord = "1, John, 40.80, 123 My Street, My City, MS, 11111, USA,
North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord does not contain a 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+            final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertEquals("USA", record.getValue("country"));
+            assertEquals("North America", record.getValue("unknown_field_index_8"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+    @Test
+    public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException
{
+
+        char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
+
+        final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter);
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance",
doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv"));
+            final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D,
"123 My Street", "My City", "MS", "11111", "USA"};
+            Assert.assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D,
"321 Your Street", "Your City", "NY", "33333", "USA"};
+            Assert.assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+}


Mime
View raw message