From: Micah Kornfield <emkornfield@gmail.com>
Reply-To: emkornfield@gmail.com
Date: Wed, 14 Apr 2021 16:45:50 -0700
Subject: Re: [Cython] Getting and Comparing String Scalars?
To: user@arrow.apache.org

Have you looked at the pyarrow compute functions [1][2]? Unique and filter
seem like they would help.

[1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
[2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list

On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai> wrote:

> Thanks Weston,
>
> Performance is paramount here, I'm streaming through 7TB of data.
>
> I actually need to separate the data based on the value of the `name`
> column. For every unique value of `name`, I need a batch of those rows. I
> tried using Gandiva's filter function but can't get Gandiva installed on
> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
> Ubuntu?" on this mailing list).
>
> Aside from that, I'm not sure of a way to separate the data faster than
> iterating through every row and placing the values into a map keyed on
> `name`:
> ```
> cdef struct myUpdateStruct:
>     double value
>     int64_t checksum
>
> cdef iterate_dataset():
>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>     cdef shared_ptr[CRecordBatch] batch  # This is populated by a scanner of .parquet files
>     cdef int64_t batch_row_index = 0
>     while batch_row_index < batch.get().num_rows():
>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>             GetScalar(batch_row_index)).get()).value
>         name = <char *>name_buffer.get().data()
>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>             GetScalar(batch_row_index)).get()).value
>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>             GetScalar(batch_row_index)).get()).value
>         newUpdate = myUpdateStruct(value, checksum)
>         if myUpdates.count(name) <= 0:
>             myUpdates[name] = deque[myUpdateStruct]()
>         myUpdates[name].push_front(newUpdate)
>         if myUpdates[name].size() > 1024:
>             myUpdates[name].pop_back()
>         batch_row_index += 1
> ```
> This takes 107 minutes to iterate through the first 290GB of data. Without
> accessing or filtering the data in any way, it takes only 12 minutes to
> read all the .parquet files into RecordBatches and place them into Plasma.
>
>
> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <weston.pace@gmail.com>
> wrote:
>
>> If you don't need the performance, you could stay in Python (use
>> to_pylist() for the array or as_py() for scalars).
>>
>> If you do need the performance then you're probably better served getting
>> the buffers and operating on them directly.
Or, even better, making use of
>> the compute kernels:
>>
>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>> desired = pa.array(['Xander'], pa.string())
>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>
>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai> wrote:
>>
>>> This works for getting a C string out of the CScalar:
>>> ```
>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>     GetScalar(batch_row_index)).get()).value
>>> name = <char *>name_buffer.get().data()
>>> ```
>>>
>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai> wrote:
>>>
>>>> Here is an example code snippet from a .pyx file that successfully
>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>> ascending:
>>>> ```
>>>> while batch_row_index < batch.get().num_rows():
>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>     current_timestamp = timestamps[name]
>>>>     if current_timestamp > new_timestamp.value:
>>>>         abort()
>>>>     batch_row_index += 1
>>>> ```
>>>>
>>>> However, I'm having difficulty operating on the values in a column of
>>>> string type. Unlike CTimestampScalar, there is no CStringScalar.
>>>> Although there is a StringScalar type in C++, it isn't defined in the
>>>> Cython interface. There is a `CStringType` and a `c_string` type.
>>>> ```
>>>> while batch_row_index < batch.get().num_rows():
>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>     if name_string == b"Xander":  # Doesn't work
>>>>         print("found it")
>>>>     batch_row_index += 1
>>>> ```
>>>> How do I get the string value as a C type and compare it to other
>>>> strings?
>>>>
>>>> Thanks,
>>>> Xander