arrow-user mailing list archives

From "Xander Dunn" <xan...@xander.ai>
Subject Re: [Cython] Getting and Comparing String Scalars?
Date Thu, 15 Apr 2021 18:26:00 GMT
Big thanks to both of you, this was very helpful.

I moved my loop down to pure C++ and replaced my GetScalar calls with array type casts
as Weston suggested. Time on the first 290GB of data went from 107 min to 21 min, with the same
functionality. You were right, I was introducing a lot of overhead with the allocations.
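
For anyone who finds this thread later, the change amounts to something like the following in
C++ (a sketch under my assumptions: the column order and the struct fields mirror the Cython
snippet I posted further down-thread):

```
#include <arrow/api.h>

#include <cstdint>
#include <deque>
#include <map>
#include <string>

struct MyUpdate {
  double value;
  int64_t checksum;
};

void ProcessBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
                  std::map<std::string, std::deque<MyUpdate>>* updates) {
  // Cast each column to its typed array once, instead of allocating a
  // Scalar per cell with GetScalar. Column order (0=name, 1=value,
  // 2=checksum) is assumed here.
  auto names = std::static_pointer_cast<arrow::StringArray>(batch->column(0));
  auto values = std::static_pointer_cast<arrow::DoubleArray>(batch->column(1));
  auto checksums = std::static_pointer_cast<arrow::Int64Array>(batch->column(2));

  for (int64_t i = 0; i < batch->num_rows(); ++i) {
    // std::string keys for now; see the string_view note below.
    std::string name = names->GetString(i);
    auto& queue = (*updates)[name];
    queue.push_front(MyUpdate{values->Value(i), checksums->Value(i)});
    if (queue.size() > 1024) {
      queue.pop_back();
    }
  }
}
```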

As you predicted, using string_view as map keys didn't quite work out of the box. This may be
related ( https://stackoverflow.com/questions/35525777/use-of-string-view-for-map-lookup ),
but I haven't been able to try the fix yet because of some Cython silliness. Currently I'm
just using the std::string from GetString().
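
From that Stack Overflow question, the usual fix appears to be a transparent comparator, so
the map can be probed with a string_view without allocating a temporary std::string per
lookup. A minimal sketch, assuming C++17 (arrow::util::string_view would first need converting
to std::string_view via its data()/size()):

```
#include <map>
#include <string>
#include <string_view>

// std::less<> is a transparent comparator: find() can then accept any type
// comparable with std::string, such as std::string_view, without building
// a temporary std::string for each lookup.
std::map<std::string, int, std::less<>> counts;

int Lookup(std::string_view key) {
  auto it = counts.find(key);  // heterogeneous lookup, no allocation
  return it == counts.end() ? 0 : it->second;
}
```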

My data do not start out with any grouping by `name`; they are sorted by timestamp, and it's
important that I process them in timestamp order, so the `name` column is close to uniformly
distributed. Although the grouping by `name` is not a stateful process, I am computing some
stateful functions in parallel. You're right that at some point this dataset will become
large enough that I need to learn a distributed compute framework and break these tasks up.
For now it's convenient that the processing of stored data uses the same code as the
processing of real-time production data streams.

I haven't used Cython in ~6 years and it's been quite a pain. I'm rusty on my C++ as well,
but I found the same functionality much more straightforward to implement purely in C++.
Cython is 75% Python, 25% C/C++, and 100% neither. Differences in imports and syntax can make
it very challenging to reproduce some perfectly functioning C++ code in Cython. I'll try to
keep the Cython layer very thin and keep everything in either Python or C++.

On Wed, Apr 14, 2021 at 8:05 PM, Micah Kornfield < emkornfield@gmail.com > wrote:

> 
> One more thought, at 7TB your data starts entering the realm of "big". 
> You might consider doing the grouping with a distributed compute framework
> and then outputting to Plasma.
> 
> On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield < emkornfield@gmail.com > wrote:
> 
> 
>> +1 to everything Weston said.
>> 
>> 
>> From your comments about Gandiva, it sounded like you were OK with the
>> filter-based approach, but maybe you had a different idea for using Gandiva?
>> 
>> 
>> 
>> I believe "filter" also has optimizations if your data is already mostly
>> grouped by name.
>> 
>> 
>> I agree that, algorithmically, the map approach is probably optimal, but as
>> Weston alluded to, there are hidden constant overheads that might even
>> things out across approaches.
>> 
>> 
>> Also, if your data is already grouped, using a dataset expression might be
>> fairly efficient, since it does row-group pruning based on predicates over
>> the underlying Parquet data.
>> 
>> 
>> -Micah
>> 
>> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace < weston.pace@gmail.com > wrote:
>> 
>> 
>>> Correct, the "group by" operation you're looking for doesn't quite exist
>>> (externally) yet (others can correct me if I'm wrong here). ARROW-3978 (
>>> https://issues.apache.org/jira/browse/ARROW-3978 ) sometimes gets brought
>>> up in reference to this.  There are some things (e.g. C++ query execution
>>> engine (
>>> https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing
>>> ) ) in the works which would provide this.  There is also an internal
>>> implementation (arrow::compute::internal::Grouper) that is used for
>>> computing partitions, but I believe it was intentionally kept internal;
>>> others may be able to say more about the reason.
>>> 
>>> 
>>> Expressions are (or will soon be) built on compute, so using them won't
>>> provide much benefit over using compute directly. I want to say the best
>>> approach you could get with what is in compute for 4.0.0 is O(num_rows *
>>> num_unique_names). To create the mask you would use the equals function.
>>> So the whole operation would be...
>>> 
>>> 
>>> 1) Use unique to get the possible string values
>>> 2) For each string value:
>>>     a) Use equals to get a mask
>>>     b) Use filter to get a subarray
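>>> 
>>> For reference, a minimal sketch of that loop against the Arrow C++ compute
>>> API might look like the following (assuming an Arrow build where the
>>> "unique", "equal", and "filter" compute functions are available; the
>>> column name here is hypothetical):
>>> 
>>> ```
>>> #include <arrow/api.h>
>>> #include <arrow/compute/api.h>
>>> 
>>> arrow::Status GroupByName(const std::shared_ptr<arrow::RecordBatch>& batch) {
>>>   std::shared_ptr<arrow::Array> names = batch->GetColumnByName("name");
>>> 
>>>   // 1) unique: get the distinct values of `name`.
>>>   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> uniques,
>>>                         arrow::compute::Unique(names));
>>>   auto unique_names = std::static_pointer_cast<arrow::StringArray>(uniques);
>>> 
>>>   for (int64_t i = 0; i < unique_names->length(); ++i) {
>>>     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Scalar> name,
>>>                           unique_names->GetScalar(i));
>>>     // 2a) equals: build a boolean mask of rows matching this name.
>>>     ARROW_ASSIGN_OR_RAISE(
>>>         arrow::Datum mask,
>>>         arrow::compute::CallFunction("equal", {arrow::Datum(names),
>>>                                                arrow::Datum(name)}));
>>>     // 2b) filter: take just those rows out of the batch.
>>>     ARROW_ASSIGN_OR_RAISE(arrow::Datum subset,
>>>                           arrow::compute::Filter(batch, mask));
>>>     // ... process subset.record_batch() for this name ...
>>>   }
>>>   return arrow::Status::OK();
>>> }
>>> ```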
>>> 
>>> 
>>> 
>>> So what you have may be a pretty reasonable workaround.  I'd recommend
>>> comparing with what you get from compute just for the sake of comparison.
>>> 
>>> 
>>> So there are a few minor optimizations you can make that shouldn't be too
>>> much harder.  You want to avoid GetScalar if you can as it will make an
>>> allocation / copy for every item you access.  Grab the column from the
>>> record batch and cast it to the appropriate typed array (this is only easy
>>> because it appears you have a fairly rigid schema).  This will allow you
>>> to access values directly without wrapping them in a scalar.  For example,
>>> in C++ (I'll leave the cython to you :)) it would look like...
>>> 
>>> 
>>> auto arr =
>>> std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>>> std::cout << arr->Value(0) << std::endl;
>>> 
>>> 
>>> For the string array I believe it is...
>>> 
>>> 
>>> auto str_arr =
>>> std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
>>> arrow::util::string_view view = str_arr->GetView(0);
>>> 
>>> 
>>> It may take a slight bit of finesse to figure out how to get
>>> arrow::util::string_view to work with map but it should be doable. There
>>> is also GetString, which returns a std::string and should only be slightly
>>> more expensive, and GetValue, which returns a uint8_t* and writes the
>>> length into an out parameter.
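>>> 
>>> For example, the three accessors side by side (a sketch; str_arr and index
>>> 0 as above):
>>> 
>>> ```
>>> // Zero-copy view into the array's data buffer.
>>> arrow::util::string_view view = str_arr->GetView(0);
>>> // Copies the bytes into an owned std::string (slightly more expensive).
>>> std::string owned = str_arr->GetString(0);
>>> // Raw pointer plus length written into an out parameter.
>>> int32_t length = 0;
>>> const uint8_t* data = str_arr->GetValue(0, &length);
>>> ```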
>>> 
>>> 
>>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn < xander@xander.ai > wrote:
>>> 
>>> 
>>>> Thanks, I did try a few things with pyarrow.compute. However, the
>>>> pyarrow.compute.filter interface indicates that it takes a boolean mask to
>>>> do the filtering:
>>>> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>> 
>>>> 
>>>> 
>>>> But it doesn't actually help me create the mask? I'm back to iterating
>>>> through the rows, and now I would need to create a boolean array of size
>>>> num_rows for every unique value of `name`.
>>>> 
>>>> 
>>>> 
>>>> I saw in the dataset docs ( https://arrow.apache.org/docs/python/dataset.html )
>>>> some discussion on Expressions, such as `ds.field("name") == "Xander"`.
>>>> However, I don't see a way of computing such an expression without loading
>>>> the entire dataset into memory with `dataset.to_table()`, which doesn't
>>>> work for my dataset because it's many times larger than RAM. Can an
>>>> Expression be computed on a RecordBatch?
>>>> 
>>>> 
>>>> 
>>>> But it's also hard to foresee how applying filter for each unique value of
>>>> `name` will be more computationally efficient. The loop I posted above is
>>>> O(num_rows), whereas applying filter for each name would be O(num_rows *
>>>> num_unique_names). It could still be faster if my loop code is poorly
>>>> implemented or if filter is multi-threaded.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield < emkornfield@gmail.com > wrote:
>>>> 
>>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>> 
>>>>> 
>>>>> Unique and filter seem like they would help.
>>>>> 
>>>>> 
>>>>> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>> 
>>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn < xander@xander.ai > wrote:
>>>>> 
>>>>> 
>>>>>> Thanks Weston,
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Performance is paramount here; I'm streaming through 7TB of data.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> I actually need to separate the data based on the value of the `name`
>>>>>> column. For every unique value of `name`, I need a batch of those rows. I
>>>>>> tried using Gandiva's filter function but can't get Gandiva installed on
>>>>>> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
>>>>>> Ubuntu?" on this mailing list).
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Aside from that, I'm not sure of a way to separate the data faster than
>>>>>> iterating through every row and placing the values into a map keyed on
>>>>>> `name`:
>>>>>> 
>>>>>> ```
>>>>>> cdef struct myUpdateStruct:
>>>>>>     double value
>>>>>>     int64_t checksum
>>>>>> 
>>>>>> cdef iterate_dataset():
>>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>>     cdef shared_ptr[CRecordBatch] batch  # Populated by a scanner of .parquet files
>>>>>>     cdef int64_t batch_row_index = 0
>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>         name = <char *>name_buffer.get().data()
>>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>>         if myUpdates.count(name) <= 0:
>>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>>         if myUpdates[name].size() > 1024:
>>>>>>             myUpdates[name].pop_back()
>>>>>>         batch_row_index += 1
>>>>>> ```
>>>>>> 
>>>>>> This takes 107 minutes to iterate through the first 290GB of data. Without
>>>>>> accessing or filtering the data in any way, it takes only 12 minutes to read
>>>>>> all the .parquet files into RecordBatches and place them into Plasma.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace < weston.pace@gmail.com > wrote:
>>>>>> 
>>>>>>> If you don't need the performance, you could stay in python (use
>>>>>>> to_pylist() for the array or as_py() for scalars).
>>>>>>> 
>>>>>>> 
>>>>>>> If you do need the performance then you're probably better served getting
>>>>>>> the buffers and operating on them directly. Or, even better, making use
>>>>>>> of the compute kernels:
>>>>>>> 
>>>>>>> 
>>>>>>> import pyarrow as pa
>>>>>>> import pyarrow.compute as pc
>>>>>>> 
>>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn < xander@xander.ai > wrote:
>>>>>>> 
>>>>>>> 
>>>>>>>> This works for getting a C string out of the CScalar:
>>>>>>>> ```
>>>>>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>>     GetScalar(batch_row_index)).get()).value
>>>>>>>> name = <char *>name_buffer.get().data()
>>>>>>>> ```
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn < xander@xander.ai > wrote:
>>>>>>>> 
>>>>>>>>> Here is an example code snippet from a .pyx file that successfully
>>>>>>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>>>>>>> ascending:
>>>>>>>>> 
>>>>>>>>> ```
>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>>     current_timestamp = timestamps[name]
>>>>>>>>>     if current_timestamp > new_timestamp.value:
>>>>>>>>>         abort()
>>>>>>>>>     batch_row_index += 1
>>>>>>>>> ```
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> However, I'm having difficulty operating on the values in a column of
>>>>>>>>> string type. Unlike CTimestampScalar, there is no CStringScalar. Although
>>>>>>>>> there is a StringScalar type in C++, it isn't defined in the Cython
>>>>>>>>> interface. There is a `CStringType` and a `c_string` type.
>>>>>>>>> 
>>>>>>>>> ```
>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>>>>>         print("found it")
>>>>>>>>>     batch_row_index += 1
>>>>>>>>> ```
>>>>>>>>> 
>>>>>>>>> How do I get the string value as a C type and compare it to other strings?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Xander
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
>