arrow-user mailing list archives

From Micah Kornfield <emkornfi...@gmail.com>
Subject Re: PyArrow: Best approach to concurrency when using read_table()?
Date Sat, 18 Jul 2020 16:49:23 GMT
Hi Matthew,
I'm not an expert in this area, but to try to answer your
questions:

does PyArrow provide any primitives that make it easier to do this without
> the typical serialization overhead/limits?  From the docs it seems that
> there might be some APIs that would ease sharing memory between
> subprocesses but I'm not sure where to start.


As far as I'm aware, Python doesn't provide shared-memory primitives that
would make this absolutely zero-cost. The easiest ways to do this with
minimal overhead (they share similar underlying infrastructure) are:
1.  Write serialized tables from the child process to well-known locations
that the parent process can read via memory mapping.  This could be
done using Python's shared memory [1].
2.  Use Plasma to transfer objects.
3.  To my knowledge it doesn't exist yet, but a MemoryPool [2] that
allocates directly from shared memory would be a useful contribution, so one
could simply pass back the pointers to buffers.


I'm not sure about the second question.


[1] https://docs.python.org/3/library/multiprocessing.shared_memory.html
[2] https://arrow.apache.org/docs/python/generated/pyarrow.MemoryPool.html


On Wed, Jul 15, 2020 at 3:43 PM Matthew Corley <mattcorley@gmail.com> wrote:

> Currently, we are developing a Python client that reads parquet data
> stored on S3 using PyArrow as an intermediary.  I am mostly looking to
> describe how we are using PyArrow, get any feedback with respect to what we
> might be missing, and also get some pointers about the best way to use
> PyArrow in concurrent Python applications.
>
> *Background: *
> Our data tends to be quite wide (many hundreds of thousands up to millions
> of columns) with mostly high cardinality columns.  So, we have vertically
> sharded our data, where each shard contains 1000 columns of data.
>
> Within each shard, the data is further partitioned into several parquet
> files that share the same schema (primarily done to ease the burden on data
> write by ensuring that each final parquet file is ~1gb in size).
>
> More concretely, the prefix structure on S3 might look something like this:
> data/
>   vertical_shard_id=1/
>     00001.parquet
>     00002.parquet
>
> My understanding of the current implementation is that this call read_table(source="s3://data/vertical_shard=1/",
> columns=["col1", "col2", ..."colN"], use_threads=True,
> use_legacy_dataset=True, filesystem=s3fs) evaluates as follows:
>    - read each parquet file (00001.parquet, etc) as a PyArrow table in
> sequence:
>        for each file, columns are read concurrently because
> use_threads=True
>    - concat the tables together and return a single table
>
> *My first question:*
> Ignoring for a moment the question of whether this is a good idea for
> performance... if I were to implement my own version of read_table() that
> concurrently reads each parquet file within a vertical shard into a table,
> e.g. using multiprocessing, and then concats them in the parent process,
> does PyArrow provide any primitives that make it easier to do this without
> the typical serialization overhead/limits?  From the docs it seems that
> there might be some APIs that would ease sharing memory between
> subprocesses but I'm not sure where to start.
>
> *My second question: *
> Right now, our client handles requests for data that are distributed
> across vertical shards by, in sequence, for each shard:
>  - read shard using read_table()
>  - convert to pandas dataframe via to_pandas()
>  - filter and post-process as needed (to some extent, a work-around for
> lack of rowgroup predicate pushdown when we initially started using pyarrow)
>
> If we were to push the reading + processing/filtering of each shard off
> into its own subprocess using multiprocessing, what is the best way to
> share each dataframe back to the parent process (minimizing
> copying/serialization overhead/etc)?  In particular, I wondered if
> https://arrow.apache.org/docs/python/ipc.html#serializing-pandas-objects might
> in some way prove useful, but I wasn't sure I fully understand the use
> cases from the documentation.
>
