From: Andy Grove
Date: Fri, 11 Dec 2020 08:16:27 -0700
Subject: Re: [Rust] DataFusion performance
To: user@arrow.apache.org

Hi Matthew,

Using the latest DataFusion from the GitHub master branch, the following code works for the in-memory case:
use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::datasource::MemTable;

#[tokio::main]
async fn main() -> Result<()> {
    //TODO add command-line args
    let ratings_csv = "/tmp/movies/ratings_small.csv";
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
    ctx.register_table("memory_table", Box::new(provider));
    let mem_df = ctx.table("memory_table")?;
    let q_start = Instant::now();
    let _results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
    println!("Duration: {:?}", q_start.elapsed());
    Ok(())
}

Andy.

On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <matthew.m.turner@outlook.com> wrote:
Played around some more - it was because I wasn’t using the --release flag. Sorry about that, still learning Rust.

Using that flag, the total time to read and filter is between 52 and 80ms.

In general, what should I expect when comparing the performance of pandas to DataFusion?

@Andy Grove thanks for adding that. If there is a need for additional DataFusion benchmarks and what I do could help with that, then I would be happy to contribute it. I will send a follow-up once I've made progress.

I'm also still having trouble with that memory table, so any help there is appreciated.

Thanks for your time! Very excited by this.

Matthew M. Turner
Email: matthew.m.turner@outlook.com
Phone: (908)-868-2786

-----Original Message-----
From: Matthew Turner <matthew.m.turner@outlook.com>
Sent: Friday, December 11, 2020 12:24 AM
To: user@arrow.apache.org
Subject: RE: [Rust] DataFusion performance

Thanks for context! Makes sense.

Even with that, when comparing the total time of each (read + filter), DataFusion still appears much slower (~625ms vs 33ms). Is that expected?

Also, I'm trying to bring the table into memory now to perform the computation from there and compare performance. Code below. But I'm getting an error (beneath the code) even though I think I've constructed the MemTable correctly (from [1]). From what I see, all the types are the same as when I used the original df from read_csv, so I'm not sure what I'm doing wrong.

I also saw there was an open issue [2] for this error type raised on rust-lang, so I'm unsure if it's my implementation, a DataFusion/Arrow issue, or a Rust issue.

Thanks again for help!

```
    let sch = Arc::new(df.schema());
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(sch, batches)?;

    ctx.register_table("memory_table", Box::new(provider));

    let mem_df = ctx.table("memory_table")?;

    let q_start = Instant::now();
    let results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
```

Which is returning this error:

error[E0698]: type inside `async` block must be known in this context
  --> src\main.rs:37:38
   |
37 |         .filter(col("userId").eq(lit(1)))?
   |                                      ^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async` block because of this `await`
  --> src\main.rs:36:19
   |
36 |       let results = mem_df
   |  ___________________^
37 | |         .filter(col("userId").eq(lit(1)))?
38 | |         .collect()
39 | |         .await
   | |______________^
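
One general way to take an `{integer}` inference question off the table is to give the literal an explicit type suffix. The sketch below is std-only and uses a made-up `Expr` enum and `Literal` trait (stand-ins for illustration, not DataFusion's types) to show the idea. Whether this alone clears E0698 in the program above is not confirmed anywhere in this thread; Andy's working version keeps `lit(1)` and instead changes how the schema is passed to `MemTable::new`.

```rust
/// Toy expression type; a stand-in, not DataFusion's `Expr`.
#[derive(Debug, PartialEq)]
enum Expr {
    Int32(i32),
    Int64(i64),
}

/// Hypothetical trait so that `lit` can accept more than one integer type.
trait Literal {
    fn lit(self) -> Expr;
}

impl Literal for i32 {
    fn lit(self) -> Expr {
        Expr::Int32(self)
    }
}

impl Literal for i64 {
    fn lit(self) -> Expr {
        Expr::Int64(self)
    }
}

/// Generic helper in the style of the `lit(1)` call from the thread.
fn lit<T: Literal>(value: T) -> Expr {
    value.lit()
}

/// Builds the literal used in a `userId == 1` style predicate.
fn user_id_literal() -> Expr {
    // The `_i64` suffix fixes the literal's type, so nothing is left to infer.
    lit(1_i64)
}
```

With the suffix in place the compiler no longer has to pick an integer type for the literal, which is the part of the expression the error message points at.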


[1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
[2] https://github.com/rust-lang/rust/issues/63502


-----Original Message-----
From: Michael Mior <mmior@apache.org>
Sent: Thursday, December 10, 2020 8:55 PM
To: user@arrow.apache.org
Subject: Re: [Rust] DataFusion performance

Contrary to what you might expect given the name, read_csv does not actually read the CSV file. It instead creates the start of a logical execution plan which involves reading the CSV file when that plan is finally executed. This happens when you call collect().

Pandas read_csv on the other hand immediately reads the CSV file. So you're comparing the time of reading AND filtering the file (DataFusion) with the time to filter data which has already been read (Pandas).

There's nothing wrong with your use of DataFusion per se, you simply weren't measuring what you thought you were measuring.
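
The lazy behaviour described above can be sketched with a toy, std-only model. Everything here is an assumption made for illustration: `LazyFrame`, `filter_user`, `parse_csv`, and the sample rows are invented and are not DataFusion's API or real data. The point is only that building the plan is nearly free, while `collect` pays for both the read and the filter.

```rust
use std::time::Instant;

// Made-up sample rows in the shape of a ratings file; not real data.
const SAMPLE_CSV: &str = "userId,movieId,rating\n1,31,2.5\n1,1029,3.0\n2,10,4.0\n3,60,3.0\n";

#[derive(Debug, Clone, PartialEq)]
struct Rating {
    user_id: u32,
    movie_id: u32,
    rating: f32,
}

/// Eagerly parse CSV text into rows (this is the "read" cost).
fn parse_csv(text: &str) -> Vec<Rating> {
    text.lines()
        .skip(1) // skip the header line
        .filter_map(|line| {
            let mut cols = line.split(',');
            Some(Rating {
                user_id: cols.next()?.parse::<u32>().ok()?,
                movie_id: cols.next()?.parse::<u32>().ok()?,
                rating: cols.next()?.parse::<f32>().ok()?,
            })
        })
        .collect()
}

/// Toy lazy frame: it only records a plan; no parsing happens until `collect`.
struct LazyFrame {
    source: &'static str,
    user_filter: Option<u32>,
}

impl LazyFrame {
    /// Records where the data lives, without reading it.
    fn read_csv(source: &'static str) -> Self {
        LazyFrame { source, user_filter: None }
    }

    /// Adds a filter step to the plan, still without reading anything.
    fn filter_user(mut self, user_id: u32) -> Self {
        self.user_filter = Some(user_id);
        self
    }

    /// Executes the plan: parse first, then apply the recorded filter.
    fn collect(self) -> Vec<Rating> {
        let rows = parse_csv(self.source);
        match self.user_filter {
            Some(id) => rows.into_iter().filter(|r| r.user_id == id).collect(),
            None => rows,
        }
    }
}

/// Times plan construction and execution separately and returns the result.
fn demo() -> Vec<Rating> {
    let t = Instant::now();
    let plan = LazyFrame::read_csv(SAMPLE_CSV);
    println!("build plan (nothing read yet): {:?}", t.elapsed());

    let t = Instant::now();
    let rows = plan.filter_user(1).collect();
    println!("collect (read + filter): {:?}", t.elapsed());
    rows
}
```

Calling `demo()` from `main` prints two durations; in this model the second one always includes the parse, which mirrors why the "filter" timing in the original program also contained the CSV read.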
--
Michael Mior
mmior@apache.org

On Thu, Dec 10, 2020 at 17:11, Matthew Turner <matthew.m.turner@outlook.com> wrote:
> Hello,
>
> I’ve been playing around with DataFusion to explore the feasibility of replacing current Python/pandas data processing jobs with Rust/DataFusion. Ultimately, looking to improve performance / decrease cost.
>
> I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).
>
> Reading the csv, DataFusion seemed to outperform pandas by around 30%, which was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from pandas)
>
> However, when filtering the data I was surprised to see that pandas was way faster.
>
> *Rust took around 500-600ms to filter the csv (compared to 1ms from pandas)
>
> My code for each is below. I know I should be running the DataFusion times through something similar to Python's %timeit, but I didn’t have that immediately accessible and I ran many times to confirm it was roughly consistent.
>
> Is this performance expected? Or am I using DataFusion incorrectly?
>
> Any insight is much appreciated!
>
> [Rust]
> ```
> use datafusion::error::Result;
> use datafusion::prelude::*;
> use std::time::Instant;
>
> #[tokio::main]
> async fn main() -> Result<()> {
>     let start = Instant::now();
>
>     let mut ctx = ExecutionContext::new();
>
>     let ratings_csv = "ratings_small.csv";
>
>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>     let q_start = Instant::now();
>     let results = df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>     println!("Duration: {:?}", start.elapsed());
>
>     Ok(())
> }
> ```
>
> [Python]
> ```
> In [1]: df = pd.read_csv("ratings_small.csv")
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
> In [2]: df.query("userId==1")
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
> ```
>
> [1]: https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
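
The first message in the thread mentions wanting something similar to Python's %timeit for the Rust timings. A minimal std-only sketch of that idea follows; `time_it` and `Timing` are hypothetical names invented here, not part of DataFusion or any benchmarking crate, and the reported spread is a plain population standard deviation rather than whatever IPython computes.

```rust
use std::time::Instant;

/// Summary of repeated timings, in seconds.
#[derive(Debug)]
struct Timing {
    runs: usize,
    mean_secs: f64,
    std_dev_secs: f64,
}

/// Runs `f` once as an unmeasured warm-up, then `runs` more times, timing each call.
fn time_it<T>(runs: usize, mut f: impl FnMut() -> T) -> Timing {
    assert!(runs > 0, "need at least one run");

    // Warm-up call; black_box keeps the optimizer from discarding the result.
    std::hint::black_box(f());

    let samples: Vec<f64> = (0..runs)
        .map(|_| {
            let start = Instant::now();
            std::hint::black_box(f());
            start.elapsed().as_secs_f64()
        })
        .collect();

    let mean = samples.iter().sum::<f64>() / runs as f64;
    // Population variance: divide by the number of samples.
    let variance = samples.iter().map(|s| (s - mean).powi(2)).sum::<f64>() / runs as f64;

    Timing {
        runs,
        mean_secs: mean,
        std_dev_secs: variance.sqrt(),
    }
}
```

A call such as `time_it(7, || expensive_step())` (with `expensive_step` being whatever synchronous work is under test) returns the mean and spread over seven measured runs. As noted earlier in the thread, the numbers are only meaningful when the program is built with the --release flag.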