nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: mattyb...@apache.org
Subject: nifi git commit: NIFI-2624: Avro logical types for ExecuteSQL and QueryDatabaseTable
Date: Mon, 15 May 2017 18:16:33 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 20a1fc24d -> 1811ba568


NIFI-2624: Avro logical types for ExecuteSQL and QueryDatabaseTable

- Added Logical type support for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP column types.
- Added Logical type 'decimal' to AvroReader so that Avro records with logical types written by ExecuteSQL and QueryDatabaseTable can be consumed by AvroReader.
- Added JdbcCommon.AvroConversionOptions to consolidate conversion options.
- Added 'Use Avro Logical Types' property to ExecuteSQL and QueryDatabaseTable to toggle whether to use Logical types.
- Added 'mime.type' FlowFile attribute as 'application/avro-binary' so that output FlowFiles can be displayed by content viewer.

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1798


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1811ba56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1811ba56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1811ba56

Branch: refs/heads/master
Commit: 1811ba568152d0586c29861899e7ceb2c6b06d24
Parents: 20a1fc2
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Tue Jan 24 12:16:52 2017 +0900
Committer: Matt Burgess <mattyb149@apache.org>
Committed: Mon May 15 14:15:23 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/avro/AvroTypeUtil.java |  30 ++++
 .../nifi-standard-processors/pom.xml            |   4 +-
 .../AbstractDatabaseFetchProcessor.java         |  10 --
 .../nifi/processors/standard/ExecuteSQL.java    |  22 +--
 .../processors/standard/QueryDatabaseTable.java |  15 +-
 .../processors/standard/util/JdbcCommon.java    | 179 +++++++++++++++++--
 .../standard/util/TestJdbcCommon.java           | 122 ++++++++++++-
 .../avro/TestAvroReaderWithEmbeddedSchema.java  |   5 +
 .../apache/nifi/avro/TestWriteAvroResult.java   |  16 +-
 .../src/test/resources/avro/logical-types.avsc  |   7 +
 10 files changed, 364 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 8ded9bc..0682b34 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.avro;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -39,6 +41,8 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -61,6 +65,7 @@ public class AvroTypeUtil {
     private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
     private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
+    private static final String LOGICAL_TYPE_DECIMAL = "decimal";
 
     public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
         if (recordSchema == null) {
@@ -107,6 +112,10 @@ public class AvroTypeUtil {
                 case LOGICAL_TYPE_TIMESTAMP_MILLIS:
                 case LOGICAL_TYPE_TIMESTAMP_MICROS:
                     return RecordFieldType.TIMESTAMP.getDataType();
+                case LOGICAL_TYPE_DECIMAL:
+                    // We convert Decimal to Double.
+                    // Alternatively we could convert it to String, but a numeric type is generally preferred by users.
+                    return RecordFieldType.DOUBLE.getDataType();
             }
         }
 
@@ -262,6 +271,10 @@ public class AvroTypeUtil {
      * Convert a raw value to an Avro object to serialize in Avro type system.
      * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema)}.
      */
+    public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) {
+        return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName());
+    }
+
     private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
         if (rawValue == null) {
             return null;
@@ -311,6 +324,19 @@ public class AvroTypeUtil {
             }
             case BYTES:
             case FIXED:
+                final LogicalType logicalType = fieldSchema.getLogicalType();
+                if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
+                    final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+                    final BigDecimal decimal;
+                    if (rawValue instanceof BigDecimal) {
+                        decimal = (BigDecimal) rawValue;
+                    } else if (rawValue instanceof Double) {
+                        decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
+                    } else {
+                        throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
+                    }
+                    return new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType);
+                }
                 if (rawValue instanceof byte[]) {
                     return ByteBuffer.wrap((byte[]) rawValue);
                 }
@@ -529,6 +555,10 @@ public class AvroTypeUtil {
                 return new MapRecord(childSchema, values);
             case BYTES:
                 final ByteBuffer bb = (ByteBuffer) value;
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
+                    return new Conversions.DecimalConversion().fromBytes(bb, avroSchema, logicalType);
+                }
                 return AvroTypeUtil.convertByteArray(bb.array());
             case FIXED:
                 final GenericFixed fixed = (GenericFixed) value;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 1372735..a86c836 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -174,8 +174,8 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 1d943f4..e11a888 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -143,16 +143,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             .expressionLanguageSupported(true)
             .build();
 
-    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
-            .name("dbf-normalize")
-            .displayName("Normalize Table/Column Names")
-            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
-                    + "will be changed to underscores in order to build a valid Avro record.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
     protected List<PropertyDescriptor> propDescriptors;
 
     // The delimiter to use when referencing qualified names (such as table@!@column in the state map)

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 14283c1..3ef9107 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -42,6 +42,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -54,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"sql", "select", "jdbc", "query", "database"})
@@ -107,16 +111,6 @@ public class ExecuteSQL extends AbstractProcessor {
             .sensitive(false)
             .build();
 
-    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
-            .name("dbf-normalize")
-            .displayName("Normalize Table/Column Names")
-            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
-                    + "will be changed to underscores in order to build a valid Avro record.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
     private final List<PropertyDescriptor> propDescriptors;
 
     public ExecuteSQL() {
@@ -130,6 +124,7 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(SQL_SELECT_QUERY);
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -172,6 +167,7 @@ public class ExecuteSQL extends AbstractProcessor {
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final StopWatch stopWatch = new StopWatch(true);
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -202,7 +198,10 @@ public class ExecuteSQL extends AbstractProcessor {
                     try {
                         logger.debug("Executing query {}", new Object[]{selectQuery});
                         final ResultSet resultSet = st.executeQuery(selectQuery);
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
+                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                                .convertNames(convertNamesForAvro)
+                                .useLogicalTypes(useAvroLogicalTypes).build();
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
                     } catch (final SQLException e) {
                         throw new ProcessException(e);
                     }
@@ -211,6 +210,7 @@ public class ExecuteSQL extends AbstractProcessor {
 
             // set attribute how many rows were selected
             fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
 
             logger.info("{} contains {} Avro records; transferring to 'success'",
                     new Object[]{fileToProcess, nrOfRows.get()});

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index e25273e..dd3ac7b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -35,6 +35,7 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -67,6 +68,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -155,6 +159,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -202,7 +207,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
                 ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                 : 0;
-        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .recordName(tableName)
+                .maxRows(maxRowsPerFlowFile)
+                .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
+                .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
+                .build();
 
         final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
 
@@ -284,7 +294,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                             // Max values will be updated in the state property map by the callback
                             final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
                             try {
-                                nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro));
+                                nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
                             } catch (SQLException | RuntimeException e) {
                                 throw new ProcessException("Error during database query or conversion of records to Avro.", e);
                             }
@@ -299,6 +309,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                         // set attribute how many rows were selected
                         fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                         fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
+                        fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
                         if(maxRowsPerFlowFile > 0) {
                             fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
                             fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 40709d0..8771fe9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -55,22 +55,59 @@ import java.sql.Clob;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.Date;
+import java.util.function.Function;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.BaseTypeBuilder;
 import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.avro.SchemaBuilder.NullDefault;
+import org.apache.avro.SchemaBuilder.UnionAccumulator;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
 
 /**
  * JDBC / SQL common functions.
  */
 public class JdbcCommon {
 
+    public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
+
+    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
+            .name("dbf-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+                    + "will be changed to underscores in order to build a valid Avro record.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
+            .name("dbf-user-logical-types")
+            .displayName("Use Avro Logical Types")
+            .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
+                    + "If disabled, written as string. "
+                    + "If enabled, Logical types are used and written as its underlying type, specifically, "
+                    + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, "
+                    + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), "
+                    + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, "
+                    + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+                    + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+
     private static final int MAX_DIGITS_IN_BIGINT = 19;
     private static final int MAX_DIGITS_IN_INT = 9;
 
@@ -90,7 +127,69 @@ public class JdbcCommon {
 
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows, boolean convertNames)
             throws SQLException, IOException {
-        final Schema schema = createSchema(rs, recordName, convertNames);
+        final AvroConversionOptions options = AvroConversionOptions.builder()
+                .recordName(recordName)
+                .maxRows(maxRows)
+                .convertNames(convertNames)
+                .useLogicalTypes(false).build();
+        return convertToAvroStream(rs, outStream, options, callback);
+    }
+
+    public static class AvroConversionOptions {
+        private final String recordName;
+        private final int maxRows;
+        private final boolean convertNames;
+        private final boolean useLogicalTypes;
+
+        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
+            this.recordName = recordName;
+            this.maxRows = maxRows;
+            this.convertNames = convertNames;
+            this.useLogicalTypes = useLogicalTypes;
+        }
+
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        public static class Builder {
+            private String recordName;
+            private int maxRows = 0;
+            private boolean convertNames = false;
+            private boolean useLogicalTypes = false;
+
+            /**
+             * Specify a priori record name to use if it cannot be determined from the result set.
+             */
+            public Builder recordName(String recordName) {
+                this.recordName = recordName;
+                return this;
+            }
+
+            public Builder maxRows(int maxRows) {
+                this.maxRows = maxRows;
+                return this;
+            }
+
+            public Builder convertNames(boolean convertNames) {
+                this.convertNames = convertNames;
+                return this;
+            }
+
+            public Builder useLogicalTypes(boolean useLogicalTypes) {
+                this.useLogicalTypes = useLogicalTypes;
+                return this;
+            }
+
+            public AvroConversionOptions build() {
+                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
+            }
+        }
+    }
+
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final AvroConversionOptions options, final ResultSetRowCallback callback)
+            throws SQLException, IOException {
+        final Schema schema = createSchema(rs, options);
         final GenericRecord rec = new GenericData.Record(schema);
 
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -106,6 +205,7 @@ public class JdbcCommon {
                 }
                 for (int i = 1; i <= nrOfColumns; i++) {
                     final int javaSqlType = meta.getColumnType(i);
+                    final Schema fieldSchema = schema.getFields().get(i - 1).schema();
 
                     // Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
                     if (javaSqlType == CLOB) {
@@ -171,8 +271,13 @@ public class JdbcCommon {
                         //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
                         rec.put(i - 1, ((Short) value).intValue());
                     } else if (value instanceof BigDecimal) {
-                        // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
-                        rec.put(i - 1, value.toString());
+                        if (options.useLogicalTypes) {
+                            // Delegate mapping to AvroTypeUtil in order to utilize logical types.
+                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
+                        } else {
+                            // As string for backward compatibility.
+                            rec.put(i - 1, value.toString());
+                        }
 
                     } else if (value instanceof BigInteger) {
                         // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
@@ -208,6 +313,15 @@ public class JdbcCommon {
                             rec.put(i - 1, value);
                         }
 
+                    } else if (value instanceof Date) {
+                        if (options.useLogicalTypes) {
+                            // Delegate mapping to AvroTypeUtil in order to utilize logical types.
+                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
+                        } else {
+                            // As string for backward compatibility.
+                            rec.put(i - 1, value.toString());
+                        }
+
                     } else {
                         // The different types that we support are numbers (int, long, double, float),
                         // as well as boolean values and Strings. Since Avro doesn't provide
@@ -219,7 +333,7 @@ public class JdbcCommon {
                 dataFileWriter.append(rec);
                 nrOfRows += 1;
 
-                if (maxRows > 0 && nrOfRows == maxRows)
+                if (options.maxRows > 0 && nrOfRows == options.maxRows)
                     break;
             }
 
@@ -231,19 +345,33 @@ public class JdbcCommon {
         return createSchema(rs, null, false);
     }
 
+    public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
+        final AvroConversionOptions options = AvroConversionOptions.builder().recordName(recordName).convertNames(convertNames).build();
+        return createSchema(rs, options);
+    }
+
+    private static void addNullableField(
+            FieldAssembler<Schema> builder,
+            String columnName,
+            Function<BaseTypeBuilder<UnionAccumulator<NullDefault<Schema>>>, UnionAccumulator<NullDefault<Schema>>> func
+    ) {
+        final BaseTypeBuilder<UnionAccumulator<NullDefault<Schema>>> and = builder.name(columnName).type().unionOf().nullBuilder().endNull().and();
+        func.apply(and).endUnion().noDefault();
+    }
+
     /**
      * Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a
      * fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value.
      *
-     * @param rs         The result set to convert to Avro
-     * @param recordName The a priori record name to use if it cannot be determined from the result set.
+     * @param rs The result set to convert to Avro
+     * @param options Specify various options
      * @return A Schema object representing the result set converted to an Avro record
      * @throws SQLException if any error occurs during conversion
      */
-    public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
+    public static Schema createSchema(final ResultSet rs, AvroConversionOptions options) throws SQLException {
         final ResultSetMetaData meta = rs.getMetaData();
         final int nrOfColumns = meta.getColumnCount();
-        String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName;
+        String tableName = StringUtils.isEmpty(options.recordName) ? "NiFi_ExecuteSQL_Record" : options.recordName;
         if (nrOfColumns > 0) {
             String tableNameFromMeta = meta.getTableName(1);
             if (!StringUtils.isBlank(tableNameFromMeta)) {
@@ -251,7 +379,7 @@ public class JdbcCommon {
             }
         }
 
-        if (convertNames) {
+        if (options.convertNames) {
             tableName = normalizeNameForAvro(tableName);
         }
 
@@ -267,7 +395,7 @@ public class JdbcCommon {
         *  check for alias. Postgres is the one that has the null column names for calculated fields.
         */
             String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i) :meta.getColumnName(i);
-            String columnName = convertNames ? normalizeNameForAvro(nameOrLabel) : nameOrLabel;
+            String columnName = options.convertNames ? normalizeNameForAvro(nameOrLabel) : nameOrLabel;
             switch (meta.getColumnType(i)) {
                 case CHAR:
                 case LONGNVARCHAR:
@@ -324,17 +452,39 @@ public class JdbcCommon {
                     builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
-                // Did not find direct suitable type, need to be clarified!!!!
+                // Since Avro 1.8, LogicalType is supported.
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+
+                    final LogicalTypes.Decimal decimal = options.useLogicalTypes
+                            ? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
+                                    : u.stringType());
+
                     break;
 
-                // Did not find direct suitable type, need to be clarified!!!!
                 case DATE:
+
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()))
+                                    : u.stringType());
+                    break;
+
                 case TIME:
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()))
+                                    : u.stringType());
+                    break;
+
                 case TIMESTAMP:
-                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()))
+                                    : u.stringType());
                     break;
 
                 case BINARY:
@@ -371,4 +521,5 @@ public class JdbcCommon {
     public interface ResultSetRowCallback {
         void processRow(ResultSet resultSet) throws IOException;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index f8ce1f3..37660d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,17 +40,28 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.io.input.ReaderInputStream;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -350,11 +362,15 @@ public class TestJdbcCommon {
 
     @Test
     public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
+        final BigDecimal bigDecimal = new BigDecimal(38D);
+
         final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
         when(metadata.getColumnCount()).thenReturn(1);
         when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
         when(metadata.getColumnName(1)).thenReturn("The.Chairman");
         when(metadata.getTableName(1)).thenReturn("1the::table");
+        when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
+        when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
 
         final ResultSet rs = mock(ResultSet.class);
         when(rs.getMetaData()).thenReturn(metadata);
@@ -367,24 +383,28 @@ public class TestJdbcCommon {
             }
         }).when(rs).next();
 
-        final BigDecimal bigDecimal = new BigDecimal(38D);
         when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
-        JdbcCommon.convertToAvroStream(rs, baos, true);
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
+                .builder().convertNames(true).useLogicalTypes(true).build();
+        JdbcCommon.convertToAvroStream(rs, baos, options, null);
 
         final byte[] serializedBytes = baos.toByteArray();
 
         final InputStream instream = new ByteArrayInputStream(serializedBytes);
 
-        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        final GenericData genericData = new GenericData();
+        genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
             GenericRecord record = null;
             while (dataFileReader.hasNext()) {
                 record = dataFileReader.next(record);
                 assertEquals("_1the__table", record.getSchema().getName());
-                assertEquals(bigDecimal.toString(), record.get("The_Chairman").toString());
+                assertEquals(bigDecimal, record.get("The_Chairman"));
             }
         }
     }
@@ -526,6 +546,100 @@ public class TestJdbcCommon {
         }
     }
 
+    @Test
+    public void testConvertToAvroStreamForDateTimeAsString() throws SQLException, IOException, ParseException {
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
+                .builder().convertNames(true).useLogicalTypes(false).build();
+        // Logical types are off: each DATE/TIME/TIMESTAMP field must read back as the Utf8 form of the value's toString().
+        testConvertToAvroStreamForDateTime(options,
+                (record, date) -> assertEquals(new Utf8(date.toString()), record.get("date")),
+                (record, time) -> assertEquals(new Utf8(time.toString()), record.get("time")),
+                (record, timestamp) -> assertEquals(new Utf8(timestamp.toString()), record.get("timestamp"))
+        );
+    }
+
+    @Test
+    public void testConvertToAvroStreamForDateTimeAsLogicalType() throws SQLException, IOException, ParseException {
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
+                .builder().convertNames(true).useLogicalTypes(true).build();
+        // Logical types are on: "date" reads back as an int (days), "time" as an int and "timestamp" as a long (both fed to millisecond constructors).
+        testConvertToAvroStreamForDateTime(options,
+                (record, date) -> {
+                    final int daysSinceEpoch = (int) record.get("date");
+                    final long millisSinceEpoch = TimeUnit.MILLISECONDS.convert(daysSinceEpoch, TimeUnit.DAYS); // days -> millis for java.sql.Date
+                    assertEquals(date, new java.sql.Date(millisSinceEpoch));
+                },
+                (record, time) -> assertEquals(time, new Time((int) record.get("time"))),
+                (record, timestamp) -> assertEquals(timestamp, new Timestamp((long) record.get("timestamp")))
+        );
+    }
+
+    private void testConvertToAvroStreamForDateTime(
+            JdbcCommon.AvroConversionOptions options, BiConsumer<GenericRecord, java.sql.Date> assertDate,
+            BiConsumer<GenericRecord, Time> assertTime, BiConsumer<GenericRecord, Timestamp> assertTimeStamp)
+            throws SQLException, IOException, ParseException {
+        // Shared driver: mocks a one-row DATE/TIME/TIMESTAMP result set, converts it to Avro with the given options, then runs the supplied assertions on each record read back.
+        final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
+
+        final ResultSet rs = mock(ResultSet.class);
+        when(rs.getMetaData()).thenReturn(metadata);
+        // Parses dateStr with the given pattern in UTC and returns epoch millis; ParseException is rethrown unchecked since BiFunction.apply declares no checked exceptions.
+        BiFunction<String, String, Long> toMillis = (format, dateStr) -> {
+            try {
+                final SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+                dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+                return dateFormat.parse(dateStr).getTime();
+            } catch (ParseException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        // Result set shape: three columns, every column reporting the table name "table".
+        when(metadata.getColumnCount()).thenReturn(3);
+        when(metadata.getTableName(anyInt())).thenReturn("table");
+        // Column 1: DATE
+        when(metadata.getColumnType(1)).thenReturn(Types.DATE);
+        when(metadata.getColumnName(1)).thenReturn("date");
+        final java.sql.Date date = new java.sql.Date(toMillis.apply("yyyy/MM/dd", "2017/05/10"));
+        when(rs.getObject(1)).thenReturn(date);
+        // Column 2: TIME
+        when(metadata.getColumnType(2)).thenReturn(Types.TIME);
+        when(metadata.getColumnName(2)).thenReturn("time");
+        final Time time = new Time(toMillis.apply("HH:mm:ss.SSS", "12:34:56.789"));
+        when(rs.getObject(2)).thenReturn(time);
+        // Column 3: TIMESTAMP
+        when(metadata.getColumnType(3)).thenReturn(Types.TIMESTAMP);
+        when(metadata.getColumnName(3)).thenReturn("timestamp");
+        final Timestamp timestamp = new Timestamp(toMillis.apply("yyyy/MM/dd HH:mm:ss.SSS", "2017/05/11 19:59:39.123"));
+        when(rs.getObject(3)).thenReturn(timestamp);
+        // next() answers true exactly once (counter starts at 1), so the mocked result set yields a single row.
+        final AtomicInteger counter = new AtomicInteger(1);
+        Mockito.doAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                return counter.getAndDecrement() > 0;
+            }
+        }).when(rs).next();
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        // Code under test; NOTE(review): the meaning of the trailing null argument is not visible in this hunk.
+        JdbcCommon.convertToAvroStream(rs, baos, options, null);
+
+        final byte[] serializedBytes = baos.toByteArray();
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+        // Plain GenericDatumReader with no conversions registered here; the callers' assertions expect raw int/long (or Utf8) field values.
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                record = dataFileReader.next(record);
+                assertDate.accept(record, date);
+                assertTime.accept(record, time);
+                assertTimeStamp.accept(record, timestamp);
+            }
+        }
+    }
+
     // many test use Derby as database, so ensure driver is available
     @Test
     public void testDriverLoad() throws ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index bbb62c5..6782d33 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.text.DateFormat;
@@ -82,6 +83,7 @@ public class TestAvroReaderWithEmbeddedSchema {
         final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
         final long millisSinceMidnight = secondsSinceMidnight * 1000L;
 
+        final BigDecimal bigDecimal = new BigDecimal("123.45");
 
         final byte[] serialized;
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -94,6 +96,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             record.put("timestampMillis", timeLong);
             record.put("timestampMicros", timeLong * 1000L);
             record.put("date", 17260);
+            record.put("decimal", ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray()));
 
             writer.append(record);
             writer.flush();
@@ -110,6 +113,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMillis").get().getFieldType());
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMicros").get().getFieldType());
             assertEquals(RecordFieldType.DATE, recordSchema.getDataType("date").get().getFieldType());
+            assertEquals(RecordFieldType.DOUBLE, recordSchema.getDataType("decimal").get().getFieldType());
 
             final Record record = reader.nextRecord();
             assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMillis"));
@@ -119,6 +123,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
             noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
             assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
+            assertEquals(bigDecimal.doubleValue(), record.getValue("decimal"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index dc5d943..c9587f3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Array;
 import org.apache.avro.generic.GenericRecord;
@@ -74,9 +76,11 @@ public abstract class TestWriteAvroResult {
         fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        // Avro decimal is represented as double in NiFi type system.
+        fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
         final RecordSchema recordSchema = new SimpleRecordSchema(fields);
 
-        final String expectedTime = "2017-04-04 14:20:33.000";
+        final String expectedTime = "2017-04-04 14:20:33.789";
         final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
         df.setTimeZone(TimeZone.getTimeZone("gmt"));
         final long timeLong = df.parse(expectedTime).getTime();
@@ -87,6 +91,9 @@ public abstract class TestWriteAvroResult {
         values.put("timestampMillis", new Timestamp(timeLong));
         values.put("timestampMicros", new Timestamp(timeLong));
         values.put("date", new Date(timeLong));
+        // Avro decimal is represented as double in NiFi type system.
+        final BigDecimal expectedDecimal = new BigDecimal("123.45");
+        values.put("decimal", expectedDecimal.doubleValue());
         final Record record = new MapRecord(recordSchema, values);
 
         final byte[] data;
@@ -98,17 +105,20 @@ public abstract class TestWriteAvroResult {
         try (final InputStream in = new ByteArrayInputStream(data)) {
             final GenericRecord avroRecord = readRecord(in, schema);
             final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
-            final long millisSinceMidnight = secondsSinceMidnight * 1000L;
+            final long millisSinceMidnight = (secondsSinceMidnight * 1000L) + 789;
 
             assertEquals((int) millisSinceMidnight, avroRecord.get("timeMillis"));
             assertEquals(millisSinceMidnight * 1000L, avroRecord.get("timeMicros"));
             assertEquals(timeLong, avroRecord.get("timestampMillis"));
             assertEquals(timeLong * 1000L, avroRecord.get("timestampMicros"));
             assertEquals(17260, avroRecord.get("date"));
+            // Double value will be converted into logical decimal if Avro schema is defined as logical decimal.
+            final Schema decimalSchema = schema.getField("decimal").schema();
+            final BigDecimal decimal = new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal"), decimalSchema, decimalSchema.getLogicalType());
+            assertEquals(expectedDecimal, decimal);
         }
     }
 
-
     @Test
     public void testDataTypes() throws IOException {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc"));

http://git-wip-us.apache.org/repos/asf/nifi/blob/1811ba56/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
index d8315b2..ef3335c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
@@ -29,6 +29,13 @@
 			"type" : "int",
 			"logicalType" : "date"
 		}
+    }, {
+		"name" : "decimal", "type": {
+			"type" : "bytes",
+			"logicalType" : "decimal",
+			"precision" : 5,
+			"scale" : 2
+		}
 	}
   ]
 }
\ No newline at end of file


Mime
View raw message