arrow-user mailing list archives

From Micah Kornfield <emkornfi...@gmail.com>
Subject Re: [Cython] Getting and Comparing String Scalars?
Date Fri, 16 Apr 2021 03:50:29 GMT
>
>  Cython is 75% Python, 25% C/C++, and 100% neither. Differences in imports
> and syntax can make it very challenging to reproduce some perfectly
> functioning C++ code in Cython. I'll try to keep the Cython layer very thin
> and keep everything in either Python or C++.


This sounds like a sane approach to me :)

On Thu, Apr 15, 2021 at 11:26 AM Xander Dunn <xander@xander.ai> wrote:

> Big thanks to both of you, this was very helpful.
>
> I moved my loop down to pure C++ and replaced my GetScalarValue calls with
> array type casts as Weston suggested. Time on the first 290GB of data went
> from 107min to 21min! Same functionality. You were right, I was introducing
> a lot of overhead with the allocations.
>
> As you predicted, the string_view as map keys didn't quite work out of the
> box. This may be related (
> https://stackoverflow.com/questions/35525777/use-of-string-view-for-map-lookup)
> but I haven't been able to try it yet because of some Cython silliness.
> Currently I'm just using the std::string from GetString().
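
One way the string_view lookup could be made to work is heterogeneous lookup with a transparent comparator, as the linked Stack Overflow question suggests. The sketch below is not Arrow code: it uses std::string_view (C++17) as a stand-in, and whether arrow::util::string_view behaves the same way in a given build is an assumption. `Update`, `UpdateMap`, `FindUpdates` and `Append` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <string_view>

// Hypothetical payload type standing in for the per-name record.
struct Update {
    double value;
    long long checksum;
};

// std::less<> is a transparent comparator: with it, map::find() accepts any
// key type comparable with std::string, including std::string_view.
using UpdateMap = std::map<std::string, std::deque<Update>, std::less<>>;

// Look up by view without constructing a temporary std::string.
// Returns nullptr when the name has not been seen.
const std::deque<Update>* FindUpdates(const UpdateMap& updates,
                                      std::string_view name) {
    auto it = updates.find(name);  // heterogeneous lookup
    return it == updates.end() ? nullptr : &it->second;
}

// Insertion still needs an owning key, so a std::string is built only the
// first time a name appears.
void Append(UpdateMap& updates, std::string_view name, Update u) {
    auto it = updates.find(name);
    if (it == updates.end()) {
        it = updates.emplace(std::string(name), std::deque<Update>{}).first;
    }
    it->second.push_back(u);
}
```

The map keeps owning std::string keys on purpose: a view into a record batch buffer would dangle once that batch is released.
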
>
> My data do not start out with any grouping by `name`. It's sorted by
> timestamp and it's important that I process it in order of timestamp, so
> the `name` column is close to uniformly distributed. Although the grouping
> by `name` is not a stateful process, I am in parallel computing some
> stateful functions. At some point you're right this dataset is going to
> become large enough that I need to learn a distributed compute framework
> and break these tasks up. For now it's convenient that the processing of
> stored data is the same code as the processing of real-time production data
> streams.
>
> I haven't used Cython in ~6 years and it's been quite a pain. I'm rusty on
> my C++ as well, but I found the same functionality much more
> straight-forward to implement purely in C++. Cython is 75% Python, 25%
> C/C++, and 100% neither. Differences in imports and syntax can make it very
> challenging to reproduce some perfectly functioning C++ code in Cython.
> I'll try to keep the Cython layer very thin and keep everything in either
> Python or C++.
>
>
> On Wed, Apr 14, 2021 at 8:05 PM, Micah Kornfield <emkornfield@gmail.com>
> wrote:
>
>> One more thought, at 7TB your data starts entering the realm of "big".
>> You might consider doing the grouping with a distributed compute framework
>> and then outputting to Plasma.
>>
>> On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <emkornfield@gmail.com>
>> wrote:
>>
>>> +1 to everything Weston said.
>>>
>>> From your comments about Gandiva, it sounded like you were OK with the
>>> filter based approach but maybe you had a different idea of using Gandiva?
>>>
>>> I believe "filter" also has optimizations if your data was already
>>> mostly grouped by name.
>>>
>>> I agree algorithmically, the map approach is probably optimal but as
>>> Weston alluded to there are hidden constant overheads that might even out
>>> with different approaches.
>>>
>>> Also if your data is already grouped using the dataset expression might
>>> be fairly efficient since it does row group pruning based on predicates on
>>> the underlying parquet data.
>>>
>>> -Micah
>>>
>>> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <weston.pace@gmail.com>
>>> wrote:
>>>
>>>> Correct, the "group by" operation you're looking for doesn't quite
>>>> exist (externally) yet (others can correct me if I'm wrong here).
>>>> ARROW-3978 <https://issues.apache.org/jira/browse/ARROW-3978>
>>>> sometimes gets brought up in reference to this.  There are some things
>>>> (e.g. C++ query execution engine
>>>> <https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing>)
>>>> in the works which would provide this.  There is also an internal
>>>> implementation (arrow::compute::internal::Grouper) that is used for
>>>> computing partitions but I believe it was intentionally kept internal,
>>>> others may be able to explain more the reason.
>>>>
>>>> Expressions are (or will soon be) built on compute so using them is
>>>> unable to provide much benefit over what is in compute.  I want to say the
>>>> best approach you could get in what is in compute for 4.0.0 is O(num_rows
>>>> * num_unique_names).  To create the mask you would use the equals function.
>>>> So the whole operation would be...
>>>>
>>>> 1) Use unique to get the possible string values
>>>> 2) For each string value
>>>>   a) Use equals to get a mask
>>>>   b) Use filter to get a subarray
>>>>
>>>> So what you have may be a pretty reasonable workaround.  I'd recommend
>>>> comparing with what you get from compute just for the sake of comparison.
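
The unique / equals / filter steps above can be sketched with plain standard-library containers to show where the O(num_rows * num_unique_names) cost comes from. This is only an illustration of the shape of the algorithm, not the Arrow compute API: std::vector stands in for Arrow arrays, and `Unique`, `Equals`, `Filter` and `GroupByName` are hypothetical helpers named after the steps.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <vector>

// Step 1: the distinct values of the key column.
std::set<std::string> Unique(const std::vector<std::string>& names) {
    return std::set<std::string>(names.begin(), names.end());
}

// Step 2a: boolean mask that is true where names[i] == target.
std::vector<bool> Equals(const std::vector<std::string>& names,
                         const std::string& target) {
    std::vector<bool> mask(names.size());
    for (std::size_t i = 0; i < names.size(); ++i) {
        mask[i] = (names[i] == target);
    }
    return mask;
}

// Step 2b: keep only the values whose mask entry is true.
std::vector<double> Filter(const std::vector<double>& values,
                           const std::vector<bool>& mask) {
    assert(values.size() == mask.size());
    std::vector<double> out;
    for (std::size_t i = 0; i < values.size(); ++i) {
        if (mask[i]) out.push_back(values[i]);
    }
    return out;
}

// One full scan of the rows per unique name, hence rows * unique_names work.
std::map<std::string, std::vector<double>> GroupByName(
    const std::vector<std::string>& names, const std::vector<double>& values) {
    std::map<std::string, std::vector<double>> groups;
    for (const std::string& name : Unique(names)) {
        groups[name] = Filter(values, Equals(names, name));
    }
    return groups;
}
```

Each group keeps its rows in the original order, which matters for data that must be processed by timestamp.
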
>>>>
>>>> So there are a few minor optimizations you can make that shouldn't be
>>>> too much harder.  You want to avoid GetScalar if you can as it will make
>>>> an allocation / copy for every item you access.  Grab the column from the
>>>> record batch and cast it to the appropriate typed array (this is only easy
>>>> because it appears you have a fairly rigid schema).  This will allow you
>>>> to access values directly without wrapping them in a scalar.  For example, in
>>>> C++ (I'll leave the cython to you :)) it would look like...
>>>>
>>>>   auto arr =
>>>> std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>>>>   std::cout << arr->Value(0) << std::endl;
>>>>
>>>> For the string array I believe it is...
>>>>
>>>> auto str_arr =
>>>> std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
>>>> arrow::util::string_view view = str_arr->GetView(0);
>>>>
>>>> It may take a slight bit of finesse to figure out how to get
>>>> arrow::util::string_view to work with map but it should be doable.  There
>>>> is also GetString which returns std::string which should only be slightly
>>>> more expensive and GetValue which returns a uint8_t* and writes the length
>>>> into an out parameter.
>>>>
>>>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <xander@xander.ai> wrote:
>>>>
>>>>> Thanks, I did try a few things with pyarrow.compute. However, the
>>>>> pyarrow.compute.filter interface indicates that it takes a boolean mask
>>>>> to do the filtering:
>>>>> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>>>
>>>>> But it doesn't actually help me create the mask? I'm back to iterating
>>>>> through the rows and now I would need to create a boolean array of size
>>>>> (num_rows) for every unique value of `name`.
>>>>>
>>>>> I saw in the dataset docs (
>>>>> https://arrow.apache.org/docs/python/dataset.html) some discussion on
>>>>> Expressions, such as `ds.field("name") == "Xander"`. However, I don't
>>>>> see a way of computing such an expression without loading the entire dataset
>>>>> into memory with `dataset.to_table()`, which doesn't work for my dataset because
>>>>> it's many times larger than RAM. Can an Expression be computed on a
>>>>> RecordBatch?
>>>>>
>>>>> But it's also hard to foresee how applying filter for each unique
>>>>> value of `name` will be more computationally efficient. The loop I posted
>>>>> above is O(num_rows), whereas applying filter for each name would be
>>>>> O(num_rows * num_unique_names). It could still be faster if my loop code
>>>>> is poorly implemented or if filter is multi-threaded.
>>>>>
>>>>>
>>>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <
>>>>> emkornfield@gmail.com> wrote:
>>>>>
>>>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>>>
>>>>>> Unique and filter seem like they would help.
>>>>>>
>>>>>> [1]
>>>>>> https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>>>> [2]
>>>>>> https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>>>
>>>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Weston,
>>>>>>>
>>>>>>> Performance is paramount here, I'm streaming through 7TB of data.
>>>>>>>
>>>>>>> I actually need to separate the data based on the value of the
>>>>>>> `name` column. For every unique value of `name`, I need a batch
>>>>>>> of those rows. I tried using gandiva's filter function but can't get gandiva
>>>>>>> installed on Ubuntu (see my earlier thread "[Python]
>>>>>>> pyarrow.gandiva unavailable on Ubuntu?" on this mailing list).
>>>>>>>
>>>>>>> Aside from that, I'm not sure of a way to separate the data faster
>>>>>>> than iterating through every row and placing the values into
>>>>>>> a map keyed on `name`:
>>>>>>> ```
>>>>>>> cdef struct myUpdateStruct:
>>>>>>>     double value
>>>>>>>     int64_t checksum
>>>>>>>
>>>>>>> cdef iterate_dataset():
>>>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>>>     cdef shared_ptr[CRecordBatch] batch # This is populated by
>>>>>>>     # a scanner of .parquet files
>>>>>>>     cdef int64_t batch_row_index = 0
>>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>                 GetScalar(batch_row_index)).get()).value
>>>>>>>         name = <char *>name_buffer.get().data()
>>>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>>>>                 GetScalar(batch_row_index)).get()).value
>>>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>>>>                 GetScalar(batch_row_index)).get()).value
>>>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>>>         if myUpdates.count(name) <= 0:
>>>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>>>         if myUpdates[name].size() > 1024:
>>>>>>>             myUpdates[name].pop_back()
>>>>>>>         batch_row_index += 1
>>>>>>> ```
>>>>>>> This takes 107 minutes to iterate through the first 290GB of data.
>>>>>>> Without accessing or filtering the data in any way it takes only
>>>>>>> 12 minutes to read all the .parquet files into RecordBatches and place them
>>>>>>> into Plasma.
>>>>>>>
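
For comparison, the same single-pass grouping can be sketched in plain C++. This is not the code from the thread and not Arrow code: std::vector columns stand in for the typed Arrow arrays, `Update`, `UpdateMap` and `AppendBatch` are hypothetical names, and std::unordered_map is swapped in for the ordered map as a design choice. It does one map lookup per row instead of the three or four in the Cython loop.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_map>
#include <vector>

// Mirrors the two fields of myUpdateStruct in the Cython snippet.
struct Update {
    double value;
    std::int64_t checksum;
};

using UpdateMap = std::unordered_map<std::string, std::deque<Update>>;

// Appends one batch of rows, keeping at most max_per_name of the most
// recent updates per name (newest at the front, oldest dropped from the back).
void AppendBatch(const std::vector<std::string>& names,
                 const std::vector<double>& values,
                 const std::vector<std::int64_t>& checksums,
                 std::size_t max_per_name,
                 UpdateMap& updates) {
    assert(names.size() == values.size());
    assert(names.size() == checksums.size());
    for (std::size_t i = 0; i < names.size(); ++i) {
        // operator[] creates an empty deque the first time a name is seen,
        // so no separate count() check is needed.
        std::deque<Update>& queue = updates[names[i]];
        queue.push_front(Update{values[i], checksums[i]});
        if (queue.size() > max_per_name) {
            queue.pop_back();
        }
    }
}
```

Calling `AppendBatch` once per record batch with a cap of 1024 would match the bound used in the loop above, and the work stays O(num_rows) regardless of how many distinct names appear.
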
>>>>>>>
>>>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <weston.pace@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If you don't need the performance, you could stay in python
>>>>>>>> (use to_pylist() for the array or as_py() for scalars).
>>>>>>>>
>>>>>>>> If you do need the performance then you're probably better
>>>>>>>> served getting the buffers and operating on them directly.  Or,
>>>>>>>> even better, making use of the compute kernels:
>>>>>>>>
>>>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
>>>>>>>>
>>>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This works for getting a c string out of the CScalar:
>>>>>>>>> ```
>>>>>>>>>                 name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>>>                         GetScalar(batch_row_index)).get()).value
>>>>>>>>>                 name = <char *>name_buffer.get().data()
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Here is an example code snippet from a .pyx file
>>>>>>>>>> that successfully iterates through a CRecordBatch and
>>>>>>>>>> ensures that the timestamps are ascending:
>>>>>>>>>> ```
>>>>>>>>>>             while batch_row_index < batch.get().num_rows():
>>>>>>>>>>                 timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>>>                 new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>>>                 current_timestamp = timestamps[name]
>>>>>>>>>>                 if current_timestamp > new_timestamp.value:
>>>>>>>>>>                     abort()
>>>>>>>>>>                 batch_row_index += 1
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> However, I'm having difficulty operating on the values
>>>>>>>>>> in a column of string type. Unlike CTimestampScalar, there
>>>>>>>>>> is no CStringScalar. Although there is a StringScalar type in C++, it
>>>>>>>>>> isn't defined in the Cython interface. There is a `CStringType` and a
>>>>>>>>>> `c_string` type.
>>>>>>>>>> ```
>>>>>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>>>>>         name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>>>         name_string = <CStringType*>name.get() # This is wrong
>>>>>>>>>>         printf("%s\n", name_string) # This prints garbage
>>>>>>>>>>         if name_string == b"Xander": # Doesn't work
>>>>>>>>>>             print("found it")
>>>>>>>>>>         batch_row_index += 1
>>>>>>>>>> ```
>>>>>>>>>> How do I get the string value as a C type and compare
>>>>>>>>>> it to other strings?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xander
>>>>>>>>>>
>>>>>>>>>
>
