arrow-user mailing list archives

From Jorge Cardoso Leitão <jorgecarlei...@gmail.com>
Subject Re: [Rust] DataFusion performance
Date Sat, 12 Dec 2020 04:05:32 GMT
Hi Matthew,

SchemaRef is just an alias for Arc<Schema>. Thus, you need to wrap the schema in
an Arc.

We do this because the plans are often passed across thread boundaries, and
wrapping them in an Arc allows that.
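
For example, here is a minimal sketch of the conversion, assuming the arrow
crate's Schema/SchemaRef types and the MemTable::new signature used in Andy's
example further down in this thread (to_mem_table is just a hypothetical
helper for illustration):

```
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;

// Build a MemTable from an owned Schema and a single partition of record
// batches. SchemaRef is just Arc<Schema>, so wrapping the owned schema in an
// Arc is all that is needed to satisfy MemTable::new.
fn to_mem_table(schema: Schema, batches: Vec<RecordBatch>) -> Result<MemTable> {
    let schema: SchemaRef = Arc::new(schema);
    MemTable::new(schema, vec![batches])
}
```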

Best,
Jorge


On Fri, Dec 11, 2020 at 8:14 PM Matthew Turner <matthew.m.turner@outlook.com>
wrote:

> Thanks! Converting the schema to owned made it work.
>
>
>
> The type of the schema param is SchemaRef – which I thought would allow a
> reference.  Is this not the case?
>
>
>
> Matthew M. Turner
>
> Email: matthew.m.turner@outlook.com
>
> Phone: (908)-868-2786
>
>
>
> From: Andy Grove <andygrove73@gmail.com>
> Sent: Friday, December 11, 2020 10:16 AM
> To: user@arrow.apache.org
> Subject: Re: [Rust] DataFusion performance
>
>
>
> Hi Matthew,
>
>
>
> Using the latest DataFusion from the GitHub master branch, the following code
> works for the in-memory case:
>
> use std::sync::Arc;
> use std::time::Instant;
>
> use datafusion::error::Result;
> use datafusion::prelude::*;
> use datafusion::datasource::MemTable;
>
> #[tokio::main]
> async fn main() -> Result<()> {
>     // TODO add command-line args
>     let ratings_csv = "/tmp/movies/ratings_small.csv";
>     let mut ctx = ExecutionContext::new();
>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>     let batches = vec![df.collect().await?];
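>     // df.schema() is DataFusion's schema type; to_owned().into() converts it
>     // to an owned Arrow Schema, and Arc::new(...) turns that into the
>     // SchemaRef that MemTable::new expects.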
>     let provider = MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
>     ctx.register_table("memory_table", Box::new(provider));
>     let mem_df = ctx.table("memory_table")?;
>     let q_start = Instant::now();
>     let _results = mem_df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
>     println!("Duration: {:?}", q_start.elapsed());
>     Ok(())
> }
>
>
>
> Andy.
>
>
>
> On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <
> matthew.m.turner@outlook.com> wrote:
>
> Played around some more - it was because I wasn't using the --release flag.
> Sorry about that, still learning Rust.
>
> Using that flag, the total time to read and filter is between 52 and 80ms.
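>
> (For anyone following along: that just means compiling the benchmark with
> optimizations, i.e. running it with cargo run --release rather than a plain
> cargo run.)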
>
> In general, what should I expect when comparing the performance of pandas
> to datafusion?
>
> @Andy Grove thanks for adding that.  If there is a need for additional
> datafusion benchmarks and what I'm doing could help with that, then I would be
> happy to contribute it.  I will send a follow-up once I've made progress.
>
> I'm also still having trouble with that memory table, so any help there is
> appreciated.
>
> Thanks for your time!  Very excited by this.
>
> Matthew M. Turner
> Email: matthew.m.turner@outlook.com
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Matthew Turner <matthew.m.turner@outlook.com>
> Sent: Friday, December 11, 2020 12:24 AM
> To: user@arrow.apache.org
> Subject: RE: [Rust] DataFusion performance
>
> Thanks for context! Makes sense.
>
> Even with that, when comparing the total time of each (read + filter),
> DataFusion still appears much slower (~625ms vs 33ms).  Is that expected?
>
> Also, I'm trying to bring the table into memory now to perform the
> computation from there and compare performance.  Code below.  But I'm
> getting an error (beneath the code) even though I think I've constructed the
> MemTable correctly (from [1]).  From what I see, all the types are the same
> as when I used the original df from read_csv, so I'm not sure what I'm doing
> wrong.
>
> I also saw there was an open issue [2] for this error type raised on
> rust-lang - so I'm unsure if it's my implementation, a datafusion/arrow issue,
> or a Rust issue.
>
> Thanks again for the help!
>
> ```
>     let sch = Arc::new(df.schema());
>     let batches = vec![df.collect().await?];
>     let provider = MemTable::new(sch, batches)?;
>
>     ctx.register_table("memory_table", Box::new(provider));
>
>     let mem_df = ctx.table("memory_table")?;
>
>     let q_start = Instant::now();
>     let results = mem_df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
> ```
>
> Which is returning this error:
>
> error[E0698]: type inside `async` block must be known in this context
>   --> src\main.rs:37:38
>    |
> 37 |         .filter(col("userId").eq(lit(1)))?
>    |                                      ^ cannot infer type for type
> `{integer}`
>    |
> note: the type is part of the `async` block because of this `await`
>   --> src\main.rs:36:19
>    |
> 36 |       let results = mem_df
>    |  ___________________^
> 37 | |         .filter(col("userId").eq(lit(1)))?
> 38 | |         .collect()
> 39 | |         .await
>    | |______________^
>
>
> [1]
> https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
> [2] https://github.com/rust-lang/rust/issues/63502
>
> Matthew M. Turner
> Email: matthew.m.turner@outlook.com
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Michael Mior <mmior@apache.org>
> Sent: Thursday, December 10, 2020 8:55 PM
> To: user@arrow.apache.org
> Subject: Re: [Rust] DataFusion performance
>
> Contrary to what you might expect given the name, read_csv does not
> actually read the CSV file. It instead creates the start of a logical
> execution plan which involves reading the CSV file when that plan is
> finally executed. This happens when you call collect().
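>
> As a minimal, annotated sketch of where the work happens (this just mirrors
> the code further down in this thread; it is a sketch, not a complete
> benchmark):
>
> ```
> use datafusion::error::Result;
> use datafusion::prelude::*;
>
> #[tokio::main]
> async fn main() -> Result<()> {
>     let mut ctx = ExecutionContext::new();
>     // read_csv only builds a logical plan; no CSV data is read yet.
>     let df = ctx.read_csv("ratings_small.csv", CsvReadOptions::new())?;
>     // filter still only extends the plan.
>     let df = df.filter(col("userId").eq(lit(1)))?;
>     // The CSV is actually read (and filtered) here, when the plan executes.
>     let _batches = df.collect().await?;
>     Ok(())
> }
> ```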
>
> Pandas read_csv on the other hand immediately reads the CSV file. So
> you're comparing the time of reading AND filtering the file
> (DataFusion) with the time to filter data which has already been read
> (Pandas).
>
> There's nothing wrong with your use of DataFusion per se; you simply
> weren't measuring what you thought you were measuring.
> --
> Michael Mior
> mmior@apache.org
>
> On Thu, Dec 10, 2020 at 17:11, Matthew Turner <matthew.m.turner@outlook.com>
> wrote:
> >
> > Hello,
> >
> >
> >
> > I've been playing around with DataFusion to explore the feasibility of
> > replacing current python/pandas data processing jobs with Rust/datafusion.
> > Ultimately, looking to improve performance / decrease cost.
> >
> >
> >
> > I was doing some simple tests to start to measure performance
> > differences on a simple task (read a csv[1] and filter it).
> >
> >
> >
> > Reading the csv, datafusion seemed to outperform pandas by around 30%,
> > which was nice.
> >
> > *Rust took around 20-25ms to read the csv (compared to 32ms from pandas)
> >
> >
> >
> > However, when filtering the data I was surprised to see that pandas was
> > way faster.
> >
> > *Rust took around 500-600ms to filter the csv (compared to 1ms from pandas)
> >
> >
> >
> > My code for each is below.  I know I should be running the DataFusion
> > times through something similar to Python's %timeit, but I didn't have that
> > immediately accessible, and I ran many times to confirm the numbers were
> > roughly consistent.
> >
> >
> >
> > Is this performance expected? Or am I using datafusion incorrectly?
> >
> >
> >
> > Any insight is much appreciated!
> >
> >
> >
> > [Rust]
> >
> > ```
> > use datafusion::error::Result;
> > use datafusion::prelude::*;
> > use std::time::Instant;
> >
> > #[tokio::main]
> > async fn main() -> Result<()> {
> >     let start = Instant::now();
> >
> >     let mut ctx = ExecutionContext::new();
> >
> >     let ratings_csv = "ratings_small.csv";
> >
> >     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
> >     println!("Read CSV Duration: {:?}", start.elapsed());
> >
> >     let q_start = Instant::now();
> >     let results = df
> >         .filter(col("userId").eq(lit(1)))?
> >         .collect()
> >         .await
> >         .unwrap();
> >     println!("Filter duration: {:?}", q_start.elapsed());
> >
> >     println!("Duration: {:?}", start.elapsed());
> >
> >     Ok(())
> > }
> > ```
> >
> >
> >
> > [Python]
> >
> > ```
> > In [1]: %timeit df = pd.read_csv("ratings_small.csv")
> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
> >
> > In [2]: %timeit df.query("userId==1")
> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
> > ```
> >
> >
> >
> > [1]:
> > https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
> >
> >
> >
> >
> >
> > Matthew M. Turner
> >
> > Email: matthew.m.turner@outlook.com
> >
> > Phone: (908)-868-2786
> >
> >
>
>
