Our data tends to be quite wide (many hundreds of thousands up to millions of columns) with mostly high cardinality columns. So, we have vertically sharded our data, where each shard contains 1000 columns of data.
Within each shard, the data is further partitioned into several parquet files that share the same schema (primarily to ease the burden on writes by keeping each final parquet file at ~1 GB).
More concretely, the prefix structure on S3 might look something like this:
data/
My understanding of the current implementation is that this call:
read_table(source="s3://data/vertical_shard=1/", columns=["col1", "col2", ..., "colN"], use_threads=True, use_legacy_dataset=True, filesystem=s3fs)
evaluates as follows:
- read each parquet file (00001.parquet, etc) as a PyArrow table in sequence:
  - for each file, columns are read concurrently because use_threads=True
- concat the tables together and return a single table
My first question:
Ignoring for a moment the question of whether this is a good idea for performance... if I were to implement my own version of read_table() that concurrently reads each parquet file within a vertical shard into a table, e.g. using multiprocessing, and then concats them in the parent process, does PyArrow provide any primitives that make it easier to do this without the typical serialization overhead/limits? From the docs it seems that there might be some APIs that would ease sharing memory between subprocesses but I'm not sure where to start.
My second question:
Right now, our client handles requests for data that are distributed across vertical shards by doing the following for each shard, in sequence:
- read shard using read_table()
- convert to pandas dataframe via to_pandas()
- filter and post-process as needed (to some extent, a work-around for the lack of row group predicate pushdown when we initially started using pyarrow)
If we were to push the reading + processing/filtering of each shard off into its own subprocess using multiprocessing, what is the best way to share each dataframe back to the parent process (minimizing copying/serialization overhead/etc)? In particular, I wondered if https://arrow.apache.org/docs/python/ipc.html#serializing-pandas-objects might in some way prove useful, but I'm not sure I fully understand the use cases from the documentation.