arrow-user mailing list archives

From Micah Kornfield <emkornfi...@gmail.com>
Subject Re: [Cython] Getting and Comparing String Scalars?
Date Wed, 14 Apr 2021 23:45:50 GMT
Have you looked at the pyarrow compute functions [1][2]?

Unique and filter seem like they would help.

[1]
https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
[2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
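
For example, a rough sketch of the split (untested; it assumes `batch` is a
pyarrow RecordBatch with a string column named `name`):
```
import pyarrow.compute as pc

def split_by_name(batch):
    names = batch.column("name")
    # One vectorized filter per distinct name instead of a
    # Python-level loop over every row.
    for name in pc.unique(names):
        mask = pc.equal(names, name)
        yield name.as_py(), pc.filter(batch, mask)
```
Each yielded batch holds all rows for one value of `name`, so the per-row work
stays inside Arrow's C++ kernels.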

On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai> wrote:

> Thanks Weston,
>
> Performance is paramount here; I'm streaming through 7TB of data.
>
> I actually need to separate the data based on the value of the `name`
> column. For every unique value of `name`, I need a batch of those rows. I
> tried using gandiva's filter function but can't get gandiva installed on
> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
> Ubuntu?" on this mailing list).
>
> Aside from that, I'm not sure of a way to separate the data faster than
> iterating through every row and placing the values into a map keyed on
> `name`:
> ```
> cdef struct myUpdateStruct:
>     double value
>     int64_t checksum
>
> cdef iterate_dataset():
>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>     cdef shared_ptr[CRecordBatch] batch  # This is populated by a scanner of .parquet files
>     cdef int64_t batch_row_index = 0
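>     # Assumption, not shown in this snippet: names, values, and checksums
>     # are presumably the batch's column Arrays, obtained elsewhere.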
>     while batch_row_index < batch.get().num_rows():
>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>                 GetScalar(batch_row_index)).get()).value
>         name = <char *>name_buffer.get().data()
>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>                 GetScalar(batch_row_index)).get()).value
>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>                 GetScalar(batch_row_index)).get()).value
>         newUpdate = myUpdateStruct(value, checksum)
>         if myUpdates.count(name) <= 0:
>             myUpdates[name] = deque[myUpdateStruct]()
>         myUpdates[name].push_front(newUpdate)
>         if myUpdates[name].size() > 1024:
>             myUpdates[name].pop_back()
>         batch_row_index += 1
> ```
> This takes 107 minutes to iterate through the first 290GB of data. Without
> accessing or filtering the data in any way, it takes only 12 minutes to read
> all the .parquet files into RecordBatches and place them into Plasma.
>
>
> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <weston.pace@gmail.com>
> wrote:
>
>> If you don't need the performance, you could stay in python (use
>> to_pylist() for the array or as_py() for scalars).
>>
>> If you do need the performance then you're probably better served getting
>> the buffers and operating on them directly.  Or, even better, making use of
>> the compute kernels:
>>
>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>> desired = pa.array(['Xander'], pa.string())
>> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
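>>
>> If you want the matching rows themselves rather than just a yes/no, the same
>> mask should work with filter (untested, reusing arr/desired from above):
>>
>> mask = pc.is_in(arr, value_set=desired)
>> arr.filter(mask).to_pylist() # ['Xander']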
>>
>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai> wrote:
>>
>>> This works for getting a c string out of the CScalar:
>>> ```
>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>         GetScalar(batch_row_index)).get()).value
>>> name = <char *>name_buffer.get().data()
>>> ```
>>>
>>>
>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai> wrote:
>>>
>>>> Here is an example code snippet from a .pyx file that successfully
>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>> ascending:
>>>> ```
>>>> while batch_row_index < batch.get().num_rows():
>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>     current_timestamp = timestamps[name]
>>>>     if current_timestamp > new_timestamp.value:
>>>>         abort()
>>>>     batch_row_index += 1
>>>> ```
>>>>
>>>> However, I'm having difficulty operating on the values in a column of
>>>> string type. Unlike CTimestampScalar, there is no CStringScalar. Although
>>>> there is a StringScalar type in C++, it isn't defined in the Cython
>>>> interface. There is a `CStringType` and a `c_string` type.
>>>> ```
>>>>     while batch_row_index < batch.get().num_rows():
>>>>         name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>         name_string = <CStringType*>name.get() # This is wrong
>>>>         printf("%s\n", name_string) # This prints garbage
>>>>         if name_string == b"Xander": # Doesn't work
>>>>             print("found it")
>>>>         batch_row_index += 1
>>>> ```
>>>> How do I get the string value as a C type and compare it to other
>>>> strings?
>>>>
>>>> Thanks,
>>>> Xander
>>>>
>>>
>
