arrow-user mailing list archives

From: Micah Kornfield <emkornfi...@gmail.com>
Subject: Re: [Cython] Getting and Comparing String Scalars?
Date: Thu, 15 Apr 2021 02:39:59 GMT
+1 to everything Weston said.

From your comments about Gandiva, it sounded like you were OK with the
filter-based approach, but maybe you had a different idea for using Gandiva?

I believe "filter" also has optimizations if your data was already mostly
grouped by name.

I agree that, algorithmically, the map approach is probably optimal, but as
Weston alluded to, there are hidden constant overheads that might even things
out across the different approaches.

Also, if your data is already grouped, using a dataset expression might be
fairly efficient, since it does row-group pruning based on predicates against
the underlying Parquet data.
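
For example, a minimal sketch using the dataset API (the path and column name
here are hypothetical): the filter expression is pushed down to the Parquet
reader, so row groups whose statistics can't satisfy the predicate are
skipped, and matching batches stream through without loading the whole
dataset into memory:
```
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")  # hypothetical path
# Predicate pushdown: only row groups that might match are read.
for batch in dataset.to_batches(filter=ds.field("name") == "Xander"):
    ...  # each RecordBatch holds only rows where name == "Xander"
```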

-Micah

On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <weston.pace@gmail.com> wrote:

> Correct, the "group by" operation you're looking for doesn't quite exist
> (externally) yet (others can correct me if I'm wrong here). ARROW-3978
> <https://issues.apache.org/jira/browse/ARROW-3978> sometimes gets brought
> up in reference to this.  There are some things (e.g. C++ query execution
> engine
> <https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing>)
> in the works which would provide this.  There is also an internal
> implementation (arrow::compute::internal::Grouper) that is used for
> computing partitions, but I believe it was intentionally kept internal;
> others may be able to explain more of the reasoning.
>
> Expressions are (or will soon be) built on compute, so using them won't
> provide much benefit over what is in compute.  I want to say the best
> approach available in compute for 4.0.0 is O(num_rows *
> num_unique_names).  To create the mask you would use the equals function.
> So the whole operation would be...
>
> 1) Use unique to get the possible string values
> 2) For each string value
>   a) Use equals to get a mask
>   b) Use filter to get a subarray
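>
> Something like this minimal pyarrow sketch of those steps (the toy batch is
> hypothetical):
>
> import pyarrow as pa
> import pyarrow.compute as pc
>
> batch = pa.record_batch(
>     [pa.array(["a", "b", "a"]), pa.array([1.0, 2.0, 3.0])],
>     names=["name", "value"])
> for name in pc.unique(batch.column(0)):     # 1) possible string values
>     mask = pc.equal(batch.column(0), name)  # 2a) boolean mask for this value
>     sub_batch = pc.filter(batch, mask)      # 2b) sub-batch for this value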
>
> So what you have may be a pretty reasonable workaround.  I'd recommend
> benchmarking it against the compute-based approach to see how they compare.
>
> That said, there are a few minor optimizations you can make that shouldn't
> be too much harder.  You want to avoid GetScalar if you can, as it will make
> an allocation / copy for every item you access.  Grab the column from the
> record batch and cast it to the appropriate typed array (this is only easy
> because it appears you have a fairly rigid schema).  This will allow you to
> access values directly without wrapping them in a scalar.  For example, in
> C++ (I'll leave the Cython to you :)) it would look like...
>
> auto arr = std::dynamic_pointer_cast<arrow::DoubleArray>(
>     record_batch->column(0));
> std::cout << arr->Value(0) << std::endl;
>
> For the string array I believe it is...
>
> auto str_arr = std::dynamic_pointer_cast<arrow::StringArray>(
>     record_batch->column(0));
> arrow::util::string_view view = str_arr->GetView(0);
>
> It may take a bit of finesse to figure out how to get
> arrow::util::string_view to work with map, but it should be doable.  There
> is also GetString, which returns std::string and should only be slightly
> more expensive, and GetValue, which returns a uint8_t* and writes the length
> into an out parameter.
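>
> If you end up prototyping from Python first, a rough analogue of the same
> idea (pull the whole column out once instead of calling GetScalar per row)
> might look like the following; the toy batch is hypothetical:
>
> import pyarrow as pa
>
> batch = pa.record_batch([pa.array([1.0, 2.0, 3.0])], names=["value"])
> # Zero-copy numpy view of a null-free double column; no per-row scalars.
> values = batch.column(0).to_numpy()
> print(values[0])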
>
> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <xander@xander.ai> wrote:
>
>> Thanks, I did try a few things with pyarrow.compute. However, the
>> pyarrow.compute.filter interface indicates that it takes a boolean mask to
>> do the filtering:
>> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>
>> But it doesn't actually help me create the mask? I'm back to iterating
>> through the rows and now I would need to create a boolean array of size
>> (num_rows) for every unique value of `name`.
>>
>> I saw in the dataset docs (
>> https://arrow.apache.org/docs/python/dataset.html) some discussion on
>> Expressions, such as `ds.field("name") == "Xander"`. However, I don't see a
>> way of computing such an expression without loading the entire dataset into
>> memory with `dataset.to_table()`, which doesn't work for my dataset because
>> it's many times larger than RAM. Can an Expression be computed on a
>> RecordBatch?
>>
>> But it's also hard to foresee how applying filter for each unique value
>> of `name` will be more computationally efficient. The loop I posted above
>> is O(num_rows), whereas applying filter for each name would be O(num_rows *
>> num_unique_names). It could still be faster if my loop code is poorly
>> implemented or if filter is multi-threaded.
>>
>>
>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <emkornfield@gmail.com>
>> wrote:
>>
>>> Have you looked at the pyarrow compute functions [1][2]?
>>>
>>> Unique and filter seem like they would help.
>>>
>>> [1]
>>> https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>
>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai> wrote:
>>>
>>>> Thanks Weston,
>>>>
>>>> Performance is paramount here; I'm streaming through 7TB of data.
>>>>
>>>> I actually need to separate the data based on the value of the `name`
>>>> column. For every unique value of `name`, I need a batch of those rows. I
>>>> tried using Gandiva's filter function but can't get Gandiva installed on
>>>> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
>>>> Ubuntu?" on this mailing list).
>>>>
>>>> Aside from that, I'm not sure of a way to separate the data faster than
>>>> iterating through every row and placing the values into a map keyed on
>>>> `name`:
>>>> ```
>>>> cdef struct myUpdateStruct:
>>>>     double value
>>>>     int64_t checksum
>>>>
>>>> cdef iterate_dataset():
>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>     # Populated by a scanner of .parquet files; `names`, `values`, and
>>>>     # `checksums` below are this batch's columns.
>>>>     cdef shared_ptr[CRecordBatch] batch
>>>>     cdef int64_t batch_row_index = 0
>>>>     while batch_row_index < batch.get().num_rows():
>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>                 GetScalar(batch_row_index)).get()).value
>>>>         name = <char *>name_buffer.get().data()
>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>                 GetScalar(batch_row_index)).get()).value
>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>                 GetScalar(batch_row_index)).get()).value
>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>         if myUpdates.count(name) <= 0:
>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>         myUpdates[name].push_front(newUpdate)
>>>>         if myUpdates[name].size() > 1024:
>>>>             myUpdates[name].pop_back()
>>>>         batch_row_index += 1
>>>> ```
>>>> This takes 107 minutes to iterate through the first 290GB of data.
>>>> Without accessing or filtering the data in any way, it takes only 12
>>>> minutes to read all the .parquet files into RecordBatches and place them
>>>> into Plasma.
>>>>
>>>>
>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <weston.pace@gmail.com>
>>>> wrote:
>>>>
>>>>> If you don't need the performance, you could stay in Python (use
>>>>> to_pylist() for the array or as_py() for scalars).
>>>>>
>>>>> If you do need the performance then you're probably better served
>>>>> getting the buffers and operating on them directly.  Or, even better,
>>>>> making use of the compute kernels:
>>>>>
>>>>> import pyarrow as pa
>>>>> import pyarrow.compute as pc
>>>>>
>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>> desired = pa.array(['Xander'], pa.string())
>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>>>>
>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai> wrote:
>>>>>
>>>>>> This works for getting a C string out of the CScalar:
>>>>>> ```
>>>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>         GetScalar(batch_row_index)).get()).value
>>>>>> name = <char *>name_buffer.get().data()
>>>>>> ```
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai>
>>>>>> wrote:
>>>>>>
>>>>>>> Here is an example code snippet from a .pyx file that successfully
>>>>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>>>>> ascending:
>>>>>>> ```
>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>     current_timestamp = timestamps[name]
>>>>>>>     if current_timestamp > new_timestamp.value:
>>>>>>>         abort()
>>>>>>>     batch_row_index += 1
>>>>>>> ```
>>>>>>>
>>>>>>> However, I'm having difficulty operating on the values in a column
>>>>>>> of string type. Unlike CTimestampScalar, there is no CStringScalar.
>>>>>>> Although there is a StringScalar type in C++, it isn't defined in the
>>>>>>> Cython interface. There is a `CStringType` and a `c_string` type.
>>>>>>> ```
>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>>>         print("found it")
>>>>>>>     batch_row_index += 1
>>>>>>> ```
>>>>>>> How do I get the string value as a C type and compare it to other
>>>>>>> strings?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xander
>>>>>>>
>>>>>>
>>
