nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject [nifi] branch main updated: NIFI-8748 Corrected PutKudu String to java.sql.Date parsing
Date Wed, 30 Jun 2021 11:40:50 GMT
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b27c2b5  NIFI-8748 Corrected PutKudu String to java.sql.Date parsing
b27c2b5 is described below

commit b27c2b500e1c2d889078d0a63fcb10556cfbe621
Author: exceptionfactory <exceptionfactory@apache.org>
AuthorDate: Tue Jun 29 09:54:05 2021 -0500

    NIFI-8748 Corrected PutKudu String to java.sql.Date parsing
    
    - Added getDateFormat() using default time zone instead of GMT time zone from DataTypeUtils.getDateFormat()
    
    NIFI-8748 Adjusted Date Format to use DataType.getFormat()
    
    Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
    
    This closes #5194.
---
 .../processors/kudu/AbstractKuduProcessor.java     | 34 ++++++++++++++++++++--
 .../apache/nifi/processors/kudu/TestPutKudu.java   | 26 +++++++++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index b0d4566..774430e 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -55,11 +55,15 @@ import org.apache.nifi.util.StringUtils;
 
 import javax.security.auth.login.LoginException;
 import java.math.BigDecimal;
+import java.sql.Date;
 import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
@@ -114,7 +118,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor
{
             .displayName("Kudu Operation Timeout")
             .description("Default timeout used for user operations (using sessions and scanners)")
             .required(false)
-            .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) +
"ms")
+            .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
@@ -124,7 +128,7 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor
{
             .displayName("Kudu Keep Alive Period Timeout")
             .description("Default timeout used for user operations")
             .required(false)
-            .defaultValue(String.valueOf(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS) +
"ms")
+            .defaultValue(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS + "ms")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
@@ -403,7 +407,9 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor
{
                         row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
                         break;
                     case DATE:
-                        row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()),
recordFieldName));
+                        final Optional<DataType> fieldDataType = record.getSchema().getDataType(recordFieldName);
+                        final String format = fieldDataType.isPresent() ? fieldDataType.get().getFormat()
: RecordFieldType.DATE.getDefaultFormat();
+                        row.addDate(columnIndex, getDate(value, recordFieldName, format));
                         break;
                     default:
                         throw new IllegalStateException(String.format("unknown column type
%s", colType));
@@ -413,6 +419,28 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor
{
     }
 
     /**
+     * Get java.sql.Date from Record Field Value with optional parsing when input value is
a String
+     *
+     * @param value Record Field Value
+     * @param recordFieldName Record Field Name
+     * @param format Date Format Pattern
+     * @return Date object or null when value is null
+     */
+    private Date getDate(final Object value, final String recordFieldName, final String format)
{
+        return DataTypeUtils.toDate(value, () -> getDateFormat(format), recordFieldName);
+    }
+
+    /**
+     * Get Date Format using Date Record Field default pattern and system time zone to avoid
unnecessary conversion
+     *
+     * @param format Date Format Pattern
+     * @return Date Format used to parsing date fields
+     */
+    private DateFormat getDateFormat(final String format) {
+        return new SimpleDateFormat(format);
+    }
+
+    /**
      * Converts a NiFi DataType to it's equivalent Kudu Type.
      */
     private Type toKuduType(DataType nifiType) {
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 0c739ea..1d397d9 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -75,6 +75,7 @@ import java.util.stream.IntStream;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -478,6 +479,31 @@ public class TestPutKudu {
                 row.getDate("sql_date").toString(), today.toString());
     }
 
+    @Test
+    public void testBuildPartialRowWithDateString() {
+        final String dateFieldName = "created";
+        final String dateFieldValue = "2000-01-01";
+
+        final Schema kuduSchema = new Schema(Collections.singletonList(
+                new ColumnSchema.ColumnSchemaBuilder(dateFieldName, Type.DATE).nullable(true).build()
+        ));
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField(dateFieldName, RecordFieldType.DATE.getDataType())
+        ));
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put(dateFieldName, dateFieldValue);
+        final MapRecord record = new MapRecord(schema, values);
+
+        final PartialRow row = kuduSchema.newPartialRow();
+
+        processor.buildPartialRow(kuduSchema, row, record, schema.getFieldNames(), true,
true);
+
+        final java.sql.Date rowDate = row.getDate(dateFieldName);
+        assertEquals("Partial Row Date Field not matched", dateFieldValue, rowDate.toString());
+    }
+
     private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName,
String recordIdName, String airport_code, java.sql.Date sql_date, Boolean lowercaseFields)
{
         final Schema kuduSchema = new Schema(Arrays.asList(
                 new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),

Mime
View raw message