Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E55E9200C8C for ; Mon, 1 May 2017 22:11:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E2624160BAE; Mon, 1 May 2017 20:11:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 91F56160BB9 for ; Mon, 1 May 2017 22:11:55 +0200 (CEST) Received: (qmail 91297 invoked by uid 500); 1 May 2017 20:11:54 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 91194 invoked by uid 99); 1 May 2017 20:11:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 May 2017 20:11:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 853BEE0896; Mon, 1 May 2017 20:11:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: alopresto@apache.org To: commits@nifi.apache.org Date: Mon, 01 May 2017 20:11:56 -0000 Message-Id: In-Reply-To: <7132a44d5f184a2ca8de66bd8ffe5963@git.apache.org> References: <7132a44d5f184a2ca8de66bd8ffe5963@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter - R archived-at: Mon, 01 May 2017 20:11:58 -0000 http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java deleted file mode 100644 index b6daab7..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record; - -import java.io.Closeable; -import java.io.IOException; -import java.math.BigInteger; -import java.sql.Array; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Types; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ResultSetRecordSet implements RecordSet, Closeable { - private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class); - private final ResultSet rs; - private final RecordSchema schema; - private final Set rsColumnNames; - private boolean moreRows; - - public ResultSetRecordSet(final ResultSet rs) throws SQLException { - this.rs = rs; - moreRows = rs.next(); - this.schema = createSchema(rs); - - rsColumnNames = new HashSet<>(); - final ResultSetMetaData metadata = rs.getMetaData(); - for (int i = 0; i < metadata.getColumnCount(); i++) { - rsColumnNames.add(metadata.getColumnLabel(i + 1)); - } - } - - @Override - public RecordSchema getSchema() { - return schema; - } - - @Override - public Record next() throws IOException { - try { - if (moreRows) { - final Record record = createRecord(rs); - moreRows = rs.next(); - return record; - } else { - return null; - } - } catch (final SQLException e) { - throw new IOException("Could not obtain next record from ResultSet", e); - } - } - - @Override - public void close() { - try { - rs.close(); - } catch (final SQLException e) { - logger.error("Failed to close ResultSet", e); - } - } - - private Record createRecord(final ResultSet rs) throws SQLException { - final Map values = new HashMap<>(schema.getFieldCount()); - - for (final RecordField field : schema.getFields()) { - final String fieldName = field.getFieldName(); - - final Object value; - if (rsColumnNames.contains(fieldName)) { - value = normalizeValue(rs.getObject(fieldName)); - } else { - value = null; - } - - values.put(fieldName, value); - } - - return new MapRecord(schema, values); - } - - @SuppressWarnings("rawtypes") - private Object normalizeValue(final Object value) { - if (value == null) { - return null; - } - - if (value instanceof List) { - return ((List) value).toArray(); - } - - return value; - } - - private static RecordSchema createSchema(final ResultSet rs) throws SQLException { - final ResultSetMetaData metadata = rs.getMetaData(); - final int numCols = metadata.getColumnCount(); - final List fields = new ArrayList<>(numCols); - - for (int i = 0; i < numCols; i++) { - final int column = i + 1; - final int sqlType = metadata.getColumnType(column); - - final DataType dataType = getDataType(sqlType, rs, column); - final String fieldName = metadata.getColumnLabel(column); - final RecordField field = new RecordField(fieldName, dataType); - fields.add(field); - } - - return new SimpleRecordSchema(fields); - } - - private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException { - switch (sqlType) { - case Types.ARRAY: - // The JDBC API does not allow us to know what the base type of an array is through the metadata. - // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine - // the base type. However, if the base type is, itself, an array, we will simply return a base type of - // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not - // support calling Array.getResultSet() and will throw an Exception if that is not supported. - if (rs.isAfterLast()) { - return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); - } - - final Array array = rs.getArray(columnIndex); - if (array == null) { - return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()); - } - - final DataType baseType = getArrayBaseType(array); - return RecordFieldType.ARRAY.getArrayDataType(baseType); - case Types.BINARY: - case Types.LONGVARBINARY: - case Types.VARBINARY: - return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); - case Types.OTHER: - // If we have no records to inspect, we can't really know its schema so we simply use the default data type. - if (rs.isAfterLast()) { - return RecordFieldType.RECORD.getDataType(); - } - - final Object obj = rs.getObject(columnIndex); - if (obj == null || !(obj instanceof Record)) { - return RecordFieldType.RECORD.getDataType(); - } - - final Record record = (Record) obj; - final RecordSchema recordSchema = record.getSchema(); - return RecordFieldType.RECORD.getRecordDataType(recordSchema); - default: - return getFieldType(sqlType).getDataType(); - } - } - - private static DataType getArrayBaseType(final Array array) throws SQLException { - final Object arrayValue = array.getArray(); - if (arrayValue == null) { - return RecordFieldType.STRING.getDataType(); - } - - if (arrayValue instanceof byte[]) { - return RecordFieldType.BYTE.getDataType(); - } - if (arrayValue instanceof int[]) { - return RecordFieldType.INT.getDataType(); - } - if (arrayValue instanceof long[]) { - return RecordFieldType.LONG.getDataType(); - } - if (arrayValue instanceof boolean[]) { - return RecordFieldType.BOOLEAN.getDataType(); - } - if (arrayValue instanceof short[]) { - return RecordFieldType.SHORT.getDataType(); - } - if (arrayValue instanceof byte[]) { - return RecordFieldType.BYTE.getDataType(); - } - if (arrayValue instanceof float[]) { - return RecordFieldType.FLOAT.getDataType(); - } - if (arrayValue instanceof double[]) { - return RecordFieldType.DOUBLE.getDataType(); - } - if (arrayValue instanceof char[]) { - return RecordFieldType.CHAR.getDataType(); - } - if (arrayValue instanceof Object[]) { - final Object[] values = (Object[]) arrayValue; - if (values.length == 0) { - return RecordFieldType.STRING.getDataType(); - } - - Object valueToLookAt = null; - for (int i = 0; i < values.length; i++) { - valueToLookAt = values[i]; - if (valueToLookAt != null) { - break; - } - } - if (valueToLookAt == null) { - return RecordFieldType.STRING.getDataType(); - } - - if (valueToLookAt instanceof String) { - return RecordFieldType.STRING.getDataType(); - } - if (valueToLookAt instanceof Long) { - return RecordFieldType.LONG.getDataType(); - } - if (valueToLookAt instanceof Integer) { - return RecordFieldType.INT.getDataType(); - } - if (valueToLookAt instanceof Short) { - return RecordFieldType.SHORT.getDataType(); - } - if (valueToLookAt instanceof Byte) { - return RecordFieldType.BYTE.getDataType(); - } - if (valueToLookAt instanceof Float) { - return RecordFieldType.FLOAT.getDataType(); - } - if (valueToLookAt instanceof Double) { - return RecordFieldType.DOUBLE.getDataType(); - } - if (valueToLookAt instanceof Boolean) { - return RecordFieldType.BOOLEAN.getDataType(); - } - if (valueToLookAt instanceof Character) { - return RecordFieldType.CHAR.getDataType(); - } - if (valueToLookAt instanceof BigInteger) { - return RecordFieldType.BIGINT.getDataType(); - } - if (valueToLookAt instanceof Integer) { - return RecordFieldType.INT.getDataType(); - } - if (valueToLookAt instanceof java.sql.Time) { - return RecordFieldType.TIME.getDataType(); - } - if (valueToLookAt instanceof java.sql.Date) { - return RecordFieldType.DATE.getDataType(); - } - if (valueToLookAt instanceof java.sql.Timestamp) { - return RecordFieldType.TIMESTAMP.getDataType(); - } - if (valueToLookAt instanceof Record) { - return RecordFieldType.RECORD.getDataType(); - } - } - - return RecordFieldType.STRING.getDataType(); - } - - - private static RecordFieldType getFieldType(final int sqlType) { - switch (sqlType) { - case Types.BIGINT: - case Types.ROWID: - return RecordFieldType.LONG; - case Types.BIT: - case Types.BOOLEAN: - return RecordFieldType.BOOLEAN; - case Types.CHAR: - return RecordFieldType.CHAR; - case Types.DATE: - return RecordFieldType.DATE; - case Types.DECIMAL: - case Types.DOUBLE: - case Types.NUMERIC: - case Types.REAL: - return RecordFieldType.DOUBLE; - case Types.FLOAT: - return RecordFieldType.FLOAT; - case Types.INTEGER: - return RecordFieldType.INT; - case Types.SMALLINT: - return RecordFieldType.SHORT; - case Types.TINYINT: - return RecordFieldType.BYTE; - case Types.LONGNVARCHAR: - case Types.LONGVARCHAR: - case Types.NCHAR: - case Types.NULL: - case Types.NVARCHAR: - case Types.VARCHAR: - return RecordFieldType.STRING; - case Types.OTHER: - case Types.JAVA_OBJECT: - return RecordFieldType.RECORD; - case Types.TIME: - case Types.TIME_WITH_TIMEZONE: - return RecordFieldType.TIME; - case Types.TIMESTAMP: - case Types.TIMESTAMP_WITH_TIMEZONE: - return RecordFieldType.TIMESTAMP; - } - - return RecordFieldType.STRING; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java deleted file mode 100644 index d7f5664..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record; - -import java.util.Optional; -import java.util.OptionalInt; -import java.util.OptionalLong; - -public interface SchemaIdentifier { - - /** - * @return the name of the schema, if one has been defined. - */ - Optional getName(); - - /** - * @return the identifier of the schema, if one has been defined. - */ - OptionalLong getIdentifier(); - - /** - * @return the version of the schema, if one has been defined. - */ - OptionalInt getVersion(); - - - public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null); - - public static SchemaIdentifier ofName(final String name) { - return new StandardSchemaIdentifier(name, null, null); - } - - public static SchemaIdentifier of(final String name, final long identifier, final int version) { - return new StandardSchemaIdentifier(name, identifier, version); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java deleted file mode 100644 index 86db284..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record; - -import java.util.Optional; -import java.util.OptionalInt; -import java.util.OptionalLong; - -public class StandardSchemaIdentifier implements SchemaIdentifier { - private final Optional name; - private final OptionalLong identifier; - private final OptionalInt version; - - StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) { - this.name = Optional.ofNullable(name); - this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);; - this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);; - } - - @Override - public Optional getName() { - return name; - } - - @Override - public OptionalLong getIdentifier() { - return identifier; - } - - @Override - public OptionalInt getVersion() { - return version; - } - - @Override - public int hashCode() { - return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof SchemaIdentifier)) { - return false; - } - final SchemaIdentifier other = (SchemaIdentifier) obj; - return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion()); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java deleted file mode 100644 index af5f909..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record; - -public class TypeMismatchException extends RuntimeException { - public TypeMismatchException(String message) { - super(message); - } - - public TypeMismatchException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java deleted file mode 100644 index 0c21239..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record.type; - -import java.util.Objects; - -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordFieldType; - -public class ArrayDataType extends DataType { - private final DataType elementType; - - public ArrayDataType(final DataType elementType) { - super(RecordFieldType.ARRAY, null); - this.elementType = elementType; - } - - public DataType getElementType() { - return elementType; - } - - @Override - public RecordFieldType getFieldType() { - return RecordFieldType.ARRAY; - } - - @Override - public int hashCode() { - return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode()); - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof ArrayDataType)) { - return false; - } - - final ArrayDataType other = (ArrayDataType) obj; - return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType); - } - - @Override - public String toString() { - return "ARRAY[" + elementType + "]"; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java deleted file mode 100644 index 038b147..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record.type; - -import java.util.List; -import java.util.Objects; - -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordFieldType; - -public class ChoiceDataType extends DataType { - private final List possibleSubTypes; - - public ChoiceDataType(final List possibleSubTypes) { - super(RecordFieldType.CHOICE, null); - this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes); - } - - public List getPossibleSubTypes() { - return possibleSubTypes; - } - - @Override - public RecordFieldType getFieldType() { - return RecordFieldType.CHOICE; - } - - @Override - public int hashCode() { - return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode()); - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof ChoiceDataType)) { - return false; - } - - final ChoiceDataType other = (ChoiceDataType) obj; - return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes); - } - - @Override - public String toString() { - return "CHOICE" + possibleSubTypes; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java deleted file mode 100644 index a85fb5e..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record.type; - -import java.util.Objects; - -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordFieldType; - -public class MapDataType extends DataType { - private final DataType valueType; - - public MapDataType(final DataType elementType) { - super(RecordFieldType.MAP, null); - this.valueType = elementType; - } - - public DataType getValueType() { - return valueType; - } - - @Override - public RecordFieldType getFieldType() { - return RecordFieldType.MAP; - } - - @Override - public int hashCode() { - return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode()); - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof MapDataType)) { - return false; - } - - final MapDataType other = (MapDataType) obj; - return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType); - } - - @Override - public String toString() { - return "MAP[" + valueType + "]"; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java deleted file mode 100644 index 006d34c..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record.type; - -import java.util.Objects; - -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; - -public class RecordDataType extends DataType { - private final RecordSchema childSchema; - - public RecordDataType(final RecordSchema childSchema) { - super(RecordFieldType.RECORD, null); - this.childSchema = childSchema; - } - - @Override - public RecordFieldType getFieldType() { - return RecordFieldType.RECORD; - } - - public RecordSchema getChildSchema() { - return childSchema; - } - - @Override - public int hashCode() { - return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode()); - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof RecordDataType)) { - return false; - } - - final RecordDataType other = (RecordDataType) obj; - return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema); - } - - @Override - public String toString() { - return RecordFieldType.RECORD.toString(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java deleted file mode 100644 index 05b3157..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ /dev/null @@ -1,670 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record.util; - -import java.math.BigInteger; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.TimeZone; -import java.util.function.Consumer; - -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.type.ChoiceDataType; -import org.apache.nifi.serialization.record.type.RecordDataType; - -public class DataTypeUtils { - - private static final TimeZone gmt = TimeZone.getTimeZone("gmt"); - - public static Object convertType(final Object value, final DataType dataType, final String fieldName) { - return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), fieldName); - } - - public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat, final String fieldName) { - switch (dataType.getFieldType()) { - case BIGINT: - return toBigInt(value, fieldName); - case BOOLEAN: - return toBoolean(value, fieldName); - case BYTE: - return toByte(value, fieldName); - case CHAR: - return toCharacter(value, fieldName); - case DATE: - return toDate(value, dateFormat, fieldName); - case DOUBLE: - return toDouble(value, fieldName); - case FLOAT: - return toFloat(value, fieldName); - case INT: - return toInteger(value, fieldName); - case LONG: - return toLong(value, fieldName); - case SHORT: - return toShort(value, fieldName); - case STRING: - return toString(value, dateFormat, timeFormat, timestampFormat); - case TIME: - return toTime(value, timeFormat, fieldName); - case TIMESTAMP: - return toTimestamp(value, timestampFormat, fieldName); - case ARRAY: - return toArray(value, fieldName); - case MAP: - return toMap(value, fieldName); - case RECORD: - final RecordDataType recordType = (RecordDataType) dataType; - final RecordSchema childSchema = recordType.getChildSchema(); - return toRecord(value, childSchema, fieldName); - case CHOICE: { - if (value == null) { - return null; - } - - final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; - final DataType chosenDataType = chooseDataType(value, choiceDataType); - if (chosenDataType == null) { - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() - + " for field " + fieldName + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes()); - } - - return convertType(value, chosenDataType, fieldName); - } - } - - return null; - } - - - public static boolean isCompatibleDataType(final Object value, final DataType dataType) { - switch (dataType.getFieldType()) { - case ARRAY: - return isArrayTypeCompatible(value); - case BIGINT: - return isBigIntTypeCompatible(value); - case BOOLEAN: - return isBooleanTypeCompatible(value); - case BYTE: - return isByteTypeCompatible(value); - case CHAR: - return isCharacterTypeCompatible(value); - case DATE: - return isDateTypeCompatible(value, dataType.getFormat()); - case DOUBLE: - return isDoubleTypeCompatible(value); - case FLOAT: - return isFloatTypeCompatible(value); - case INT: - return isIntegerTypeCompatible(value); - case LONG: - return isLongTypeCompatible(value); - case RECORD: - return isRecordTypeCompatible(value); - case SHORT: - return isShortTypeCompatible(value); - case TIME: - return isTimeTypeCompatible(value, dataType.getFormat()); - case TIMESTAMP: - return isTimestampTypeCompatible(value, dataType.getFormat()); - case STRING: - return isStringTypeCompatible(value); - case MAP: - return isMapTypeCompatible(value); - case CHOICE: { - final DataType chosenDataType = chooseDataType(value, (ChoiceDataType) dataType); - return chosenDataType != null; - } - } - - return false; - } - - public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) { - for (final DataType subType : choiceType.getPossibleSubTypes()) { - if (isCompatibleDataType(value, subType)) { - return subType; - } - } - - return null; - } - - public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Record) { - return ((Record) value); - } - - if (value instanceof Map) { - if (recordSchema == null) { - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() - + " to Record for field " + fieldName + " because the value is a Map but no Record Schema was provided"); - } - - final Map map = (Map) value; - final Map coercedValues = new HashMap<>(); - - for (final Map.Entry entry : map.entrySet()) { - final Object keyValue = entry.getKey(); - if (keyValue == null) { - continue; - } - - final String key = keyValue.toString(); - final Optional desiredTypeOption = recordSchema.getDataType(key); - if (!desiredTypeOption.isPresent()) { - continue; - } - - final Object rawValue = entry.getValue(); - final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName); - coercedValues.put(key, coercedValue); - } - - return new MapRecord(recordSchema, coercedValues); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record for field " + fieldName); - } - - public static boolean isRecordTypeCompatible(final Object value) { - return value != null && value instanceof Record; - } - - public static Object[] toArray(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Object[]) { - return (Object[]) value; - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName); - } - - public static boolean isArrayTypeCompatible(final Object value) { - return value != null && value instanceof Object[]; - } - - @SuppressWarnings("unchecked") - public static Map toMap(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Map) { - final Map original = (Map) value; - - boolean keysAreStrings = true; - for (final Object key : original.keySet()) { - if (!(key instanceof String)) { - keysAreStrings = false; - } - } - - if (keysAreStrings) { - return (Map) value; - } - - final Map transformed = new HashMap<>(); - for (final Map.Entry entry : original.entrySet()) { - final Object key = entry.getKey(); - if (key == null) { - transformed.put(null, entry.getValue()); - } else { - transformed.put(key.toString(), entry.getValue()); - } - } - - return transformed; - } - - if (value instanceof Record) { - final Record record = (Record) value; - final RecordSchema recordSchema = record.getSchema(); - if (recordSchema == null) { - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type Record to Map for field " + fieldName - + " because Record does not have an associated Schema"); - } - - final Map map = new HashMap<>(); - for (final String recordFieldName : recordSchema.getFieldNames()) { - map.put(recordFieldName, record.getValue(recordFieldName)); - } - - return map; - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Map for field " + fieldName); - } - - public static boolean isMapTypeCompatible(final Object value) { - return value != null && value instanceof Map; - } - - - public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) { - if (value == null) { - return null; - } - - if (value instanceof String) { - return (String) value; - } - - if (value instanceof java.sql.Date) { - return getDateFormat(dateFormat).format((java.util.Date) value); - } - if (value instanceof java.sql.Time) { - return getDateFormat(timeFormat).format((java.util.Date) value); - } - if (value instanceof java.sql.Timestamp) { - return getDateFormat(timestampFormat).format((java.util.Date) value); - } - if (value instanceof java.util.Date) { - return getDateFormat(timestampFormat).format((java.util.Date) value); - } - - return value.toString(); - } - - public static boolean isStringTypeCompatible(final Object value) { - return value != null; - } - - public static java.sql.Date toDate(final Object value, final String format, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Date) { - return (Date) value; - } - - if (value instanceof Number) { - final long longValue = ((Number) value).longValue(); - return new Date(longValue); - } - - if (value instanceof String) { - try { - final java.util.Date utilDate = getDateFormat(format).parse((String) value); - return new Date(utilDate.getTime()); - } catch (final ParseException e) { - throw new IllegalTypeConversionException("Could not convert value [" + value - + "] of type java.lang.String to Date because the value is not in the expected date format: " + format + " for field " + fieldName); - } - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName); - } - - public static boolean isDateTypeCompatible(final Object value, final String format) { - if (value == null) { - return false; - } - - if (value instanceof java.util.Date || value instanceof Number) { - return true; - } - - if (value instanceof String) { - try { - getDateFormat(format).parse((String) value); - return true; - } catch (final ParseException e) { - return false; - } - } - - return false; - } - - public static Time toTime(final Object value, final String format, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Time) { - return (Time) value; - } - - if (value instanceof Number) { - final long longValue = ((Number) value).longValue(); - return new Time(longValue); - } - - if (value instanceof String) { - try { - final java.util.Date utilDate = getDateFormat(format).parse((String) value); - return new Time(utilDate.getTime()); - } catch (final ParseException e) { - throw new IllegalTypeConversionException("Could not convert value [" + value - + "] of type java.lang.String to Time for field " + fieldName + " because the value is not in the expected date format: " + format); - } - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time for field " + fieldName); - } - - private static DateFormat getDateFormat(final String format) { - final DateFormat df = new SimpleDateFormat(format); - df.setTimeZone(gmt); - return df; - } - - public static boolean isTimeTypeCompatible(final Object value, final String format) { - return isDateTypeCompatible(value, format); - } - - public static Timestamp toTimestamp(final Object value, final String format, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Timestamp) { - return (Timestamp) value; - } - - if (value instanceof Number) { - final long longValue = ((Number) value).longValue(); - return new Timestamp(longValue); - } - - if (value instanceof String) { - try { - final java.util.Date utilDate = getDateFormat(format).parse((String) value); - return new Timestamp(utilDate.getTime()); - } catch (final ParseException e) { - throw new IllegalTypeConversionException("Could not convert value [" + value - + "] of type java.lang.String to Timestamp for field " + fieldName + " because the value is not in the expected date format: " + format); - } - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp for field " + fieldName); - } - - public static boolean isTimestampTypeCompatible(final Object value, final String format) { - return isDateTypeCompatible(value, format); - } - - - public static BigInteger toBigInt(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof BigInteger) { - return (BigInteger) value; - } - if (value instanceof Long) { - return BigInteger.valueOf((Long) value); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger for field " + fieldName); - } - - public static boolean isBigIntTypeCompatible(final Object value) { - return value == null && (value instanceof BigInteger || value instanceof Long); - } - - public static Boolean toBoolean(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Boolean) { - return (Boolean) value; - } - if (value instanceof String) { - final String string = (String) value; - if (string.equalsIgnoreCase("true")) { - return Boolean.TRUE; - } else if (string.equalsIgnoreCase("false")) { - return Boolean.FALSE; - } - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean for field " + fieldName); - } - - public static boolean isBooleanTypeCompatible(final Object value) { - if (value == null) { - return false; - } - if (value instanceof Boolean) { - return true; - } - if (value instanceof String) { - final String string = (String) value; - return string.equalsIgnoreCase("true") || string.equalsIgnoreCase("false"); - } - return false; - } - - public static Double toDouble(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - - if (value instanceof String) { - return Double.parseDouble((String) value); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double for field " + fieldName); - } - - public static boolean isDoubleTypeCompatible(final Object value) { - return isNumberTypeCompatible(value, s -> Double.parseDouble(s)); - } - - private static boolean isNumberTypeCompatible(final Object value, final Consumer stringValueVerifier) { - if (value == null) { - return false; - } - - if (value instanceof Number) { - return true; - } - - if (value instanceof String) { - try { - stringValueVerifier.accept((String) value); - return true; - } catch (final NumberFormatException nfe) { - return false; - } - } - - return false; - } - - public static Float toFloat(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).floatValue(); - } - - if (value instanceof String) { - return Float.parseFloat((String) value); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float for field " + fieldName); - } - - public static boolean isFloatTypeCompatible(final Object value) { - return isNumberTypeCompatible(value, s -> Float.parseFloat(s)); - } - - public static Long toLong(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).longValue(); - } - - if (value instanceof String) { - return Long.parseLong((String) value); - } - - if (value instanceof java.util.Date) { - return ((java.util.Date) value).getTime(); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long for field " + fieldName); - } - - public static boolean isLongTypeCompatible(final Object value) { - if (value == null) { - return false; - } - - if (value instanceof Number) { - return true; - } - - if (value instanceof java.util.Date) { - return true; - } - - if (value instanceof String) { - try { - Long.parseLong((String) value); - return true; - } catch (final NumberFormatException nfe) { - return false; - } - } - - return false; - } - - - public static Integer toInteger(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).intValue(); - } - - if (value instanceof String) { - return Integer.parseInt((String) value); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer for field " + fieldName); - } - - public static boolean isIntegerTypeCompatible(final Object value) { - return isNumberTypeCompatible(value, s -> Integer.parseInt(s)); - } - - - public static Short toShort(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).shortValue(); - } - - if (value instanceof String) { - return Short.parseShort((String) value); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short for field " + fieldName); - } - - public static boolean isShortTypeCompatible(final Object value) { - return isNumberTypeCompatible(value, s -> Short.parseShort(s)); - } - - public static Byte toByte(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Number) { - return ((Number) value).byteValue(); - } - - if (value instanceof String) { - return Byte.parseByte((String) value); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte for field " + fieldName); - } - - public static boolean isByteTypeCompatible(final Object value) { - return isNumberTypeCompatible(value, s -> Byte.parseByte(s)); - } - - - public static Character toCharacter(final Object value, final String fieldName) { - if (value == null) { - return null; - } - - if (value instanceof Character) { - return ((Character) value); - } - - if (value instanceof CharSequence) { - final CharSequence charSeq = (CharSequence) value; - if (charSeq.length() == 0) { - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() - + " to Character because it has a length of 0 for field " + fieldName); - } - - return charSeq.charAt(0); - } - - throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character for field " + fieldName); - } - - public static boolean isCharacterTypeCompatible(final Object value) { - return value != null && (value instanceof Character || (value instanceof CharSequence && ((CharSequence) value).length() > 0)); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java deleted file mode 100644 index 38b5d20..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record.util; - -public class IllegalTypeConversionException extends RuntimeException { - - public IllegalTypeConversionException(final String message) { - super(message); - } - - public IllegalTypeConversionException(final String message, final Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java deleted file mode 100644 index 5a61275..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.junit.Assert; -import org.junit.Test; - -public class TestSimpleRecordSchema { - - @Test - public void testPreventsTwoFieldsWithSameAlias() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar"))); - fields.add(new RecordField("goodbye", RecordFieldType.STRING.getDataType(), null, set("baz", "bar"))); - - try { - new SimpleRecordSchema(fields); - Assert.fail("Was able to create two fields with same alias"); - } catch (final IllegalArgumentException expected) { - } - } - - @Test - public void testPreventsTwoFieldsWithSameName() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar"))); - fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType())); - - try { - new SimpleRecordSchema(fields); - Assert.fail("Was able to create two fields with same name"); - } catch (final IllegalArgumentException expected) { - } - } - - @Test - public void testPreventsTwoFieldsWithConflictingNamesAliases() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar"))); - fields.add(new RecordField("bar", RecordFieldType.STRING.getDataType())); - - try { - new SimpleRecordSchema(fields); - Assert.fail("Was able to create two fields with conflicting names/aliases"); - } catch (final IllegalArgumentException expected) { - } - } - - private Set set(final String... values) { - final Set set = new HashSet<>(); - for (final String value : values) { - set.add(value); - } - return set; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java deleted file mode 100644 index 82e20a6..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.serialization.record; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.junit.Assert; -import org.junit.Test; - -public class TestMapRecord { - - @Test - public void testDefaultValue() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType())); - fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello")); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - final Record record = new MapRecord(schema, values); - - assertNull(record.getValue("noDefault")); - assertEquals("hello", record.getValue("defaultOfHello")); - } - - @Test - public void testDefaultValueInGivenField() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType())); - fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello")); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - final Record record = new MapRecord(schema, values); - - assertNull(record.getValue("noDefault")); - assertEquals("hello", record.getValue("defaultOfHello")); - - final RecordField newField = new RecordField("noDefault", RecordFieldType.STRING.getDataType(), "new"); - assertEquals("new", record.getValue(newField)); - } - - @Test - public void testIllegalDefaultValue() { - new RecordField("hello", RecordFieldType.STRING.getDataType(), 84); - new RecordField("hello", RecordFieldType.STRING.getDataType(), (Object) null); - new RecordField("hello", RecordFieldType.INT.getDataType(), 84); - new RecordField("hello", RecordFieldType.INT.getDataType(), (Object) null); - - try { - new RecordField("hello", RecordFieldType.INT.getDataType(), "foo"); - Assert.fail("Was able to set a default value of \"foo\" for INT type"); - } catch (final IllegalArgumentException expected) { - // expected - } - } - - private Set set(final String... values) { - final Set set = new HashSet<>(); - for (final String value : values) { - set.add(value); - } - return set; - } - - @Test - public void testAliasOneValue() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - values.put("bar", 1); - - final Record record = new MapRecord(schema, values); - assertEquals(1, record.getValue("foo")); - assertEquals(1, record.getValue("bar")); - assertEquals(1, record.getValue("baz")); - } - - @Test - public void testAliasConflictingValues() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - values.put("bar", 1); - values.put("foo", null); - - final Record record = new MapRecord(schema, values); - assertEquals(1, record.getValue("foo")); - assertEquals(1, record.getValue("bar")); - assertEquals(1, record.getValue("baz")); - } - - @Test - public void testAliasConflictingAliasValues() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - values.put("baz", 1); - values.put("bar", 33); - - final Record record = new MapRecord(schema, values); - assertEquals(33, record.getValue("foo")); - assertEquals(33, record.getValue("bar")); - assertEquals(33, record.getValue("baz")); - } - - @Test - public void testAliasInGivenField() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz"))); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - values.put("bar", 33); - - final Record record = new MapRecord(schema, values); - assertEquals(33, record.getValue("foo")); - assertEquals(33, record.getValue("bar")); - assertEquals(33, record.getValue("baz")); - - final RecordField noAlias = new RecordField("hello", RecordFieldType.STRING.getDataType()); - assertNull(record.getValue(noAlias)); - - final RecordField withAlias = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("baz")); - assertEquals(33, record.getValue(withAlias)); - assertEquals("33", record.getAsString(withAlias, withAlias.getDataType().getFormat())); - } - - - @Test - public void testDefaultValueWithAliasValue() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz"))); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - values.put("baz", 1); - values.put("bar", 33); - - final Record record = new MapRecord(schema, values); - assertEquals(33, record.getValue("foo")); - assertEquals(33, record.getValue("bar")); - assertEquals(33, record.getValue("baz")); - } - - @Test - public void testDefaultValueWithAliasesDefined() { - final List fields = new ArrayList<>(); - fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz"))); - - final RecordSchema schema = new SimpleRecordSchema(fields); - final Map values = new HashMap<>(); - final Record record = new MapRecord(schema, values); - assertEquals("hello", record.getValue("foo")); - assertEquals("hello", record.getValue("bar")); - assertEquals("hello", record.getValue("baz")); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 16479f1..81a6775 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -35,9 +35,17 @@ org.apache.nifi + nifi-record + + + org.apache.nifi nifi-schema-registry-service-api + org.apache.nifi + nifi-avro-record-utils + + com.jayway.jsonpath json-path http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index f5b4373..ae6254e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; @@ -50,7 +51,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { - final String schemaAccessStrategy = getConfigurationContext().getProperty(SCHEMA_ACCESS_STRATEGY).getValue(); + final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue(); if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { return new AvroReaderWithEmbeddedSchema(in); } else { http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java index 621ec74..13a8317 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java @@ -17,39 +17,20 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericData.Array; -import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.util.Utf8; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.util.DataTypeUtils; -public abstract class AvroRecordReader implements RecordReader { +import java.io.IOException; +import java.util.Map; +public abstract class AvroRecordReader implements RecordReader { protected abstract GenericRecord nextAvroRecord() throws IOException; - @Override public Record nextRecord() throws IOException, MalformedRecordException { GenericRecord record = nextAvroRecord(); @@ -58,148 +39,8 @@ public abstract class AvroRecordReader implements RecordReader { } final RecordSchema schema = getSchema(); - final Map values = convertAvroRecordToMap(record, schema); + final Map values = AvroTypeUtil.convertAvroRecordToMap(record, schema); return new MapRecord(schema, values); } - - private Map convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) { - final Map values = new HashMap<>(recordSchema.getFieldCount()); - - for (final RecordField recordField : recordSchema.getFields()) { - Object value = avroRecord.get(recordField.getFieldName()); - if (value == null) { - for (final String alias : recordField.getAliases()) { - value = avroRecord.get(alias); - if (value != null) { - break; - } - } - } - - final String fieldName = recordField.getFieldName(); - final Field avroField = avroRecord.getSchema().getField(fieldName); - if (avroField == null) { - values.put(fieldName, null); - continue; - } - - final Schema fieldSchema = avroField.schema(); - final Object rawValue = normalizeValue(value, fieldSchema); - - final DataType desiredType = recordField.getDataType(); - final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName); - - values.put(fieldName, coercedValue); - } - - return values; - } - - private Object normalizeValue(final Object value, final Schema avroSchema) { - if (value == null) { - return null; - } - - switch (avroSchema.getType()) { - case INT: { - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType == null) { - return value; - } - - final String logicalName = logicalType.getName(); - if (LogicalTypes.date().getName().equals(logicalName)) { - // date logical name means that the value is number of days since Jan 1, 1970 - return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value)); - } else if (LogicalTypes.timeMillis().equals(logicalName)) { - // time-millis logical name means that the value is number of milliseconds since midnight. - return new java.sql.Time((int) value); - } - - break; - } - case LONG: { - final LogicalType logicalType = avroSchema.getLogicalType(); - if (logicalType == null) { - return value; - } - - final String logicalName = logicalType.getName(); - if (LogicalTypes.timeMicros().getName().equals(logicalName)) { - return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value)); - } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) { - return new java.sql.Timestamp((long) value); - } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) { - return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value)); - } - break; - } - case UNION: - if (value instanceof GenericData.Record) { - final GenericData.Record avroRecord = (GenericData.Record) value; - return normalizeValue(value, avroRecord.getSchema()); - } - break; - case RECORD: - final GenericData.Record record = (GenericData.Record) value; - final Schema recordSchema = record.getSchema(); - final List recordFields = recordSchema.getFields(); - final Map values = new HashMap<>(recordFields.size()); - for (final Field field : recordFields) { - final Object avroFieldValue = record.get(field.name()); - final Object fieldValue = normalizeValue(avroFieldValue, field.schema()); - values.put(field.name(), fieldValue); - } - final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema); - return new MapRecord(childSchema, values); - case BYTES: - final ByteBuffer bb = (ByteBuffer) value; - return AvroTypeUtil.convertByteArray(bb.array()); - case FIXED: - final GenericFixed fixed = (GenericFixed) value; - return AvroTypeUtil.convertByteArray(fixed.bytes()); - case ENUM: - return value.toString(); - case NULL: - return null; - case STRING: - return value.toString(); - case ARRAY: - final Array array = (Array) value; - final Object[] valueArray = new Object[array.size()]; - for (int i = 0; i < array.size(); i++) { - final Schema elementSchema = avroSchema.getElementType(); - valueArray[i] = normalizeValue(array.get(i), elementSchema); - } - return valueArray; - case MAP: - final Map avroMap = (Map) value; - final Map map = new HashMap<>(avroMap.size()); - for (final Map.Entry entry : avroMap.entrySet()) { - Object obj = entry.getValue(); - if (obj instanceof Utf8 || obj instanceof CharSequence) { - obj = obj.toString(); - } - - final String key = entry.getKey().toString(); - obj = normalizeValue(obj, avroSchema.getValueType()); - - map.put(key, obj); - } - - final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType()); - final List mapFields = new ArrayList<>(); - for (final String key : map.keySet()) { - mapFields.add(new RecordField(key, elementType)); - } - final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); - return new MapRecord(mapSchema, map); - } - - return value; - } - - - } http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 381e978..62e53ea 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schemaregistry.services.SchemaRegistry; @@ -60,7 +61,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.add(SCHEMA_ACCESS_STRATEGY); + properties.add(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY); properties.add(SCHEMA_REGISTRY); return properties; }