Big thanks to both of you, this was very helpful. 

I moved my loop down to pure C++ and replaced my GetScalarValue calls with array type casts as Weston suggested. Time on the first 290GB of data went from 107min to 21min! Same functionality. You were right, I was introducing a lot of overhead with the allocations.

As you predicted, the string_view as map keys didn't quite work out of the box. This may be related ( but I haven't been able to try it yet because of some Cython silliness. Currently I'm just using the std::string from GetString().

My data do not start out with any grouping by `name`. It's sorted by timestamp and it's important that I process it in order of timestamp, so the `name` column is close to uniformly distributed. Although the grouping by `name` is not a stateful process, I am in parallel computing some stateful functions. At some point you're right this dataset is going to become large enough that I need to learn a distributed compute framework and break these tasks up. For now it's convenient that the processing of stored data is the same code as the processing of real-time production data streams.

I haven't used Cython in ~6 years and it's been quite a pain. I'm rusty on my C++ as well, but I found the same functionality much more straight-forward to implement purely in C++. Cython is 75% Python, 25% C/C++, and 100% neither. Differences in imports and syntax can make it very challenging to reproduce some perfectly functioning C++ code in Cython. I'll try to keep the Cython layer very thin and keep everything in either Python or C++.

On Wed, Apr 14, 2021 at 8:05 PM, Micah Kornfield <> wrote:
One more thought, at 7TB your data starts entering the realm of "big".  You might consider doing the grouping with a distributed compute framework outputting then outputting to plasma.

On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <> wrote:
+1 to everything Weston said.

From your comments about Gandiva, it sounded like you were OK with the filter based approach but maybe you had a different idea of using Gandiva?

I believe "filter" also has optimizations if your data was already mostly grouped by name. 

I agree algorithmically, the map approach is probably optimal but as Weston alluded to there are hidden constant overheads that might even out with different approaches.

Also if your data is already grouped using the dataset expression might be fairly efficient since it does row group pruning based on predicates on the underlying parquet data.


On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <> wrote:
Correct, the "group by" operation you're looking for doesn't quite exist (externally) yet (others can correct me if I'm wrong here). ARROW-3978 sometimes gets brought up in reference to this.  There are some things (e.g. C++ query execution engine) in the works which would provide this.  There is also an internal implementation (arrow::compute::internal::Grouper) that is used for computing partitions but I believe it was intentionally kept internal, others may be able to explain more the reason.

Expressions are (or will soon be) built on compute so using them is unable to provide much benefit over what is in compute.  I want to say the best approach you could get in what is in compute for 4.0.0 is O(num_rows * num_unique_names).  To create the mask you would use the equals function.  So the whole operation would be...

1) Use unique to get the possible string values
2) For each string value
  a) Use equals to get a mask
  b) Use filter to get a subarray

So what you have may be a pretty reasonable workaround.  I'd recommend comparing with what you get from compute just for the sake of comparison.

So there are a few minor optimizations you can make that shouldn't be too much harder.  You want to avoid GetScalar if you can as it will make an allocation / copy for every item you access.  Grab the column from the record batch and cast it to the appropriate typed array (this is only easy because it appears you have a fairly rigid schema).  This will allow you to access values directly without wrapping them in a scalar.  For example, in C++ (I'll leave the cython to you :)) it would look like...

  auto arr = std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
  std::cout << arr->Value(0) << std::endl;

For the string array I believe it is...

auto str_arr = std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
arrow::util::string_view view = arr->GetView(0);

It may take a slight bit of finesse to figure out how to get arrow::util::string_view to work with map but it should be doable.  There is also GetString which returns std::string which should only be slightly more expensive and GetValue which returns a uint8_t* and writes the length into an out parameter.

On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <> wrote:
Thanks, I did try a few things with pyarrow.compute. However, the pyarrow.compute.filter interface indicates that it takes a boolean mask to do the filtering:

But it doesn't actually help me create the mask? I'm back to iterating through the rows and now I would need to create a boolean array of size (num_rows) for every unique value of `name`.

I saw in the dataset docs ( some discussion on Expressions, such as `ds.field("name") == "Xander"`. However, I don't see a way of computing such an expression without loading the entire dataset into memory with `dataset.to_table()`, which doesn't work for my dataset because it's many times larger than RAM. Can an Expression be computed on a RecordBatch?

But it's also hard to foresee how applying filter for each unique value of `name` will be more computationally efficient. The loop I posted above is O(num_rows), whereas applying filter for each name would be O(num_rows * num_unique_names). It could still be faster if my loop code is poorly implemented or if filter is multi-threaded.

On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <> wrote:
Have you looked at the pyarrow compute functions [1][2]?  

On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <> wrote:
Thanks Weston,

Performance is paramount here, I'm streaming through 7TB of data.

I actually need to separate the data based on the value of the `name` column. For every unique value of `name`, I need a batch of those rows. I tried using gandiva's filter function but can't get gandiva installed on Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on Ubuntu?" on this mailing list). 

Aside from that, I'm not sure of a way to separate the data faster than iterating through every row and placing the values into a map keyed on `name`:
cdef struct myUpdateStruct:
    double value
    int64_t checksum

cdef iterate_dataset():
    cdef map[c_string, deque[myUpdateStruct]] myUpdates
    cdef shared_ptr[CRecordBatch] batch # This is populated by a scanner of .parquet files
    cdef int64_t batch_row_index = 0
    while batch_row_index < batch.get().num_rows():
        name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
        name = <char *>name_buffer.get().data()
        value = (<CDoubleScalar*>GetResultValue(values.get().\
        checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
        newUpdate = myUpdateStruct(value, checksum)
        if myUpdates.count(name) <= 0:
            myUpdates[name] = deque[myUpdateStruct]()
        if myUpdates[name].size() > 1024:
        batch_row_index += 1
This takes 107minutes to iterate through the first 290GB of data. Without accessing or filtering the data in any way it takes only 12min to read all the .parquet files into RecordBatches and place them into Plasma.

On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <> wrote:
If you don't need the performance, you could stay in python (use to_pylist() for the array or as_py() for scalars).

If you do need the performance then you're probably better served getting the buffers and operating on them directly.  Or, even better, making use of the compute kernels:

arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
desired = pa.array(['Xander'], pa.string())
pc.any(pc.is_in(arr, value_set=desired)).as_py() # True

On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <> wrote:
This works for getting a c string out of the CScalar:
                name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
                name = <char *>name_buffer.get().data()

On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <> wrote:
Here is an example code snippet from a .pyx file that successfully iterates through a CRecordBatch and ensures that the timestamps are ascending:
            while batch_row_index < batch.get().num_rows():
                timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
                new_timestamp = <CTimestampScalar*>timestamp.get()
                current_timestamp = timestamps[name]
                if current_timestamp > new_timestamp.value:
                batch_row_index += 1

However, I'm having difficulty operating on the values in a column of string type. Unlike CTimestampScalar, there is no CStringScalar. Although there is a StringScalar type in C++, it isn't defined in the Cython interface. There is a `CStringType` and a `c_string` type.
    while batch_row_index < batch.get().num_rows():
        name = GetResultValue(names.get().GetScalar(batch_row_index))
        name_string = <CStringType*>name.get() # This is wrong
        printf("%s\n", name_string) # This prints garbage
        if name_string == b"Xander": # Doesn't work
            print("found it")
        batch_row_index += 1
How do I get the string value as a C type and compare it to other strings?