arrow-user mailing list archives

From Li Jin <ice.xell...@gmail.com>
Subject Re: Issue while loading an arrow file with dictionaries
Date Tue, 05 Feb 2019 15:26:06 GMT
Hi Antonio,

My memory is a little rusty now, but if I remember correctly, when writing
to a dictionary-encoded vector, the values should be the encoded values
(dictionary indices) rather than the decoded ones. In your code:


    // *****************************************************************************************************************
    // Write to Arrow
    // *****************************************************************************************************************

    ArrowType.Utf8() -> {
        val vec = schemaRoot.getVector(it.name) as VarCharVector
        vec.setInitialCapacity(recordsPerBatch)
        vec.allocateNew()

        //val encoded = DictionaryEncoder.encode(vec, dictionaryProvider.lookup("Tags".hashCode().toLong()))

        for (i in 0 until recordsPerBatch) {
            when(i % 4){
                0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
                1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
                2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
                3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
            }
        }
        vec.valueCount = recordsPerBatch
    }

This writes the decoded (UTF-8) values, whereas it should be writing the
encoded values.
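
To make the encoded/decoded distinction concrete, here is a minimal
plain-Kotlin sketch that does not use the Arrow API at all. The helpers
`encode` and `decode` are hypothetical names chosen for illustration, and
the dictionary simply mirrors the four tags from the code above. The only
point is that a dictionary-encoded column holds integer indices into the
dictionary, not the UTF-8 strings themselves.

```kotlin
// Hypothetical helpers illustrating dictionary encoding; not part of the Arrow API.

/** Replace each value by its position in the dictionary (the "encoded" form). */
fun encode(values: List<String>, dictionary: List<String>): IntArray {
    val indexOf = dictionary.withIndex().associate { (i, v) -> v to i }
    return IntArray(values.size) { i ->
        indexOf[values[i]] ?: error("'${values[i]}' is not in the dictionary")
    }
}

/** Look each index up in the dictionary to recover the "decoded" form. */
fun decode(indices: IntArray, dictionary: List<String>): List<String> =
    indices.map { dictionary[it] }

fun main() {
    val dictionary = listOf("a", "b", "c", "d")
    // Same pattern as the loop above: record i gets tag number i % 4.
    val decoded = List(10) { i -> dictionary[i % 4] }
    val encoded = encode(decoded, dictionary)

    println(encoded.joinToString())                 // prints "0, 1, 2, 3, 0, 1, 2, 3, 0, 1"
    println(decode(encoded, dictionary) == decoded) // prints "true"
}
```

In Arrow Java, the commented-out `DictionaryEncoder.encode(...)` call in
the snippet above is the place where this kind of translation would
normally happen.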

However, there is an issue here:
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))

creates a vector of the decoded type rather than the encoded type, so you
would probably need to create the vectors yourself instead of relying on
VectorSchemaRoot.create.

For more details please see:
https://github.com/apache/arrow/pull/2681
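
One plausible reading of the "not all nodes and buffers were consumed"
error, sketched in plain Kotlin rather than with the Arrow API: in the
Arrow columnar format a variable-width UTF-8 column carries three buffers
(validity, offsets, data), while a fixed-width column such as integer
dictionary indices carries two. If a decoded VarChar vector is written for
a field that the file describes as dictionary-encoded, the reader would be
left with one buffer it has no place for, which would be consistent with
the single leftover buffer in the reported exception. The `Column` type
and `bufferCount` function are hypothetical and only model that counting;
that the tag indices are fixed-width integers is an assumption.

```kotlin
// Hypothetical model of per-column buffer counts; not the Arrow API.
enum class Column(val buffers: Int) {
    FIXED_WIDTH(2),    // validity + data (e.g. Float8, Int32, dictionary indices)
    VARIABLE_WIDTH(3)  // validity + offsets + data (e.g. Utf8)
}

/** Total number of buffers a record batch with these columns would carry. */
fun bufferCount(columns: List<Column>): Int = columns.map { it.buffers }.sum()

fun main() {
    // What gets written: temperature, humidity, and the tag column as decoded VarChar.
    val written = listOf(Column.FIXED_WIDTH, Column.FIXED_WIDTH, Column.VARIABLE_WIDTH)
    // What a reader would expect if the tag field holds dictionary indices.
    val expected = listOf(Column.FIXED_WIDTH, Column.FIXED_WIDTH, Column.FIXED_WIDTH)

    val leftover = bufferCount(written) - bufferCount(expected)
    // prints "buffers written=7, expected=6, left over=1"
    println("buffers written=${bufferCount(written)}, expected=${bufferCount(expected)}, left over=$leftover")
}
```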



On Mon, Feb 4, 2019 at 2:07 PM Wes McKinney <wesmckinn@gmail.com> wrote:

> not sure if you're subscribed to user@ but in case you have advice
>
> ---------- Forwarded message ---------
> From: Antonio Vilches <antonio.vilches@shapelets.io>
> Date: Mon, Feb 4, 2019 at 8:41 AM
> Subject: Issue while loading an arrow file with dictionaries
> To: user@arrow.apache.org <user@arrow.apache.org>
>
>
> Hi there,
>
>
>
> I'm a newbie Arrow user and I want to develop an application that
> loads/stores information from/to Arrow. For this purpose, I started a
> very simple Kotlin application that first writes an Arrow file and
> later reads the information from that file. It makes use of the Arrow
> Java API (see code below).
>
> This application sets up a schema with 3 fields: 1 Float, 1 Int and 1
> Varchar. Later, it writes 3 batches of ten elements (in the Writer
> section). In the last section (called Reader), it tries to load the
> information that was stored in the Arrow file.
>
>
>
> The application works properly if I avoid using dictionaries
> for storing and loading Strings in UTF-8 format. However, if I use
> dictionaries I get an exception (see below) while trying to load the
> first batch with the ArrowFileReader.loadRecordBatch() function
> (line annotated with the //ERROR HERE tag).
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException: not all nodes and buffers were consumed. nodes: [] buffers: [ArrowBuf[52], udle: [34 448..458]]
>     at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
>     at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
>     at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
>     at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
>     at ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
>
>
>
> I'm not sure whether I'm doing something wrong or whether this is an
> issue in Arrow itself. Could anyone assist me with this?
>
> Thanks in advance,
>
> Antonio Vilches.
>
>
>
>
>
>
>
> fun main(args: Array<String>) {
>
>     println("Writing and reading an apache arrow file")
>     val numRecords = (10 * 3) // 30 in total
>     val initialBatchSize = 10
>
>
>     val recordsPerBatch = initialBatchSize
>
>     val numBatches  = if ((numRecords % recordsPerBatch) == 0){
>         numRecords / recordsPerBatch
>     }else{
>         numRecords / recordsPerBatch + 1
>     }
>
>     println("Reading and writing $numRecords records in $numBatches batches.")
>     // *****************************************************************************************************************
>     // Generate dictionary
>     // *****************************************************************************************************************
>     val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
>     val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
>     vector.setInitialCapacity(4)
>     vector.allocateNew()
>
>     vector.set(0, "a".toByteArray(Charsets.UTF_8))
>     vector.set(1, "b".toByteArray(Charsets.UTF_8))
>     vector.set(2, "c".toByteArray(Charsets.UTF_8))
>     vector.set(3, "d".toByteArray(Charsets.UTF_8))
>
>     vector.valueCount = 4
>     val dictionary = Dictionary(vector, DictionaryEncoding("Tags".hashCode().toLong(), false, null))
>     dictionaryProvider.put(dictionary)
>
>     // *****************************************************************************************************************
>     // Create schema
>     // *****************************************************************************************************************
>     val filePath = "./example.arrow"
>
>     //Create Fields
>     val temperature = Field("temperature", FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null)
>     val humidity = Field("humidity", FieldType.nullable(ArrowType.Int(32, true)), null)
>     val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
>     val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/ dic.encoding, null), null)
>
>     //Create schema
>     val builder = mutableListOf<Field>()
>     builder.add(temperature)
>     builder.add(humidity)
>     builder.add(tag)
>     val schema = Schema(builder, null)
>
>     // *****************************************************************************************************************
>     // Write to Arrow
>     // *****************************************************************************************************************
>
>     val fileToWrite = File(filePath)
>     val writeStream = fileToWrite.outputStream()
>     val schemaRoot = VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
>     val writer = ArrowFileWriter(schemaRoot, /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider, writeStream.channel)
>
>     writer.start()
>
>     schemaRoot.rowCount = recordsPerBatch
>     for (batch in 0 until numBatches){
>         val numItemsBatch = min(recordsPerBatch, numRecords - batch * recordsPerBatch)
>         schemaRoot.rowCount = numItemsBatch
>         schemaRoot.schema.fields.forEach {
>             when(it.type){
>                 ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
>                     val vector = schemaRoot.getVector(it.name) as Float8Vector
>                     vector.setInitialCapacity(recordsPerBatch)
>                     vector.allocateNew(recordsPerBatch)
>                     for (i in 0 until numItemsBatch){
>                         vector.set(i, i.toDouble())
>                     }
>                     vector.valueCount = numItemsBatch
>                 }
>                 ArrowType.Int(32, true) -> {
>                     val vector = schemaRoot.getVector(it.name) as IntVector
>                     vector.setInitialCapacity(recordsPerBatch)
>                     vector.allocateNew(recordsPerBatch)
>                     for (i in 0 until numItemsBatch){
>                         vector.set(i, i.toInt())
>                     }
>                     vector.valueCount = numItemsBatch
>                 }
>                 ArrowType.Utf8() -> {
>                     val vec = schemaRoot.getVector(it.name) as VarCharVector
>                     vec.setInitialCapacity(recordsPerBatch)
>                     vec.allocateNew()
>
>                     //val encoded = DictionaryEncoder.encode(vec, dictionaryProvider.lookup("Tags".hashCode().toLong()))
>
>                     for (i in 0 until recordsPerBatch) {
>                         when(i % 4){
>                             0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
>                             1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
>                             2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
>                             3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
>                         }
>                     }
>                     vec.valueCount = recordsPerBatch
>                 }
>             }
>         }
>         writer.writeBatch()
>     }
>     writer.end()
>     writer.close()
>
>     // Need to close dictionary vectors
>     for (id in dictionaryProvider.dictionaryIds) {
>         dictionaryProvider.lookup(id).vector.close()
>     }
>
>     // *****************************************************************************************************************
>     // Read from Arrow
>     // *****************************************************************************************************************
>
>     //Accum results
>     var accumDouble = 0.0
>     var accumInt = 0L
>
>     // Setting reading
>     val fileToRead = File(filePath)
>     val readStream = fileToRead.inputStream()
>     val reader = ArrowFileReader(readStream.channel, RootAllocator(Long.MAX_VALUE))
>
>     //println("Reading the arrow file : $filePath")
>     val readRoot = reader.vectorSchemaRoot
>     //val readSchema = readRoot.schema
>
>
>
>     val arrowBlocks = reader.recordBlocks
>
>     reader.recordBlocks.forEachIndexed { index, arrowBlock ->
>         reader.loadRecordBatch(arrowBlock)  // ERROR HERE
>
>         println("Reading Block[$index]: ${readRoot.rowCount} elements.")
>
>         readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
>             val minorType = fieldVector.minorType
>             when(minorType){
>                 Types.MinorType.FLOAT8 -> {
>                     val vecDouble = fieldVector as Float8Vector
>                     val cap = vecDouble.valueCapacity
>                     var address = vecDouble.dataBufferAddress
>                     for (i in 0 until vecDouble.valueCapacity){
>                         accumDouble += vecDouble.get(i)
>                     }
>                 }
>                 Types.MinorType.INT -> {
>                     val vecDouble = fieldVector as IntVector
>                     val cap = vecDouble.valueCapacity
>                     var address = vecDouble.dataBufferAddress
>                     for (i in 0 until vecDouble.valueCapacity){
>                         accumInt += vecDouble.get(i)
>                     }
>                 }
>                 Types.MinorType.VARCHAR -> {
>                     val vec = fieldVector as VarCharVector
>                     val cap = vec.valueCapacity
>                     var address = vec.dataBufferAddress
>                     for (i in 0 until vec.valueCapacity){
>                         println("Reading tag[$i] ${vec.get(i).toString(Charsets.UTF_8)}")
>                     }
>                 }
>             }
>         }
>     }
>
>     reader.close()
>
>     println("Double accum: $accumDouble")
>     println("Long accum: $accumInt")
>
> }
>
> /**
>  * This is a helper function to create VarCharVector for dictionary purposes.
>  */
> fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
>     return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name, allocator, null) as VarCharVector
> }
