http://git-wip-us.apache.org/repos/asf/nifi/blob/59659232/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/orc/OrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/orc/OrcUtils.java
deleted file mode 100644
index e3f6db5..0000000
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/orc/OrcUtils.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.orc;
-
-import org.apache.avro.Schema;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.orc.TypeDescription;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Utility methods for ORC support (conversion from Avro, conversion to Hive types, etc.).
- */
-public class OrcUtils {
-
- public static void putToRowBatch(ColumnVector col, MutableInt vectorOffset, int rowNumber, Schema fieldSchema, Object o) {
- Schema.Type fieldType = fieldSchema.getType();
-
- if (fieldType == null) {
- throw new IllegalArgumentException("Field type is null");
- }
-
- if (o == null) {
- col.isNull[rowNumber] = true;
- } else {
-
- switch (fieldType) {
- case INT:
- ((LongColumnVector) col).vector[rowNumber] = (int) o;
- break;
- case LONG:
- ((LongColumnVector) col).vector[rowNumber] = (long) o;
- break;
- case BOOLEAN:
- ((LongColumnVector) col).vector[rowNumber] = ((boolean) o) ? 1 : 0;
- break;
- case BYTES:
- ByteBuffer byteBuffer = ((ByteBuffer) o);
- int size = byteBuffer.remaining();
- byte[] buf = new byte[size];
- byteBuffer.get(buf, 0, size);
- ((BytesColumnVector) col).setVal(rowNumber, buf);
- break;
- case DOUBLE:
- ((DoubleColumnVector) col).vector[rowNumber] = (double) o;
- break;
- case FLOAT:
- ((DoubleColumnVector) col).vector[rowNumber] = (float) o;
- break;
- case STRING:
- case ENUM:
- ((BytesColumnVector) col).setVal(rowNumber, o.toString().getBytes());
- break;
- case UNION:
- // If the union only has one non-null type in it, it was flattened in the ORC schema
- if (col instanceof UnionColumnVector) {
- UnionColumnVector union = ((UnionColumnVector) col);
- Schema.Type avroType = OrcUtils.getAvroSchemaTypeOfObject(o);
- // Find the index in the union with the matching Avro type
- int unionIndex = -1;
- List<Schema> types = fieldSchema.getTypes();
- final int numFields = types.size();
- for (int i = 0; i < numFields && unionIndex == -1; i++) {
- if (avroType.equals(types.get(i).getType())) {
- unionIndex = i;
- }
- }
- if (unionIndex == -1) {
- throw new IllegalArgumentException("Object type " + avroType.getName() + " not found in union '" + fieldSchema.getName() + "'");
- }
-
- // Need nested vector offsets
- MutableInt unionVectorOffset = new MutableInt(0);
- putToRowBatch(union.fields[unionIndex], unionVectorOffset, rowNumber, fieldSchema.getTypes().get(unionIndex), o);
- } else {
- // Find and use the non-null type from the union
- List<Schema> types = fieldSchema.getTypes();
- Schema effectiveType = null;
- for (Schema type : types) {
- if (!Schema.Type.NULL.equals(type.getType())) {
- effectiveType = type;
- break;
- }
- }
- putToRowBatch(col, vectorOffset, rowNumber, effectiveType, o);
- }
- break;
- case ARRAY:
- Schema arrayType = fieldSchema.getElementType();
- ListColumnVector array = ((ListColumnVector) col);
- if (o instanceof int[] || o instanceof long[]) {
- int length = (o instanceof int[]) ? ((int[]) o).length : ((long[]) o).length;
- for (int i = 0; i < length; i++) {
- ((LongColumnVector) array.child).vector[vectorOffset.getValue() + i] =
- (o instanceof int[]) ? ((int[]) o)[i] : ((long[]) o)[i];
- }
- array.offsets[rowNumber] = vectorOffset.longValue();
- array.lengths[rowNumber] = length;
- vectorOffset.add(length);
- } else if (o instanceof float[]) {
- float[] floatArray = (float[]) o;
- for (int i = 0; i < floatArray.length; i++) {
- ((DoubleColumnVector) array.child).vector[vectorOffset.getValue() + i] = floatArray[i];
- }
- array.offsets[rowNumber] = vectorOffset.longValue();
- array.lengths[rowNumber] = floatArray.length;
- vectorOffset.add(floatArray.length);
- } else if (o instanceof double[]) {
- double[] doubleArray = (double[]) o;
- for (int i = 0; i < doubleArray.length; i++) {
- ((DoubleColumnVector) array.child).vector[vectorOffset.getValue() + i] = doubleArray[i];
- }
- array.offsets[rowNumber] = vectorOffset.longValue();
- array.lengths[rowNumber] = doubleArray.length;
- vectorOffset.add(doubleArray.length);
- } else if (o instanceof String[]) {
- String[] stringArray = (String[]) o;
- BytesColumnVector byteCol = ((BytesColumnVector) array.child);
- for (int i = 0; i < stringArray.length; i++) {
- if (stringArray[i] == null) {
- byteCol.isNull[vectorOffset.getValue() + i] = true;
- } else {
- byteCol.setVal(vectorOffset.getValue() + i, stringArray[i].getBytes());
- }
- }
- array.offsets[rowNumber] = vectorOffset.longValue();
- array.lengths[rowNumber] = stringArray.length;
- vectorOffset.add(stringArray.length);
- } else if (o instanceof Map[]) {
- Map[] mapArray = (Map[]) o;
- MutableInt mapVectorOffset = new MutableInt(0);
- for (int i = 0; i < mapArray.length; i++) {
- if (mapArray[i] == null) {
- array.child.isNull[vectorOffset.getValue() + i] = true;
- } else {
- putToRowBatch(array.child, mapVectorOffset, vectorOffset.getValue() + i, arrayType, mapArray[i]);
- }
- }
- array.offsets[rowNumber] = vectorOffset.longValue();
- array.lengths[rowNumber] = mapArray.length;
- vectorOffset.add(mapArray.length);
- } else if (o instanceof List) {
- List listArray = (List) o;
- MutableInt listVectorOffset = new MutableInt(0);
- int numElements = listArray.size();
- for (int i = 0; i < numElements; i++) {
- if (listArray.get(i) == null) {
- array.child.isNull[vectorOffset.getValue() + i] = true;
- } else {
- putToRowBatch(array.child, listVectorOffset, vectorOffset.getValue() + i, arrayType, listArray.get(i));
- }
- }
- array.offsets[rowNumber] = vectorOffset.longValue();
- array.lengths[rowNumber] = numElements;
- vectorOffset.add(numElements);
-
- } else {
- throw new IllegalArgumentException("Object class " + o.getClass().getName() + " not supported as an ORC list/array");
- }
- break;
- case MAP:
- MapColumnVector map = ((MapColumnVector) col);
-
- // Avro maps require String keys
- @SuppressWarnings("unchecked")
- Map<String, ?> mapObj = (Map<String, ?>) o;
- int effectiveRowNumber = vectorOffset.getValue();
- for (Map.Entry<String, ?> entry : mapObj.entrySet()) {
- putToRowBatch(map.keys, vectorOffset, effectiveRowNumber, Schema.create(Schema.Type.STRING), entry.getKey());
- putToRowBatch(map.values, vectorOffset, effectiveRowNumber, fieldSchema.getValueType(), entry.getValue());
- effectiveRowNumber++;
- }
- map.offsets[rowNumber] = vectorOffset.longValue();
- map.lengths[rowNumber] = mapObj.size();
- vectorOffset.add(mapObj.size());
-
- break;
- default:
- throw new IllegalArgumentException("Field type " + fieldType.getName() + " not recognized");
- }
- }
- }
-
- public static String normalizeHiveTableName(String name) {
- return name.replaceAll("[\\. ]", "_");
- }
-
- public static String generateHiveDDL(Schema avroSchema, String tableName) {
- Schema.Type schemaType = avroSchema.getType();
- StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
- sb.append(tableName);
- sb.append(" (");
- if (Schema.Type.RECORD.equals(schemaType)) {
- List<String> hiveColumns = new ArrayList<>();
- List<Schema.Field> fields = avroSchema.getFields();
- if (fields != null) {
- hiveColumns.addAll(
- fields.stream().map(field -> field.name() + " " + getHiveTypeFromAvroType(field.schema())).collect(Collectors.toList()));
- }
- sb.append(StringUtils.join(hiveColumns, ", "));
- sb.append(") STORED AS ORC");
- return sb.toString();
- } else {
- throw new IllegalArgumentException("Avro schema is of type " + schemaType.getName() + ", not RECORD");
- }
- }
-
-
- public static void addOrcField(TypeDescription orcSchema, Schema.Field avroField) {
- Schema fieldSchema = avroField.schema();
- String fieldName = avroField.name();
-
- orcSchema.addField(fieldName, getOrcField(fieldSchema));
- }
-
- public static TypeDescription getOrcField(Schema fieldSchema) throws IllegalArgumentException {
- Schema.Type fieldType = fieldSchema.getType();
-
- switch (fieldType) {
- case INT:
- case LONG:
- case BOOLEAN:
- case BYTES:
- case DOUBLE:
- case FLOAT:
- case STRING:
- return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
-
- case UNION:
- List<Schema> unionFieldSchemas = fieldSchema.getTypes();
- TypeDescription unionSchema = TypeDescription.createUnion();
- if (unionFieldSchemas != null) {
- // Ignore null types in union
- List<TypeDescription> orcFields = unionFieldSchemas.stream().filter(
- unionFieldSchema -> !Schema.Type.NULL.equals(unionFieldSchema.getType())).map(OrcUtils::getOrcField).collect(Collectors.toList());
-
-
- // Flatten the field if the union only has one non-null element
- if (orcFields.size() == 1) {
- return orcFields.get(0);
- } else {
- orcFields.forEach(unionSchema::addUnionChild);
- }
- }
- return unionSchema;
-
- case ARRAY:
- return TypeDescription.createList(getOrcField(fieldSchema.getElementType()));
-
- case MAP:
- return TypeDescription.createMap(TypeDescription.createString(), getOrcField(fieldSchema.getValueType()));
-
- case RECORD:
- TypeDescription record = TypeDescription.createStruct();
- List<Schema.Field> avroFields = fieldSchema.getFields();
- if (avroFields != null) {
- avroFields.forEach(avroField -> addOrcField(record, avroField));
- }
- return record;
-
- case ENUM:
- // An enum value is just a String for ORC/Hive
- return TypeDescription.createString();
-
- default:
- throw new IllegalArgumentException("Did not recognize Avro type " + fieldType.getName());
- }
-
- }
-
- public static Schema.Type getAvroSchemaTypeOfObject(Object o) {
- if (o == null) {
- return Schema.Type.NULL;
- } else if (o instanceof Integer) {
- return Schema.Type.INT;
- } else if (o instanceof Long) {
- return Schema.Type.LONG;
- } else if (o instanceof Boolean) {
- return Schema.Type.BOOLEAN;
- } else if (o instanceof byte[]) {
- return Schema.Type.BYTES;
- } else if (o instanceof Float) {
- return Schema.Type.FLOAT;
- } else if (o instanceof Double) {
- return Schema.Type.DOUBLE;
- } else if (o instanceof Enum) {
- return Schema.Type.ENUM;
- } else if (o instanceof Object[]) {
- return Schema.Type.ARRAY;
- } else if (o instanceof List) {
- return Schema.Type.ARRAY;
- } else if (o instanceof Map) {
- return Schema.Type.MAP;
- } else {
- throw new IllegalArgumentException("Object of class " + o.getClass() + " is not a supported Avro Type");
- }
- }
-
- public static TypeDescription getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws IllegalArgumentException {
- if (avroType == null) {
- throw new IllegalArgumentException("Avro type is null");
- }
- switch (avroType) {
- case INT:
- return TypeDescription.createInt();
- case LONG:
- return TypeDescription.createLong();
- case BOOLEAN:
- return TypeDescription.createBoolean();
- case BYTES:
- return TypeDescription.createBinary();
- case DOUBLE:
- return TypeDescription.createDouble();
- case FLOAT:
- return TypeDescription.createFloat();
- case STRING:
- return TypeDescription.createString();
- default:
- throw new IllegalArgumentException("Avro type " + avroType.getName() + " is not a primitive type");
- }
- }
-
- public static String getHiveTypeFromAvroType(Schema avroSchema) {
- if (avroSchema == null) {
- throw new IllegalArgumentException("Avro schema is null");
- }
-
- Schema.Type avroType = avroSchema.getType();
-
- switch (avroType) {
- case INT:
- return "INT";
- case LONG:
- return "BIGINT";
- case BOOLEAN:
- return "BOOLEAN";
- case BYTES:
- return "BINARY";
- case DOUBLE:
- return "DOUBLE";
- case FLOAT:
- return "FLOAT";
- case STRING:
- case ENUM:
- return "STRING";
- case UNION:
- List<Schema> unionFieldSchemas = avroSchema.getTypes();
- if (unionFieldSchemas != null) {
- List<String> hiveFields = new ArrayList<>();
- for (Schema unionFieldSchema : unionFieldSchemas) {
- Schema.Type unionFieldSchemaType = unionFieldSchema.getType();
- // Ignore null types in union
- if (!Schema.Type.NULL.equals(unionFieldSchemaType)) {
- hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema));
- }
- }
- // Flatten the field if the union only has one non-null element
- return (hiveFields.size() == 1)
- ? hiveFields.get(0)
- : "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">";
-
- }
- break;
- case MAP:
- return "MAP<STRING, " + getHiveTypeFromAvroType(avroSchema.getValueType()) + ">";
- case ARRAY:
- return "ARRAY<" + getHiveTypeFromAvroType(avroSchema.getElementType()) + ">";
- case RECORD:
- List<Schema.Field> recordFields = avroSchema.getFields();
- if (recordFields != null) {
- List<String> hiveFields = recordFields.stream().map(
- recordField -> recordField.name() + ":" + getHiveTypeFromAvroType(recordField.schema())).collect(Collectors.toList());
- return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
- }
- break;
- default:
- break;
- }
-
- throw new IllegalArgumentException("Error converting Avro type " + avroType.getName() + " to Hive type");
- }
-}
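
For reference, the deleted utility was driven one field per row: derive an ORC TypeDescription from the Avro schema, create a VectorizedRowBatch from it, and call putToRowBatch for each column. A minimal sketch of that usage, mirroring the deleted tests below (the driver class, schema, and field names are illustrative, not part of this commit):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.commons.lang3.mutable.MutableInt;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.nifi.util.orc.OrcUtils;
    import org.apache.orc.TypeDescription;

    public class OrcUtilsUsageSketch {
        public static void main(String[] args) {
            // Illustrative two-field Avro schema and a matching record
            Schema schema = SchemaBuilder.record("example").fields()
                    .name("id").type().intType().noDefault()
                    .name("name").type().stringType().noDefault()
                    .endRecord();
            GenericData.Record record = new GenericData.Record(schema);
            record.put("id", 42);
            record.put("name", "hello");

            // Derive the ORC struct type from the Avro schema, field by field
            TypeDescription orcSchema = TypeDescription.createStruct();
            schema.getFields().forEach(f -> OrcUtils.addOrcField(orcSchema, f));

            // Copy the record into row 0; the MutableInt tracks child-vector
            // offsets for nested types (lists, maps, unions)
            VectorizedRowBatch batch = orcSchema.createRowBatch();
            for (int i = 0; i < schema.getFields().size(); i++) {
                OrcUtils.putToRowBatch(batch.cols[i], new MutableInt(0), 0,
                        schema.getFields().get(i).schema(), record.get(i));
            }
            batch.size = 1;
        }
    }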
http://git-wip-us.apache.org/repos/asf/nifi/blob/59659232/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 45e1f29..55ee005 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,5 +14,4 @@
# limitations under the License.
org.apache.nifi.processors.hive.SelectHiveQL
org.apache.nifi.processors.hive.PutHiveQL
-org.apache.nifi.processors.hive.ConvertAvroToORC
org.apache.nifi.processors.hive.PutHiveStreaming
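
The file above is a standard java.util.ServiceLoader manifest: each line names one Processor implementation, and deleting the ConvertAvroToORC line is what withdraws that processor from component discovery. A rough illustration of the lookup mechanism (hypothetical driver class, not NiFi code):

    import java.util.ServiceLoader;

    import org.apache.nifi.processor.Processor;

    public class ListDiscoveredProcessors {
        public static void main(String[] args) {
            // ServiceLoader reads META-INF/services/org.apache.nifi.processor.Processor
            // from the classpath; with the line above removed, ConvertAvroToORC
            // no longer shows up in this iteration.
            for (Processor p : ServiceLoader.load(Processor.class)) {
                System.out.println(p.getClass().getName());
            }
        }
    }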
http://git-wip-us.apache.org/repos/asf/nifi/blob/59659232/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
deleted file mode 100644
index 9afcf7f..0000000
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hive;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.util.orc.TestOrcUtils;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FileOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Unit tests for ConvertAvroToORC processor
- */
-public class TestConvertAvroToORC {
-
- private ConvertAvroToORC processor;
- private TestRunner runner;
-
- @Before
- public void setUp() throws Exception {
- processor = new ConvertAvroToORC();
- runner = TestRunners.newTestRunner(processor);
- }
-
- @Test
- public void test_Setup() throws Exception {
-
- }
-
- @Test
- public void test_onTrigger_primitive_record() throws Exception {
- GenericData.Record record = TestOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World");
-
- DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
- DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- fileWriter.create(record.getSchema(), out);
- fileWriter.append(record);
- // Put another record in
- record = TestOrcUtils.buildPrimitiveAvroRecord(1, 2L, false, 3.0f, 4L, StandardCharsets.UTF_8.encode("I am"), "another record");
- fileWriter.append(record);
- // And one more
- record = TestOrcUtils.buildPrimitiveAvroRecord(100, 200L, true, 300.0f, 400L, StandardCharsets.UTF_8.encode("Me"), "too!");
- fileWriter.append(record);
- fileWriter.flush();
- fileWriter.close();
- out.close();
- Map<String,String> attributes = new HashMap<String,String>(){{
- put(CoreAttributes.FILENAME.key(), "test.avro");
- }};
- runner.enqueue(out.toByteArray(), attributes);
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
-
- // Write the flow file out to disk, since the ORC Reader needs a path
- MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
- assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)"
- + " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
- assertEquals("3", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
- assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
- byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
- FileOutputStream fos = new FileOutputStream("target/test1.orc");
- fos.write(resultContents);
- fos.flush();
- fos.close();
-
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.getLocal(conf);
- Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
- RecordReader rows = reader.rows();
- VectorizedRowBatch batch = reader.getSchema().createRowBatch();
- assertTrue(rows.nextBatch(batch));
- assertTrue(batch.cols[0] instanceof LongColumnVector);
- assertEquals(10, ((LongColumnVector) batch.cols[0]).vector[0]);
- assertEquals(1, ((LongColumnVector) batch.cols[0]).vector[1]);
- assertEquals(100, ((LongColumnVector) batch.cols[0]).vector[2]);
- assertTrue(batch.cols[1] instanceof LongColumnVector);
- assertEquals(20, ((LongColumnVector) batch.cols[1]).vector[0]);
- assertEquals(2, ((LongColumnVector) batch.cols[1]).vector[1]);
- assertEquals(200, ((LongColumnVector) batch.cols[1]).vector[2]);
- assertTrue(batch.cols[2] instanceof LongColumnVector);
- assertEquals(1, ((LongColumnVector) batch.cols[2]).vector[0]);
- assertEquals(0, ((LongColumnVector) batch.cols[2]).vector[1]);
- assertEquals(1, ((LongColumnVector) batch.cols[2]).vector[2]);
- assertTrue(batch.cols[3] instanceof DoubleColumnVector);
- assertEquals(30.0f, ((DoubleColumnVector) batch.cols[3]).vector[0], Double.MIN_NORMAL);
- assertEquals(3.0f, ((DoubleColumnVector) batch.cols[3]).vector[1], Double.MIN_NORMAL);
- assertEquals(300.0f, ((DoubleColumnVector) batch.cols[3]).vector[2], Double.MIN_NORMAL);
- assertTrue(batch.cols[4] instanceof DoubleColumnVector);
- assertEquals(40.0f, ((DoubleColumnVector) batch.cols[4]).vector[0], Double.MIN_NORMAL);
- assertEquals(4.0f, ((DoubleColumnVector) batch.cols[4]).vector[1], Double.MIN_NORMAL);
- assertEquals(400.0f, ((DoubleColumnVector) batch.cols[4]).vector[2], Double.MIN_NORMAL);
- assertTrue(batch.cols[5] instanceof BytesColumnVector);
- assertEquals("Hello", ((BytesColumnVector) batch.cols[5]).toString(0));
- assertEquals("I am", ((BytesColumnVector) batch.cols[5]).toString(1));
- assertEquals("Me", ((BytesColumnVector) batch.cols[5]).toString(2));
- assertTrue(batch.cols[6] instanceof BytesColumnVector);
- assertEquals("World", ((BytesColumnVector) batch.cols[6]).toString(0));
- assertEquals("another record", ((BytesColumnVector) batch.cols[6]).toString(1));
- assertEquals("too!", ((BytesColumnVector) batch.cols[6]).toString(2));
- }
-
- @Test
- public void test_onTrigger_complex_record() throws Exception {
-
- Map<String, Double> mapData1 = new TreeMap<String, Double>() {{
- put("key1", 1.0);
- put("key2", 2.0);
- }};
-
- GenericData.Record record = TestOrcUtils.buildComplexAvroRecord(10, mapData1, "DEF", 3.0f, Arrays.asList(10, 20));
-
- DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
- DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- fileWriter.create(record.getSchema(), out);
- fileWriter.append(record);
-
- // Put another record in
- Map<String, Double> mapData2 = new TreeMap<String, Double>() {{
- put("key1", 3.0);
- put("key2", 4.0);
- }};
-
- record = TestOrcUtils.buildComplexAvroRecord(null, mapData2, "XYZ", 4L, Arrays.asList(100, 200));
- fileWriter.append(record);
-
- fileWriter.flush();
- fileWriter.close();
- out.close();
-
- Map<String,String> attributes = new HashMap<String,String>(){{
- put(CoreAttributes.FILENAME.key(), "test");
- }};
- runner.enqueue(out.toByteArray(), attributes);
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
-
- // Write the flow file out to disk, since the ORC Reader needs a path
- MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
- assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS complex_record " +
- "(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
- + " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
- assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
- assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
- byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
- FileOutputStream fos = new FileOutputStream("target/test1.orc");
- fos.write(resultContents);
- fos.flush();
- fos.close();
-
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.getLocal(conf);
- Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
- RecordReader rows = reader.rows();
- VectorizedRowBatch batch = reader.getSchema().createRowBatch();
- assertTrue(rows.nextBatch(batch));
- assertTrue(batch.cols[0] instanceof LongColumnVector);
- assertEquals(10, ((LongColumnVector) batch.cols[0]).vector[0]);
- assertTrue(batch.cols[1] instanceof MapColumnVector);
- assertTrue(batch.cols[2] instanceof BytesColumnVector);
- assertTrue(batch.cols[3] instanceof UnionColumnVector);
- StringBuilder buffer = new StringBuilder();
- batch.cols[3].stringifyValue(buffer, 1);
- assertEquals("{\"tag\": 0, \"value\": 4}", buffer.toString());
- assertTrue(batch.cols[4] instanceof ListColumnVector);
- }
-
- @Test
- public void test_onTrigger_multiple_batches() throws Exception {
-
- Schema recordSchema = TestOrcUtils.buildPrimitiveAvroSchema();
- DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(recordSchema);
- DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- fileWriter.create(recordSchema, out);
-
- GenericData.Record record;
- for (int i = 1;i<=2000;i++) {
- record = TestOrcUtils.buildPrimitiveAvroRecord(i, 2L * i, true, 30.0f * i, 40L * i, StandardCharsets.UTF_8.encode("Hello"), "World");
-
-
- fileWriter.append(record);
- }
-
- fileWriter.flush();
- fileWriter.close();
- out.close();
- runner.enqueue(out.toByteArray());
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
-
- // Write the flow file out to disk, since the ORC Reader needs a path
- MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
- assertEquals("2000", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
- byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
- FileOutputStream fos = new FileOutputStream("target/test1.orc");
- fos.write(resultContents);
- fos.flush();
- fos.close();
-
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.getLocal(conf);
- Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
- RecordReader rows = reader.rows();
- VectorizedRowBatch batch = reader.getSchema().createRowBatch();
- assertTrue(rows.nextBatch(batch));
- // At least 2 batches were created
- assertTrue(rows.nextBatch(batch));
- }
-}
\ No newline at end of file
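
The deleted onTrigger tests all verify output the same way: write the result FlowFile to target/test1.orc, then read it back with the ORC Reader and walk the vectorized batches. A condensed sketch of that read loop (standalone class for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;

    public class OrcReadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Reader reader = OrcFile.createReader(new Path("target/test1.orc"),
                    OrcFile.readerOptions(conf));
            // Count rows batch by batch, as the tests do with their assertions
            RecordReader rows = reader.rows();
            VectorizedRowBatch batch = reader.getSchema().createRowBatch();
            long rowCount = 0;
            while (rows.nextBatch(batch)) {
                rowCount += batch.size;
            }
            rows.close();
            System.out.println(rowCount + " rows");
        }
    }

The default batch size is 1024 rows, which is why the 2000-record test above can assert that nextBatch succeeds at least twice.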
http://git-wip-us.apache.org/repos/asf/nifi/blob/59659232/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestOrcUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestOrcUtils.java
deleted file mode 100644
index 539fddb..0000000
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestOrcUtils.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util.orc;
-
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericData;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Unit tests for the OrcUtils helper class
- */
-public class TestOrcUtils {
-
- @Test
- public void test_getOrcField_primitive() throws Exception {
- // Expected ORC types
- TypeDescription[] expectedTypes = {
- TypeDescription.createInt(),
- TypeDescription.createLong(),
- TypeDescription.createBoolean(),
- TypeDescription.createFloat(),
- TypeDescription.createDouble(),
- TypeDescription.createBinary(),
- TypeDescription.createString(),
- };
-
- // Build a fake Avro record with all types
- Schema testSchema = buildPrimitiveAvroSchema();
- List<Schema.Field> fields = testSchema.getFields();
- for (int i = 0; i < fields.size(); i++) {
- assertEquals(expectedTypes[i], OrcUtils.getOrcField(fields.get(i).schema()));
- }
-
- }
-
- @Test
- public void test_getOrcField_union_optional_type() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("union").type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("union").schema());
- assertEquals(TypeDescription.createBoolean(), orcType);
- }
-
- @Test
- public void test_getOrcField_union() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("union").type().unionOf().intType().and().booleanType().endUnion().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("union").schema());
- assertEquals(
- TypeDescription.createUnion()
- .addUnionChild(TypeDescription.createInt())
- .addUnionChild(TypeDescription.createBoolean()),
- orcType);
- }
-
- @Test
- public void test_getOrcField_map() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("map").type().map().values().doubleType().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("map").schema());
- assertEquals(
- TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createDouble()),
- orcType);
- }
-
- @Test
- public void test_getOrcField_nested_map() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("map").type().map().values().map().values().doubleType().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("map").schema());
- assertEquals(
- TypeDescription.createMap(TypeDescription.createString(),
- TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createDouble())),
- orcType);
- }
-
- @Test
- public void test_getOrcField_array() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("array").type().array().items().longType().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("array").schema());
- assertEquals(
- TypeDescription.createList(TypeDescription.createLong()),
- orcType);
- }
-
- @Test
- public void test_getOrcField_complex_array() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("array").type().array().items().map().values().floatType().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("array").schema());
- assertEquals(
- TypeDescription.createList(TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createFloat())),
- orcType);
- }
-
- @Test
- public void test_getOrcField_record() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("int").type().intType().noDefault();
- builder.name("long").type().longType().longDefault(1L);
- builder.name("array").type().array().items().stringType().noDefault();
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema);
- assertEquals(
- TypeDescription.createStruct()
- .addField("int", TypeDescription.createInt())
- .addField("long", TypeDescription.createLong())
- .addField("array", TypeDescription.createList(TypeDescription.createString())),
- orcType);
- }
-
- @Test
- public void test_getOrcField_enum() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("enumField").type().enumeration("enum").symbols("a", "b", "c").enumDefault("a");
- Schema testSchema = builder.endRecord();
- TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("enumField").schema());
- assertEquals(TypeDescription.createString(), orcType);
- }
-
- @Test
- public void test_getPrimitiveOrcTypeFromPrimitiveAvroType() throws Exception {
- // Expected ORC types
- TypeDescription[] expectedTypes = {
- TypeDescription.createInt(),
- TypeDescription.createLong(),
- TypeDescription.createBoolean(),
- TypeDescription.createFloat(),
- TypeDescription.createDouble(),
- TypeDescription.createBinary(),
- TypeDescription.createString(),
- };
-
- Schema testSchema = buildPrimitiveAvroSchema();
- List<Schema.Field> fields = testSchema.getFields();
- for (int i = 0; i < fields.size(); i++) {
- assertEquals(expectedTypes[i], OrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(fields.get(i).schema().getType()));
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void test_getPrimitiveOrcTypeFromPrimitiveAvroType_badType() throws Exception {
- Schema.Type nonPrimitiveType = Schema.Type.ARRAY;
- OrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(nonPrimitiveType);
- }
-
- @Test
- public void test_putRowToBatch() {
- TypeDescription orcSchema = buildPrimitiveOrcSchema();
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- Schema avroSchema = buildPrimitiveAvroSchema();
- List<Schema.Field> fields = avroSchema.getFields();
- GenericData.Record record = buildPrimitiveAvroRecord(1, 2L, false, 1.0f, 3.0, ByteBuffer.wrap("Hello".getBytes()), "World");
- for (int i = 0; i < fields.size(); i++) {
- OrcUtils.putToRowBatch(batch.cols[i], new MutableInt(0), 0, fields.get(i).schema(), record.get(i));
- }
-
- assertEquals(1, ((LongColumnVector) batch.cols[0]).vector[0]);
- assertEquals(2, ((LongColumnVector) batch.cols[1]).vector[0]);
- assertEquals(0, ((LongColumnVector) batch.cols[2]).vector[0]);
- assertEquals(1.0, ((DoubleColumnVector) batch.cols[3]).vector[0], Double.MIN_NORMAL);
- assertEquals(3.0, ((DoubleColumnVector) batch.cols[4]).vector[0], Double.MIN_NORMAL);
- assertEquals("Hello", ((BytesColumnVector) batch.cols[5]).toString(0));
- assertEquals("World", ((BytesColumnVector) batch.cols[6]).toString(0));
-
- }
-
- @Test
- public void test_putRowToBatch_union() {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("union").type().unionOf().intType().and().floatType().endUnion().noDefault();
- Schema testSchema = builder.endRecord();
-
- GenericData.Record row = new GenericData.Record(testSchema);
- row.put("union", 2);
-
- TypeDescription orcSchema = TypeDescription.createUnion()
- .addUnionChild(TypeDescription.createInt())
- .addUnionChild(TypeDescription.createFloat());
-
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- batch.ensureSize(2);
- OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 0, testSchema.getField("union").schema(), row.get("union"));
-
- UnionColumnVector union = ((UnionColumnVector) batch.cols[0]);
- // verify the value is in the union field of type 'int'
- assertEquals(2, ((LongColumnVector) union.fields[0]).vector[0]);
- assertEquals(0.0, ((DoubleColumnVector) union.fields[1]).vector[0], Double.MIN_NORMAL);
-
- row.put("union", 2.0f);
- OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 1, testSchema.getField("union").schema(), row.get("union"));
-
- union = ((UnionColumnVector) batch.cols[0]);
- // verify the value is in the union field of type 'float'
- assertEquals(0, ((LongColumnVector) union.fields[0]).vector[1]);
- assertEquals(2.0, ((DoubleColumnVector) union.fields[1]).vector[1], Double.MIN_NORMAL);
- }
-
- @Test
- public void test_putRowToBatch_optional_union() {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("union").type().unionOf().nullType().and().floatType().endUnion().noDefault();
- Schema testSchema = builder.endRecord();
-
- GenericData.Record row = new GenericData.Record(testSchema);
- row.put("union", 2.0f);
-
- TypeDescription orcSchema = TypeDescription.createFloat();
-
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- batch.ensureSize(2);
- OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 0, testSchema.getField("union").schema(), row.get("union"));
-
- assertTrue(batch.cols[0] instanceof DoubleColumnVector);
-
- DoubleColumnVector union = ((DoubleColumnVector) batch.cols[0]);
- // verify the value is in the flattened field of type 'float'
- assertEquals(2.0, union.vector[0], Double.MIN_NORMAL);
-
- }
-
- @Test
- public void test_putRowToBatch_array_ints() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("array").type().array().items().intType().noDefault();
- Schema testSchema = builder.endRecord();
-
- GenericData.Record row = new GenericData.Record(testSchema);
- int[] data1 = {1, 2, 3, 4, 5};
- row.put("array", data1);
-
- TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- batch.ensureSize(2);
- MutableInt vectorOffset = new MutableInt(0);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("array").schema(), row.get("array"));
-
- int[] data2 = {10, 20, 30, 40};
- row.put("array", data2);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("array").schema(), row.get("array"));
-
- ListColumnVector array = ((ListColumnVector) batch.cols[0]);
- LongColumnVector dataColumn = ((LongColumnVector) array.child);
- // Check the first row, entries 0..4 should have values 1..5
- for (int i = 0; i < 5; i++) {
- assertEquals(i + 1, dataColumn.vector[i]);
- }
- // Check the second row, entries 5..8 should have values 10..40 (by tens)
- for (int i = 0; i < 4; i++) {
- assertEquals((i + 1) * 10, dataColumn.vector[(int) array.offsets[1] + i]);
- }
- assertEquals(0, dataColumn.vector[9]);
- }
-
- @Test
- public void test_putRowToBatch_array_floats() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("array").type().array().items().floatType().noDefault();
- Schema testSchema = builder.endRecord();
-
- GenericData.Record row = new GenericData.Record(testSchema);
- float[] data1 = {1.0f, 2.0f, 3.0f};
- row.put("array", data1);
-
- TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- batch.ensureSize(2);
- MutableInt vectorOffset = new MutableInt(0);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("array").schema(), row.get("array"));
-
- float[] data2 = {40.0f, 41.0f, 42.0f, 43.0f};
- row.put("array", data2);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("array").schema(), row.get("array"));
-
- ListColumnVector array = ((ListColumnVector) batch.cols[0]);
- DoubleColumnVector dataColumn = ((DoubleColumnVector) array.child);
- // Check the first row, entries 0..2 should have values 1..3
- for (int i = 0; i < 3; i++) {
- assertEquals(i + 1.0f, dataColumn.vector[i], Float.MIN_NORMAL);
- }
- // Check the second row, entries 3..6 should have values 40..43
- for (int i = 0; i < 4; i++) {
- assertEquals((i + 40.0f), dataColumn.vector[(int) array.offsets[1] + i], Float.MIN_NORMAL);
- }
- // The first slot past the written data should still be empty
- assertEquals(0.0f, dataColumn.vector[7], Float.MIN_NORMAL);
- }
-
- @Test
- public void test_putRowToBatch_list_doubles() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("array").type().array().items().doubleType().noDefault();
- Schema testSchema = builder.endRecord();
-
- GenericData.Record row = new GenericData.Record(testSchema);
- List<Double> data1 = Arrays.asList(1.0, 2.0, 3.0);
- row.put("array", data1);
-
- TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- batch.ensureSize(2);
- MutableInt vectorOffset = new MutableInt(0);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("array").schema(), row.get("array"));
-
- List<Double> data2 = Arrays.asList(40.0, 41.0, 42.0, 43.0);
- row.put("array", data2);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("array").schema(), row.get("array"));
-
- ListColumnVector array = ((ListColumnVector) batch.cols[0]);
- DoubleColumnVector dataColumn = ((DoubleColumnVector) array.child);
- // Check the first row, entries 0..2 should have values 1..3
- for (int i = 0; i < 3; i++) {
- assertEquals(i + 1.0, dataColumn.vector[i], Double.MIN_NORMAL);
- }
- // Check the second row, entries 3..6 should have values 40..43
- for (int i = 0; i < 4; i++) {
- assertEquals((i + 40.0), dataColumn.vector[(int) array.offsets[1] + i], Double.MIN_NORMAL);
- }
- // The first slot past the written data should still be empty
- assertEquals(0.0, dataColumn.vector[7], Double.MIN_NORMAL);
- }
-
- @Test
- public void test_putRowToBatch_array_of_maps() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("array").type().array().items().map().values().floatType().noDefault();
- Schema testSchema = builder.endRecord();
-
- Map<String, Float> map1 = new TreeMap<String, Float>() {{
- put("key10", 10.0f);
- put("key20", 20.0f);
- }};
-
- Map<String, Float> map2 = new TreeMap<String, Float>() {{
- put("key101", 101.0f);
- put("key202", 202.0f);
- }};
-
- Map[] maps = new Map[]{map1, map2, null};
- GenericData.Record row = new GenericData.Record(testSchema);
- row.put("array", maps);
-
- TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 0, testSchema.getField("array").schema(), row.get("array"));
-
- ListColumnVector array = ((ListColumnVector) batch.cols[0]);
- MapColumnVector map = ((MapColumnVector) array.child);
- StringBuilder buffer = new StringBuilder();
- map.stringifyValue(buffer, 0);
- assertEquals("[{\"key\": \"key10\", \"value\": 10.0}, {\"key\": \"key20\", \"value\": 20.0}]", buffer.toString());
- buffer = new StringBuilder();
- map.stringifyValue(buffer, 1);
- assertEquals("[{\"key\": \"key101\", \"value\": 101.0}, {\"key\": \"key202\", \"value\": 202.0}]", buffer.toString());
-
- }
-
- @Test
- public void test_putRowToBatch_primitive_map() throws Exception {
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
- builder.name("map").type().map().values().longType().noDefault();
- Schema testSchema = builder.endRecord();
-
- Map<String, Long> mapData1 = new TreeMap<String, Long>() {{
- put("key10", 100L);
- put("key20", 200L);
- }};
-
- GenericData.Record row = new GenericData.Record(testSchema);
- row.put("map", mapData1);
-
- TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("map").schema());
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- batch.ensureSize(2);
- MutableInt vectorOffset = new MutableInt(0);
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("map").schema(), row.get("map"));
-
- Map<String, Long> mapData2 = new TreeMap<String, Long>() {{
- put("key1000", 1000L);
- put("key2000", 2000L);
- }};
-
- OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("map").schema(), mapData2);
-
- MapColumnVector map = ((MapColumnVector) batch.cols[0]);
- StringBuilder buffer = new StringBuilder();
- map.stringifyValue(buffer, 0);
- assertEquals("[{\"key\": \"key10\", \"value\": 100}, {\"key\": \"key20\", \"value\": 200}]", buffer.toString());
- buffer = new StringBuilder();
- map.stringifyValue(buffer, 1);
- assertEquals("[{\"key\": \"key1000\", \"value\": 1000}, {\"key\": \"key2000\", \"value\": 2000}]", buffer.toString());
-
- }
-
- @Test
- public void test_getHiveTypeFromAvroType_primitive() throws Exception {
- // Expected Hive types
- String[] expectedTypes = {
- "INT",
- "BIGINT",
- "BOOLEAN",
- "FLOAT",
- "DOUBLE",
- "BINARY",
- "STRING",
- };
-
- Schema testSchema = buildPrimitiveAvroSchema();
- List<Schema.Field> fields = testSchema.getFields();
- for (int i = 0; i < fields.size(); i++) {
- assertEquals(expectedTypes[i], OrcUtils.getHiveTypeFromAvroType(fields.get(i).schema()));
- }
- }
-
- @Test
- public void test_getHiveTypeFromAvroType_complex() throws Exception {
- // Expected Hive types
- String[] expectedTypes = {
- "INT",
- "MAP<STRING, DOUBLE>",
- "STRING",
- "UNIONTYPE<BIGINT, FLOAT>",
- "ARRAY<INT>"
- };
-
- Schema testSchema = buildComplexAvroSchema();
- List<Schema.Field> fields = testSchema.getFields();
- for (int i = 0; i < fields.size(); i++) {
- assertEquals(expectedTypes[i], OrcUtils.getHiveTypeFromAvroType(fields.get(i).schema()));
- }
-
- assertEquals("STRUCT<myInt:INT, myMap:MAP<STRING, DOUBLE>, myEnum:STRING, myLongOrFloat:UNIONTYPE<BIGINT, FLOAT>, myIntList:ARRAY<INT>>",
- OrcUtils.getHiveTypeFromAvroType(testSchema));
- }
-
- @Test
- public void test_generateHiveDDL_primitive() throws Exception {
- Schema avroSchema = buildPrimitiveAvroSchema();
- String ddl = OrcUtils.generateHiveDDL(avroSchema, "myHiveTable");
- assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)"
- + " STORED AS ORC", ddl);
- }
-
- @Test
- public void test_generateHiveDDL_complex() throws Exception {
- Schema avroSchema = buildComplexAvroSchema();
- String ddl = OrcUtils.generateHiveDDL(avroSchema, "myHiveTable");
- assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable "
- + "(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
- + " STORED AS ORC", ddl);
- }
-
-
- //////////////////
- // Helper methods
- //////////////////
-
- public static Schema buildPrimitiveAvroSchema() {
- // Build a fake Avro record with all primitive types
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
- builder.name("int").type().intType().noDefault();
- builder.name("long").type().longType().longDefault(1L);
- builder.name("boolean").type().booleanType().booleanDefault(true);
- builder.name("float").type().floatType().floatDefault(0.0f);
- builder.name("double").type().doubleType().doubleDefault(0.0);
- builder.name("bytes").type().bytesType().noDefault();
- builder.name("string").type().stringType().stringDefault("default");
- return builder.endRecord();
- }
-
- public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) {
- Schema schema = buildPrimitiveAvroSchema();
- GenericData.Record row = new GenericData.Record(schema);
- row.put("int", i);
- row.put("long", l);
- row.put("boolean", b);
- row.put("float", f);
- row.put("double", d);
- row.put("bytes", bytes);
- row.put("string", string);
- return row;
- }
-
- public static TypeDescription buildPrimitiveOrcSchema() {
- return TypeDescription.createStruct()
- .addField("int", TypeDescription.createInt())
- .addField("long", TypeDescription.createLong())
- .addField("boolean", TypeDescription.createBoolean())
- .addField("float", TypeDescription.createFloat())
- .addField("double", TypeDescription.createDouble())
- .addField("bytes", TypeDescription.createBinary())
- .addField("string", TypeDescription.createString());
- }
-
- public static Schema buildComplexAvroSchema() {
- // Build a fake Avro record with nested types
- final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("complex.record").namespace("any.data").fields();
- builder.name("myInt").type().unionOf().nullType().and().intType().endUnion().nullDefault();
- builder.name("myMap").type().map().values().doubleType().noDefault();
- builder.name("myEnum").type().enumeration("myEnum").symbols("ABC", "DEF", "XYZ").enumDefault("ABC");
- builder.name("myLongOrFloat").type().unionOf().longType().and().floatType().endUnion().noDefault();
- builder.name("myIntList").type().array().items().intType().noDefault();
- return builder.endRecord();
- }
-
- public static GenericData.Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) {
- Schema schema = buildComplexAvroSchema();
- GenericData.Record row = new GenericData.Record(schema);
- row.put("myInt", i);
- row.put("myMap", m);
- row.put("myEnum", e);
- row.put("myLongOrFloat", unionVal);
- row.put("myIntList", intArray);
- return row;
- }
-}
\ No newline at end of file
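
Beyond type conversion, the deleted helpers also covered DDL generation: normalizeHiveTableName replaces dots and spaces with underscores (neither is valid in a Hive identifier), and generateHiveDDL emits a CREATE EXTERNAL TABLE statement for a RECORD schema. A small sketch, with illustrative names:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.nifi.util.orc.OrcUtils;

    public class HiveDdlSketch {
        public static void main(String[] args) {
            Schema schema = SchemaBuilder.record("example").fields()
                    .name("id").type().intType().noDefault()
                    .name("name").type().stringType().noDefault()
                    .endRecord();
            // "my.table name" becomes "my_table_name"
            String table = OrcUtils.normalizeHiveTableName("my.table name");
            System.out.println(OrcUtils.generateHiveDDL(schema, table));
            // Prints: CREATE EXTERNAL TABLE IF NOT EXISTS my_table_name
            //         (id INT, name STRING) STORED AS ORC
        }
    }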