From: Micah Kornfield <emkornfield@gmail.com>
Date: Thu, 15 Apr 2021 20:50:29 -0700
Subject: Re: [Cython] Getting and Comparing String Scalars?
To: Xander Dunn <xander@xander.ai>
Cc: user@arrow.apache.org, Weston Pace <weston.pace@gmail.com>

> Cython is 75% Python, 25% C/C++, and 100% neither. Differences in imports
> and syntax can make it very challenging to reproduce some perfectly
> functioning C++ code in Cython. I'll try to keep the Cython layer very
> thin and keep everything in either Python or C++.

This sounds like a sane approach to me :)

On Thu, Apr 15, 2021 at 11:26 AM Xander Dunn <xander@xander.ai> wrote:

> Big thanks to both of you, this was very helpful.
>
> I moved my loop down to pure C++ and replaced my GetScalarValue calls
> with array type casts as Weston suggested. Time on the first 290GB of
> data went from 107 minutes to 21 minutes! Same functionality. You were
> right, I was introducing a lot of overhead with the allocations.
>
> As you predicted, the string_view as map keys didn't quite work out of
> the box. This may be related
> (https://stackoverflow.com/questions/35525777/use-of-string-view-for-map-lookup)
> but I haven't been able to try it yet because of some Cython silliness.
> Currently I'm just using the std::string from GetString().
>
> My data do not start out with any grouping by `name`. They are sorted by
> timestamp, and it's important that I process them in order of timestamp,
> so the `name` column is close to uniformly distributed. Although the
> grouping by `name` is not a stateful process, I am computing some
> stateful functions in parallel. You're right that at some point this
> dataset is going to become large enough that I need to learn a
> distributed compute framework and break these tasks up. For now it's
> convenient that the processing of stored data is the same code as the
> processing of real-time production data streams.
>
> I haven't used Cython in ~6 years and it's been quite a pain. I'm rusty
> on my C++ as well, but I found the same functionality much more
> straightforward to implement purely in C++. Cython is 75% Python, 25%
> C/C++, and 100% neither. Differences in imports and syntax can make it
> very challenging to reproduce some perfectly functioning C++ code in
> Cython. I'll try to keep the Cython layer very thin and keep everything
> in either Python or C++.
>
> On Wed, Apr 14, 2021 at 8:05 PM, Micah Kornfield <emkornfield@gmail.com>
> wrote:
>
>> One more thought: at 7TB your data starts entering the realm of "big".
>> You might consider doing the grouping with a distributed compute
>> framework and then outputting to Plasma.
>>
>> On Wed, Apr 14, 2021 at 7:39 PM Micah Kornfield <emkornfield@gmail.com>
>> wrote:
>>
>>> +1 to everything Weston said.
>>>
>>> From your comments about Gandiva, it sounded like you were OK with the
>>> filter-based approach, but maybe you had a different idea of using
>>> Gandiva?
>>>
>>> I believe "filter" also has optimizations if your data is already
>>> mostly grouped by name.
>>>
>>> I agree that algorithmically the map approach is probably optimal, but
>>> as Weston alluded to, there are hidden constant overheads that might
>>> even things out between the approaches.
>>>
>>> Also, if your data is already grouped, using a dataset expression might
>>> be fairly efficient, since it does row group pruning based on
>>> predicates on the underlying parquet data.
>>>
>>> -Micah
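A minimal sketch of the dataset-expression route Micah describes, assuming the pyarrow 3/4-era dataset API and a hypothetical parquet directory; `to_batches` streams only matching record batches, so nothing close to the full dataset is materialized:

```
import pyarrow.dataset as ds

# "data/" is a hypothetical path; the filter expression is pushed down, so
# parquet row groups whose statistics rule out name == "Xander" can be
# pruned before they are read.
dataset = ds.dataset("data/", format="parquet")
for batch in dataset.to_batches(filter=ds.field("name") == "Xander"):
    ...  # each RecordBatch already contains only the matching rows
```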
>>> On Wed, Apr 14, 2021 at 7:13 PM Weston Pace <weston.pace@gmail.com>
>>> wrote:
>>>
>>>> Correct, the "group by" operation you're looking for doesn't quite
>>>> exist (externally) yet (others can correct me if I'm wrong here).
>>>> ARROW-3978 sometimes gets brought up in reference to this. There are
>>>> some things (e.g. the C++ query execution engine) in the works which
>>>> would provide this. There is also an internal implementation
>>>> (arrow::compute::internal::Grouper) that is used for computing
>>>> partitions, but I believe it was intentionally kept internal; others
>>>> may be able to explain more of the reasoning.
>>>>
>>>> Expressions are (or will soon be) built on compute, so using them
>>>> won't provide much benefit over what is in compute. I want to say the
>>>> best approach you could get with what is in compute for 4.0.0 is
>>>> O(num_rows * num_unique_names). To create the mask you would use the
>>>> equals function. So the whole operation would be...
>>>>
>>>> 1) Use unique to get the possible string values
>>>> 2) For each string value
>>>>   a) Use equals to get a mask
>>>>   b) Use filter to get a subarray
>>>>
>>>> So what you have may be a pretty reasonable workaround. I'd still
>>>> recommend benchmarking it against the compute-based version, just for
>>>> the sake of comparison.
>>>>
>>>> There are a few minor optimizations you can make that shouldn't be too
>>>> much harder. You want to avoid GetScalar if you can, as it will make
>>>> an allocation / copy for every item you access. Grab the column from
>>>> the record batch and cast it to the appropriate typed array (this is
>>>> only easy because it appears you have a fairly rigid schema). This
>>>> will allow you to access values directly without wrapping them in a
>>>> scalar. For example, in C++ (I'll leave the Cython to you :)) it would
>>>> look like...
>>>>
>>>>   auto arr = std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
>>>>   std::cout << arr->Value(0) << std::endl;
>>>>
>>>> For the string array I believe it is...
>>>>
>>>>   auto str_arr = std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
>>>>   arrow::util::string_view view = str_arr->GetView(0);
>>>>
>>>> It may take a slight bit of finesse to figure out how to get
>>>> arrow::util::string_view to work with map, but it should be doable.
>>>> There is also GetString, which returns std::string and should only be
>>>> slightly more expensive, and GetValue, which returns a uint8_t* and
>>>> writes the length into an out parameter.
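A minimal pyarrow sketch of the unique/equals/filter sequence Weston outlines, assuming a toy record batch (the column layout is illustrative):

```
import pyarrow as pa
import pyarrow.compute as pc

# Toy batch standing in for one batch from the parquet scanner.
batch = pa.record_batch(
    [pa.array(["a", "b", "a"]), pa.array([1.0, 2.0, 3.0])],
    names=["name", "value"],
)

names = batch.column(0)
for name in pc.unique(names):       # 1) the possible string values
    mask = pc.equal(names, name)    # 2a) boolean mask for this value
    group = pc.filter(batch, mask)  # 2b) sub-batch with only these rows
```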
>>>> On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <xander@xander.ai> wrote:
>>>>
>>>>> Thanks, I did try a few things with pyarrow.compute. However, the
>>>>> pyarrow.compute.filter interface indicates that it takes a boolean
>>>>> mask to do the filtering:
>>>>> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>>>>>
>>>>> But it doesn't actually help me create the mask. I'm back to
>>>>> iterating through the rows, and now I would need to create a boolean
>>>>> array of size (num_rows) for every unique value of `name`.
>>>>>
>>>>> I saw in the dataset docs
>>>>> (https://arrow.apache.org/docs/python/dataset.html) some discussion
>>>>> of Expressions, such as `ds.field("name") == "Xander"`. However, I
>>>>> don't see a way of computing such an expression without loading the
>>>>> entire dataset into memory with `dataset.to_table()`, which doesn't
>>>>> work for my dataset because it's many times larger than RAM. Can an
>>>>> Expression be computed on a RecordBatch?
>>>>>
>>>>> It's also hard to foresee how applying filter for each unique value
>>>>> of `name` would be more computationally efficient. The loop I posted
>>>>> above is O(num_rows), whereas applying filter for each name would be
>>>>> O(num_rows * num_unique_names). It could still be faster if my loop
>>>>> code is poorly implemented or if filter is multi-threaded.
>>>>>
>>>>> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield
>>>>> <emkornfield@gmail.com> wrote:
>>>>>
>>>>>> Have you looked at the pyarrow compute functions [1][2]?
>>>>>>
>>>>>> Unique and filter seem like they would help.
>>>>>>
>>>>>> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>>>>>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>>>>>
>>>>>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai> wrote:
>>>>>>
>>>>>>> Thanks Weston,
>>>>>>>
>>>>>>> Performance is paramount here; I'm streaming through 7TB of data.
>>>>>>>
>>>>>>> I actually need to separate the data based on the value of the
>>>>>>> `name` column. For every unique value of `name`, I need a batch of
>>>>>>> those rows. I tried using Gandiva's filter function but can't get
>>>>>>> Gandiva installed on Ubuntu (see my earlier thread "[Python]
>>>>>>> pyarrow.gandiva unavailable on Ubuntu?" on this mailing list).
>>>>>>>
>>>>>>> Aside from that, I'm not sure of a way to separate the data faster
>>>>>>> than iterating through every row and placing the values into a map
>>>>>>> keyed on `name`:
>>>>>>> ```
>>>>>>> cdef struct myUpdateStruct:
>>>>>>>     double value
>>>>>>>     int64_t checksum
>>>>>>>
>>>>>>> cdef iterate_dataset():
>>>>>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>>>>>>     cdef shared_ptr[CRecordBatch] batch  # Populated by a scanner of .parquet files
>>>>>>>     cdef int64_t batch_row_index = 0
>>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>>         name = <char *>name_buffer.get().data()
>>>>>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>>>>>             GetScalar(batch_row_index)).get()).value
>>>>>>>         newUpdate = myUpdateStruct(value, checksum)
>>>>>>>         if myUpdates.count(name) <= 0:
>>>>>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>>>>>         myUpdates[name].push_front(newUpdate)
>>>>>>>         if myUpdates[name].size() > 1024:
>>>>>>>             myUpdates[name].pop_back()
>>>>>>>         batch_row_index += 1
>>>>>>> ```
>>>>>>> This takes 107 minutes to iterate through the first 290GB of data.
>>>>>>> Without accessing or filtering the data in any way, it takes only
>>>>>>> 12 minutes to read all the .parquet files into RecordBatches and
>>>>>>> place them into Plasma.
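For readers skimming the Cython, a rough pure-Python equivalent of the loop's per-name bookkeeping, assuming the same most-recent-1024 policy (`deque(maxlen=...)` drops the oldest entry automatically):

```
from collections import defaultdict, deque

# Most-recent-first history of up to 1024 (value, checksum) updates per
# name, mirroring the push_front/pop_back pattern above.
updates = defaultdict(lambda: deque(maxlen=1024))

def record(name, value, checksum):
    updates[name].appendleft((value, checksum))  # oldest falls off the back
```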
>>>>>>>
>>>>>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace
>>>>>>> <weston.pace@gmail.com> wrote:
>>>>>>>
>>>>>>>> If you don't need the performance, you could stay in Python (use
>>>>>>>> to_pylist() for the array or as_py() for scalars).
>>>>>>>>
>>>>>>>> If you do need the performance, then you're probably better served
>>>>>>>> getting the buffers and operating on them directly. Or, even
>>>>>>>> better, making use of the compute kernels:
>>>>>>>>
>>>>>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>>>>>> desired = pa.array(['Xander'], pa.string())
>>>>>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>>>>>>>
>>>>>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This works for getting a C string out of the CScalar:
>>>>>>>>> ```
>>>>>>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>>>>>     GetScalar(batch_row_index)).get()).value
>>>>>>>>> name = <char *>name_buffer.get().data()
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Here is an example code snippet from a .pyx file that
>>>>>>>>>> successfully iterates through a CRecordBatch and ensures that
>>>>>>>>>> the timestamps are ascending:
>>>>>>>>>> ```
>>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>>>>>     current_timestamp = timestamps[name]
>>>>>>>>>>     if current_timestamp > new_timestamp.value:
>>>>>>>>>>         abort()
>>>>>>>>>>     batch_row_index += 1
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> However, I'm having difficulty operating on the values in a
>>>>>>>>>> column of string type. Unlike CTimestampScalar, there is no
>>>>>>>>>> CStringScalar. Although there is a StringScalar type in C++, it
>>>>>>>>>> isn't defined in the Cython interface. There is a `CStringType`
>>>>>>>>>> and a `c_string` type.
>>>>>>>>>> ```
>>>>>>>>>> while batch_row_index < batch.get().num_rows():
>>>>>>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>>>>>>         print("found it")
>>>>>>>>>>     batch_row_index += 1
>>>>>>>>>> ```
>>>>>>>>>> How do I get the string value as a C type and compare it to
>>>>>>>>>> other strings?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Xander
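As a rough illustration of the Python-level path Weston suggests above (as_py() on scalars), a minimal sketch with a toy batch; the column layout is illustrative:

```
import pyarrow as pa

batch = pa.record_batch([pa.array(["abc", "Xander", None])], names=["name"])
names = batch.column(0)
for i in range(batch.num_rows):
    # names[i] is a StringScalar; as_py() converts it (null stays None)
    if names[i].as_py() == "Xander":
        print("found it at row", i)
```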