arrow-user mailing list archives

From Antonio Vilches <antonio.vilc...@shapelets.io>
Subject Issue while loading an arrow file with dictionaries
Date Mon, 04 Feb 2019 14:41:10 GMT
Hi there,

I'm a new Arrow user and I want to develop an application that loads and stores information
from/to Arrow. As a first step, I wrote a very simple Kotlin application that first writes
an Arrow file and later reads the information back from that file. It uses the Arrow Java API
(see code below). The application defines a schema with 3 fields: one floating-point (double),
one 32-bit integer and one varchar. It then writes 3 batches of ten elements each (in the
Writer section). In the last section (the Reader), it tries to load the information that was
stored in the Arrow file.

The application works properly if I avoid using dictionaries for storing and loading strings
in UTF-8 format. However, if I use dictionaries I get an exception (see below) while trying
to load the first batch with the ArrowFileReader.loadRecordBatch() function (the line
annotated with the //ERROR HERE tag).

Exception in thread "main" java.lang.IllegalArgumentException: not all nodes and buffers were
consumed. nodes: [] buffers: [ArrowBuf[52], udle: [34 448..458]]
                at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
                at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
                at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
                at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
                at ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
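
For reference, this is a minimal, self-contained sketch of how I understand
DictionaryEncoder.encode() is supposed to be used (it is the call I left commented out in the
Writer section further down). The names, ids and values here are illustrative only, and the
default 32-bit index type is my assumption, not something I have verified:

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VarCharVector
import org.apache.arrow.vector.dictionary.Dictionary
import org.apache.arrow.vector.dictionary.DictionaryEncoder
import org.apache.arrow.vector.types.pojo.DictionaryEncoding

fun encodeSketch() {
    val allocator = RootAllocator(Long.MAX_VALUE)

    // Dictionary values: "a", "b"
    val dictVector = VarCharVector("Tags-dictionary", allocator)
    dictVector.allocateNew()
    dictVector.setSafe(0, "a".toByteArray(Charsets.UTF_8))
    dictVector.setSafe(1, "b".toByteArray(Charsets.UTF_8))
    dictVector.valueCount = 2
    val dictionary = Dictionary(dictVector, DictionaryEncoding(1L, false, null))

    // Unencoded column values: "b", "a", "b"
    val raw = VarCharVector("tag", allocator)
    raw.allocateNew()
    raw.setSafe(0, "b".toByteArray(Charsets.UTF_8))
    raw.setSafe(1, "a".toByteArray(Charsets.UTF_8))
    raw.setSafe(2, "b".toByteArray(Charsets.UTF_8))
    raw.valueCount = 3

    // encode() replaces the strings with indices into the dictionary
    // (with a null index type I assume it defaults to 32-bit signed indices).
    val encoded = DictionaryEncoder.encode(raw, dictionary)
    println((0 until encoded.valueCount).map { encoded.getObject(it) })  // expected: [1, 0, 1]

    encoded.close()
    raw.close()
    dictVector.close()
    allocator.close()
}

In the application below the encode() call is still commented out and the raw UTF-8 strings
are written directly into the root.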

I'm not sure whether I'm doing something wrong or this is a bug in Arrow. Could anyone
assist me with this issue?
Thanks in advance,
Antonio Vilches.




import java.io.File
import kotlin.math.min
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.Float8Vector
import org.apache.arrow.vector.IntVector
import org.apache.arrow.vector.VarCharVector
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.Dictionary
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.arrow.vector.ipc.ArrowFileReader
import org.apache.arrow.vector.ipc.ArrowFileWriter
import org.apache.arrow.vector.types.FloatingPointPrecision
import org.apache.arrow.vector.types.Types
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.DictionaryEncoding
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.FieldType
import org.apache.arrow.vector.types.pojo.Schema

fun main(args: Array<String>) {

    println("Writing and reading an apache arrow file")
    val numRecords = (10 * 3) // 30 in total
    val initialBatchSize = 10


    val recordsPerBatch = initialBatchSize

    val numBatches  = if ((numRecords % recordsPerBatch) == 0){
        numRecords / recordsPerBatch
    }else{
        numRecords / recordsPerBatch + 1
    }

    println("Reading and writing $numRecords records in $numBatches batches.")
    // *****************************************************************************************************************
    // Generate dictionary
    // *****************************************************************************************************************
    val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
    val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
    vector.setInitialCapacity(4)
    vector.allocateNew()

    vector.set(0, "a".toByteArray(Charsets.UTF_8))
    vector.set(1, "b".toByteArray(Charsets.UTF_8))
    vector.set(2, "c".toByteArray(Charsets.UTF_8))
    vector.set(3, "d".toByteArray(Charsets.UTF_8))

    vector.valueCount = 4
    val dictionary = Dictionary(vector, DictionaryEncoding("Tags".hashCode().toLong(), false, null))
    dictionaryProvider.put(dictionary)

    // *****************************************************************************************************************
    // Create schema
    // *****************************************************************************************************************
    val filePath = "./example.arrow"

    //Create Fields
    val temperature = Field("temperature", FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null)
    val humidity = Field("humidity", FieldType.nullable(ArrowType.Int(32, true)), null)
    val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
    val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/ dic.encoding, null), null)

    //Create schema
    val builder = mutableListOf<Field>()
    builder.add(temperature)
    builder.add(humidity)
    builder.add(tag)
    val schema = Schema(builder, null)

    // *****************************************************************************************************************
    // Write to Arrow
    // *****************************************************************************************************************

    val fileToWrite = File(filePath)
    val writeStream = fileToWrite.outputStream()
    val schemaRoot = VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
    val writer = ArrowFileWriter(schemaRoot, /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider, writeStream.channel)

    writer.start()

    schemaRoot.rowCount = recordsPerBatch
    for (batch in 0 until numBatches){
        val numItemsBatch = min(recordsPerBatch, numRecords - batch * recordsPerBatch)
        schemaRoot.rowCount = numItemsBatch
        schemaRoot.schema.fields.forEach {
            when(it.type){
                ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
                    val vector = schemaRoot.getVector(it.name) as Float8Vector
                    vector.setInitialCapacity(recordsPerBatch)
                    vector.allocateNew(recordsPerBatch)
                    for (i in 0 until numItemsBatch){
                        vector.set(i, i.toDouble())
                    }
                    vector.valueCount = numItemsBatch
                }
                ArrowType.Int(32, true) -> {
                    val vector = schemaRoot.getVector(it.name) as IntVector
                    vector.setInitialCapacity(recordsPerBatch)
                    vector.allocateNew(recordsPerBatch)
                    for (i in 0 until numItemsBatch){
                        vector.set(i, i.toInt())
                    }
                    vector.valueCount = numItemsBatch
                }
                ArrowType.Utf8() -> {
                    val vec = schemaRoot.getVector(it.name) as VarCharVector
                    vec.setInitialCapacity(recordsPerBatch)
                    vec.allocateNew()

                    //val encoded = DictionaryEncoder.encode(vec, dictionaryProvider.lookup("Tags".hashCode().toLong()))
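                    // (If enabled, encode() above would return a vector of dictionary indices;
                    // as it is, the raw UTF-8 strings below are written directly into the root.)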

                    for (i in 0 until numItemsBatch) {
                        when(i % 4){
                            0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
                            1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
                            2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
                            3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
                        }
                    }
                    vec.valueCount = numItemsBatch
                }
            }
        }
        writer.writeBatch()
    }
    writer.end()
    writer.close()

    // Need to close dictionary vectors
    for (id in dictionaryProvider.dictionaryIds) {
        dictionaryProvider.lookup(id).vector.close()
    }

    // *****************************************************************************************************************
    // Read from Arrow
    // *****************************************************************************************************************

    //Accum results
    var accumDouble = 0.0
    var accumInt = 0L

    // Setting reading
    val fileToRead = File(filePath)
    val readStream = fileToRead.inputStream()
    val reader = ArrowFileReader(readStream.channel, RootAllocator(Long.MAX_VALUE))

    //println("Reading the arrow file : $filePath")
    val readRoot = reader.vectorSchemaRoot
    //val readSchema = readRoot.schema



    val arrowBlocks = reader.recordBlocks

    reader.recordBlocks.forEachIndexed { index, arrowBlock ->
        reader.loadRecordBatch(arrowBlock)  // ERROR HERE

        println("Reading Block[$index]: ${readRoot.rowCount} elements.")

        readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
            val minorType = fieldVector.minorType
            when(minorType){
                Types.MinorType.FLOAT8 -> {
                    val vecDouble = fieldVector as Float8Vector
                    val cap = vecDouble.valueCapacity
                    var address = vecDouble.dataBufferAddress
                    for (i in 0 until vecDouble.valueCount){
                        accumDouble += vecDouble.get(i)
                    }
                }
                Types.MinorType.INT -> {
                    val vecDouble = fieldVector as IntVector
                    val cap = vecDouble.valueCapacity
                    var address = vecDouble.dataBufferAddress
                    for (i in 0 until vecDouble.valueCount){
                        accumInt += vecDouble.get(i)
                    }
                }
                Types.MinorType.VARCHAR -> {
                    val vec = fieldVector as VarCharVector
                    val cap = vec.valueCapacity
                    var address = vec.dataBufferAddress
                    for (i in 0 until vec.valueCount){
                        println("Reading tag[$i] ${vec.get(i).toString(Charsets.UTF_8)}")
                    }
                }
            }
        }
    }

    reader.close()

    println("Double accum: $accumDouble")
    println("Long accum: $accumInt")

}

/**
 * This is a helper function to create VarCharVector for dictionary purposes.
 */
fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
    return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name, allocator, null) as VarCharVector
}