arrow-user mailing list archives

From: Matthew Corley
Subject: PyArrow: Best approach to concurrency when using read_table()?
Date: Wed, 15 Jul 2020 22:43:23 GMT

Currently, we are developing a Python client that reads parquet data stored
on S3 using PyArrow as an intermediary.  I am mostly looking to describe
how we are using PyArrow, get any feedback with respect to what we might be
missing, and also get some pointers about the best way to use PyArrow in
concurrent Python applications.

*Background:*
Our data tends to be quite wide (many hundreds of thousands up to millions
of columns) with mostly high-cardinality columns.  So, we have vertically
sharded our data, where each shard contains 1000 columns of data.

Within each shard, the data is further partitioned into several parquet
files that share the same schema (primarily done to ease the burden on data
writes by ensuring that each final parquet file is ~1 GB in size).

More concretely, the prefix structure on S3 might look something like this:

My understanding of the current implementation is that this call,
read_table(..., columns=["col1", "col2", ..., "colN"], use_threads=True,
use_legacy_dataset=True, filesystem=s3fs), evaluates as follows:
   - read each parquet file (00001.parquet, etc.) as a PyArrow table, in
     sequence
       - for each file, columns are read concurrently because
         use_threads=True
   - concat the tables together and return a single table

*My first question:*
Ignoring for a moment the question of whether this is a good idea for
performance... if I were to implement my own version of read_table() that
concurrently reads each parquet file within a vertical shard into a table,
e.g. using multiprocessing, and then concats them in the parent process,
does PyArrow provide any primitives that make it easier to do this without
the typical serialization overhead/limits?  From the docs it seems that
there might be some APIs that would ease sharing memory between
subprocesses but I'm not sure where to start.

*My second question:*
Right now, our client handles requests for data that are distributed across
vertical shards by, in sequence, for each shard:
 - read shard using read_table()
 - convert to pandas dataframe via to_pandas()
 - filter and post-process as needed (to some extent, a work-around for
lack of rowgroup predicate pushdown when we initially started using pyarrow)

If we were to push the reading + processing/filtering of each shard off
into its own subprocess using multiprocessing, what is the best way to
share each dataframe back to the parent process (minimizing
copying/serialization overhead/etc)?  In particular, I wondered if might
in some way prove useful, but I wasn't sure I fully understood the use
cases from the documentation.
