arrow-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Daniël Heres <danielhe...@gmail.com>
Subject Re: [Rust] DataFusion performance
Date Sat, 12 Dec 2020 13:09:47 GMT
Hello Matthew,

If you want to try to get absolutely the best performance you can get now
from DataFusion:

* Make sure you are using the latest version from master, there have been a
lot of improvements lately.

* Compile DataFusion with "simd" feature on. This requires a recent version
of DataFusion, but it gives speeds for some computations.

* Compile your code with lto = true like this in your Cargo.toml file:

[profile.release]
lto = true

This will increase the compile time considerably, but allows Rust / LLVM to
do more optimizations on the entire program. There are some other settings
documented here
https://doc.rust-lang.org/cargo/reference/profiles.html#release

* Set the environment variable RUSTFLAGS="-C target-cpu=native". this
allows Rust/LLVM to use all CPU instructions available on your CPU. This
way the binary becomes not portable anymore though.

We are also improving the performance over time, e.g. recently a lot parts
in Arrow / DataFusion have been improved in the last months such as faster
CSV reader and faster computations, and there is still a lot to come.

Best,

Daniël

Op za 12 dec. 2020 om 05:06 schreef Jorge Cardoso Leitão <
jorgecarleitao@gmail.com>:

> Hi Mattew,
>
> SchemaRef is just an alias for Arc<Schema>. Thus, you need to wrap it on
> an Arc.
>
> We do this because the plans are often passed between thread boundaries
> and thus wrapping them on an Arc allows that.
>
> Best,
> Jorge
>
>
> On Fri, Dec 11, 2020 at 8:14 PM Matthew Turner <
> matthew.m.turner@outlook.com> wrote:
>
>> Thanks! Converting the schema to owned made it work.
>>
>>
>>
>> The type of the schema param is SchemaRef – which I thought would allow a
>> reference.  Is this not the case?
>>
>>
>>
>> *Matthew M. Turner*
>>
>> Email*:* matthew.m.turner@outlook.com
>>
>> Phone: (908)-868-2786
>>
>>
>>
>> *From:* Andy Grove <andygrove73@gmail.com>
>> *Sent:* Friday, December 11, 2020 10:16 AM
>> *To:* user@arrow.apache.org
>> *Subject:* Re: [Rust] DataFusion performance
>>
>>
>>
>> Hi Matthew,
>>
>>
>>
>> Using the latest DataFusion from GitHub master branch, the following code
>> works for in-memory:
>>
>> use std::sync::Arc;
>> use std::time::Instant;
>>
>> use datafusion::error::Result;
>> use datafusion::prelude::*;
>> use datafusion::datasource::MemTable;
>>
>> #[tokio::main]
>> async fn main() -> Result<()> {
>>     //
>> *TODO add command-line args    *let ratings_csv = "/tmp/movies/ratings_small.csv";
>>     let mut ctx = ExecutionContext::*new*();
>>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::*new*()).unwrap();
>>     let batches = vec![df.collect().await?];
>>     let provider = MemTable::*new*(Arc::*new*(df.schema().to_owned().into()), batches)?;
>>     ctx.register_table("memory_table", Box::*new*(provider));
>>     let mem_df = ctx.table("memory_table")?;
>>     let q_start = Instant::*now*();
>>     let _results = mem_df
>>         .filter(col("userId").eq(lit(1)))?
>>         .collect()
>>         .await
>>         .unwrap();
>>     println!("Duration: {:?}", q_start.elapsed());
>>     *Ok*(())
>> }
>>
>>
>>
>> Andy.
>>
>>
>>
>> On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <
>> matthew.m.turner@outlook.com> wrote:
>>
>> Played around some more - it was because I wasn’t using --release flag.
>> Sry about that, still learning rust.
>>
>> Using that flag, the total time to read and filter is between 52 and 80ms.
>>
>> In general, what should I expect when comparing the performance of pandas
>> to datafusion?
>>
>> @Andy Grove thanks for adding that.  If there is a need for additional
>> datafusion benchmarks and what I do could help with that then I would be
>> happy to contribute it.  I will send a follow up once ive made progress.
>>
>> I'm also still having trouble with that memory table, so any help there
>> is appreciated.
>>
>> Thanks for your time!  Very excited by this.
>>
>> Matthew M. Turner
>> Email: matthew.m.turner@outlook.com
>> Phone: (908)-868-2786
>>
>> -----Original Message-----
>> From: Matthew Turner <matthew.m.turner@outlook.com>
>> Sent: Friday, December 11, 2020 12:24 AM
>> To: user@arrow.apache.org
>> Subject: RE: [Rust] DataFusion performance
>>
>> Thanks for context! Makes sense.
>>
>> Even with that, when comparing the total time of each (read + filter)
>> DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?
>>
>> Also, im trying to bring the table in memory now to perform the
>> computation from there and compare performance.  Code below.  But I'm
>> getting an error (beneath the code) even though I think ive constructed the
>> MemTable correctly (from [1]).  From what I see all the types are the same
>> as when I used the original df from read_csv so I'm not sure what I'm doing
>> wrong.
>>
>> I also saw there was an open issue [2] for this error type raised on
>> rust-lang - so im unsure if its my implementation, datafusion/arrow issue,
>> or Rust issue.
>>
>> Thanks again for help!
>>
>> ```
>>     let sch = Arc::new(df.schema());
>>     let batches = vec![df.collect().await?];
>>     let provider = MemTable::new(sch, batches)?;
>>
>>     ctx.register_table("memory_table", Box::new(provider));
>>
>>     let mem_df = ctx.table("memory_table")?;
>>
>>     let q_start = Instant::now();
>>     let results = mem_df
>>         .filter(col("userId").eq(lit(1)))?
>>         .collect()
>>         .await
>>         .unwrap();
>> ```
>>
>> Which is returning this error:
>>
>> error[E0698]: type inside `async` block must be known in this context
>>   --> src\main.rs:37:38
>>    |
>> 37 |         .filter(col("userId").eq(lit(1)))?
>>    |                                      ^ cannot infer type for type
>> `{integer}`
>>    |
>> note: the type is part of the `async` block because of this `await`
>>   --> src\main.rs:36:19
>>    |
>> 36 |       let results = mem_df
>>    |  ___________________^
>> 37 | |         .filter(col("userId").eq(lit(1)))?
>> 38 | |         .collect()
>> 39 | |         .await
>>    | |______________^
>>
>>
>> [1]
>> https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
>> [2] https://github.com/rust-lang/rust/issues/63502
>>
>> Matthew M. Turner
>> Email: matthew.m.turner@outlook.com
>> Phone: (908)-868-2786
>>
>> -----Original Message-----
>> From: Michael Mior <mmior@apache.org>
>> Sent: Thursday, December 10, 2020 8:55 PM
>> To: user@arrow.apache.org
>> Subject: Re: [Rust] DataFusion performance
>>
>> Contrary to what you might expect given the name, read_csv does not
>> actually read the CSV file. It instead creates the start of a logical
>> execution plan which involves reading the CSV file when that plan is
>> finally executed. This happens when you call collect().
>>
>> Pandas read_csv on the other hand immediately reads the CSV file. So
>> you're comparing the time of reading AND filtering the file
>> (DataFusion) with the time to filter data which has already been read
>> (Pandas).
>>
>> There's nothing wrong with your use of DataFusion per se, you simply
>> weren't measuring what you thought you were measuring.
>> --
>> Michael Mior
>> mmior@apache.org
>>
>> Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <
>> matthew.m.turner@outlook.com> a écrit :
>> >
>> > Hello,
>> >
>> >
>> >
>> > I’ve been playing around with DataFusion to explore the feasibility of
>> replacing current python/pandas data processing jobs with Rust/datafusion.
>> Ultimately, looking to improve performance / decrease cost.
>> >
>> >
>> >
>> > I was doing some simple tests to start to measure performance
>> differences on a simple task (read a csv[1] and filter it).
>> >
>> >
>> >
>> > Reading the csv datafusion seemed to outperform pandas by around 30%
>> which was nice.
>> >
>> > *Rust took around 20-25ms to read the csv (compared to 32ms from
>> > pandas)
>> >
>> >
>> >
>> > However, when filtering the data I was surprised to see that pandas was
>> way faster.
>> >
>> > *Rust took around 500-600ms to filter the csv(compared to 1ms from
>> > pandas)
>> >
>> >
>> >
>> > My code for each is below.  I know I should be running the DataFusion
>> times through something similar to pythons %timeit but I didn’t have that
>> immediately accessible and I ran many times to confirm it was roughly
>> consistent.
>> >
>> >
>> >
>> > Is this performance expected? Or am I using datafusion incorrectly?
>> >
>> >
>> >
>> > Any insight is much appreciated!
>> >
>> >
>> >
>> > [Rust]
>> >
>> > ```
>> >
>> > use datafusion::error::Result;
>> >
>> > use datafusion::prelude::*;
>> >
>> > use std::time::Instant;
>> >
>> >
>> >
>> > #[tokio::main]
>> >
>> > async fn main() -> Result<()> {
>> >
>> >     let start = Instant::now();
>> >
>> >
>> >
>> >     let mut ctx = ExecutionContext::new();
>> >
>> >
>> >
>> >     let ratings_csv = "ratings_small.csv";
>> >
>> >
>> >
>> >     let df = ctx.read_csv(ratings_csv,
>> > CsvReadOptions::new()).unwrap();
>> >
>> >     println!("Read CSV Duration: {:?}", start.elapsed());
>> >
>> >
>> >
>> >     let q_start = Instant::now();
>> >
>> >     let results = df
>> >
>> >         .filter(col("userId").eq(lit(1)))?
>> >
>> >         .collect()
>> >
>> >         .await
>> >
>> >         .unwrap();
>> >
>> >     println!("Filter duration: {:?}", q_start.elapsed());
>> >
>> >
>> >
>> >     println!("Duration: {:?}", start.elapsed());
>> >
>> >
>> >
>> >     Ok(())
>> >
>> > }
>> >
>> > ```
>> >
>> >
>> >
>> > [Python]
>> >
>> > ```
>> >
>> > In [1]: df = pd.read_csv(“ratings_small.csv”)
>> >
>> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>> >
>> >
>> >
>> > In [2]: df.query(“userId==1”)
>> >
>> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
>> > each)
>> >
>> > ```
>> >
>> >
>> >
>> > [1]:
>> > https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
>> > sv
>> >
>> >
>> >
>> >
>> >
>> > Matthew M. Turner
>> >
>> > Email: matthew.m.turner@outlook.com
>> >
>> > Phone: (908)-868-2786
>> >
>> >
>>
>>

-- 
Daniël Heres

Mime
View raw message