arrow-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Micah Kornfield <emkornfi...@gmail.com>
Subject Re: [Cython] Getting and Comparing String Scalars?
Date Thu, 15 Apr 2021 03:05:44 GMT
One more thought, at 7TB your data starts entering the realm of "big".  You
might consider doing the grouping with a distributed compute framework
outputting then outputting to plasma.

On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <emkornfield@gmail.com>
wrote:

> +1 to everything Weston said.
>
> From your comments about Gandiva, it sounded like you were OK with the
> filter based approach but maybe you had a different idea of using Gandiva?
>
> I believe "filter" also has optimizations if your data was already mostly
> grouped by name.
>
> I agree algorithmically, the map approach is probably optimal but as
> Weston alluded to there are hidden constant overheads that might even out
> with different approaches.
>
> Also if your data is already grouped using the dataset expression might be
> fairly efficient since it does row group pruning based on predicates on the
> underlying parquet data.
>
> -Micah
>
> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <weston.pace@gmail.com> wrote:
>
>> Correct, the "group by" operation you're looking for doesn't quite exist
>> (externally) yet (others can correct me if I'm wrong here). ARROW-3978
>> <https://issues.apache.org/jira/browse/ARROW-3978> sometimes gets
>> brought up in reference to this.  There are some things (e.g. C++ query
>> execution engine
>> <https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing>)
>> in the works which would provide this.  There is also an internal
>> implementation (arrow::compute::internal::Grouper) that is used for
>> computing partitions but I believe it was intentionally kept internal,
>> others may be able to explain more the reason.
>>
>> Expressions are (or will soon be) built on compute so using them is
>> unable to provide much benefit over what is in compute.  I want to say the
>> best approach you could get in what is in compute for 4.0.0 is O(num_rows *
>> num_unique_names).  To create the mask you would use the equals function.
>> So the whole operation would be...
>>
>> 1) Use unique to get the possible string values
>> 2) For each string value
>>   a) Use equals to get a mask
>>   b) Use filter to get a subarray
>>
>> So what you have may be a pretty reasonable workaround.  I'd recommend
>> comparing with what you get from compute just for the sake of comparison.
>>
>> So there are a few minor optimizations you can make that shouldn't be too
>> much harder.  You want to avoid GetScalar if you can as it will make an
>> allocation / copy for every item you access.  Grab the column from the
>> record batch and cast it to the appropriate typed array (this is only easy
>> because it appears you have a fairly rigid schema).  This will allow you to
>> access values directly without wrapping them in a scalar.  For example, in
>> C++ (I'll leave the cython to you :)) it would look like...
>>
>>   auto arr =
>> std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>>   std::cout << arr->Value(0) << std::endl;
>>
>> For the string array I believe it is...
>>
>> auto str_arr =
>> std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
>> arrow::util::string_view view = arr->GetView(0);
>>
>> It may take a slight bit of finesse to figure out how to get
>> arrow::util::string_view to work with map but it should be doable.  There
>> is also GetString which returns std::string which should only be slightly
>> more expensive and GetValue which returns a uint8_t* and writes the length
>> into an out parameter.
>>
>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <xander@xander.ai> wrote:
>>
>>> Thanks, I did try a few things with pyarrow.compute. However, the
>>> pyarrow.compute.filter interface indicates that it takes a boolean mask to
>>> do the filtering:
>>> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>
>>> But it doesn't actually help me create the mask? I'm back to iterating
>>> through the rows and now I would need to create a boolean array of size
>>> (num_rows) for every unique value of `name`.
>>>
>>> I saw in the dataset docs (
>>> https://arrow.apache.org/docs/python/dataset.html) some discussion on
>>> Expressions, such as `ds.field("name") == "Xander"`. However, I don't see a
>>> way of computing such an expression without loading the entire dataset into
>>> memory with `dataset.to_table()`, which doesn't work for my dataset because
>>> it's many times larger than RAM. Can an Expression be computed on a
>>> RecordBatch?
>>>
>>> But it's also hard to foresee how applying filter for each unique value
>>> of `name` will be more computationally efficient. The loop I posted above
>>> is O(num_rows), whereas applying filter for each name would be O(num_rows *
>>> num_unique_names). It could still be faster if my loop code is poorly
>>> implemented or if filter is multi-threaded.
>>>
>>>
>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <emkornfield@gmail.com>
>>> wrote:
>>>
>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>
>>>> Unique and filter seems like they would help.
>>>>
>>>> [1]
>>>> https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>> [2]
>>>> https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>
>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai> wrote:
>>>>
>>>>> Thanks Weston,
>>>>>
>>>>> Performance is paramount here, I'm streaming through 7TB of data.
>>>>>
>>>>> I actually need to separate the data based on the value of the `name`
>>>>> column. For every unique value of `name`, I need a batch of those rows.
I
>>>>> tried using gandiva's filter function but can't get gandiva installed
on
>>>>> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable
>>>>> on Ubuntu?" on this mailing list).
>>>>>
>>>>> Aside from that, I'm not sure of a way to separate the data faster
>>>>> than iterating through every row and placing the values into a map keyed
on
>>>>> `name`:
>>>>> ```
>>>>> cdef struct myUpdateStruct:
>>>>>     double value
>>>>>     int64_t checksum
>>>>>
>>>>> cdef iterate_dataset():
>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>     cdef shared_ptr[CRecordBatch] batch # This is populated by a
>>>>> scanner of .parquet files
>>>>>     cdef int64_t batch_row_index = 0
>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>                 GetScalar(batch_row_index)).get()).value
>>>>>         name = <char *>name_buffer.get().data()
>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>>                 GetScalar(batch_row_index)).get()).value
>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>>                 GetScalar(batch_row_index)).get()).value
>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>         if myUpdates.count(name) <= 0:
>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>         if myUpdates[name].size() > 1024:
>>>>>             myUpdates[name].pop_back()
>>>>>         batch_row_index += 1
>>>>> ```
>>>>> This takes 107minutes to iterate through the first 290GB of data.
>>>>> Without accessing or filtering the data in any way it takes only 12min
to
>>>>> read all the .parquet files into RecordBatches and place them into Plasma.
>>>>>
>>>>>
>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <weston.pace@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If you don't need the performance, you could stay in python (use
>>>>>> to_pylist() for the array or as_py() for scalars).
>>>>>>
>>>>>> If you do need the performance then you're probably better served
>>>>>> getting the buffers and operating on them directly.  Or, even better,
>>>>>> making use of the compute kernels:
>>>>>>
>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
>>>>>>
>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai>
wrote:
>>>>>>
>>>>>>> This works for getting a c string out of the CScalar:
>>>>>>> ```
>>>>>>>                 name_buffer =
>>>>>>> (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>                         GetScalar(batch_row_index)).get()).value
>>>>>>>                 name = <char *>name_buffer.get().data()
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Here is an example code snippet from a .pyx file that successfully
>>>>>>>> iterates through a CRecordBatch and ensures that the timestamps
are
>>>>>>>> ascending:
>>>>>>>> ```
>>>>>>>>             while batch_row_index < batch.get().num_rows():
>>>>>>>>                 timestamp =
>>>>>>>> GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>                 new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>                 current_timestamp = timestamps[name]
>>>>>>>>                 if current_timestamp > new_timestamp.value:
>>>>>>>>                     abort()
>>>>>>>>                 batch_row_index += 1
>>>>>>>> ```
>>>>>>>>
>>>>>>>> However, I'm having difficulty operating on the values in
a column
>>>>>>>> of string type. Unlike CTimestampScalar, there is no CStringScalar.
>>>>>>>> Although there is a StringScalar type in C++, it isn't defined
in the
>>>>>>>> Cython interface. There is a `CStringType` and a `c_string`
type.
>>>>>>>> ```
>>>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>>>         name =
>>>>>>>> GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>         name_string = <CStringType*>name.get() # This
is wrong
>>>>>>>>         printf("%s\n", name_string) # This prints garbage
>>>>>>>>         if name_string == b"Xander": # Doesn't work
>>>>>>>>             print("found it")
>>>>>>>>         batch_row_index += 1
>>>>>>>> ```
>>>>>>>> How do I get the string value as a C type and compare it
to other
>>>>>>>> strings?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Xander
>>>>>>>>
>>>>>>>
>>>

Mime
View raw message