From: Matthew Corley <mattcorley@gmail.com>
Date: Wed, 15 Jul 2020 15:43:23 -0700
Subject: PyArrow: Best approach to concurrency when using read_table()?
To: user@arrow.apache.org

Currently, we are developing a Python client that reads parquet data stored on S3 using PyArrow as an intermediary. I am mostly looking to describe how we are using PyArrow, get feedback on anything we might be missing, and get some pointers on the best way to use PyArrow in concurrent Python applications.

*Background:*
Our data tends to be quite wide (hundreds of thousands up to millions of columns), with mostly high-cardinality columns. So we have vertically sharded our data, with each shard containing 1000 columns.

Within each shard, the data is further partitioned into several parquet files that share the same schema (primarily to ease the burden on data writes by ensuring that each final parquet file is ~1 GB).

More concretely, the prefix structure on S3 might look something like this:
data/
  vertical_shard_id=1/
    00001.parquet
    00002.parquet


My understanding of the current implementation is that this call

    read_table(source="s3://data/vertical_shard=1/",
               columns=["col1", "col2", ..., "colN"],
               use_threads=True, use_legacy_dataset=True, filesystem=s3fs)

evaluates as follows (my mental model is also sketched in code after the list):
  - read each parquet file (00001.parquet, etc.) as a PyArrow table, in sequence; within each file, columns are read concurrently because use_threads=True
  - concatenate the tables together and return a single table
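
In code, I picture it as roughly equivalent to this sketch (my guess at the behavior, not the actual implementation; s3fs here is the same filesystem object as in the call above):

    import pyarrow as pa
    import pyarrow.parquet as pq

    columns = ["col1", "col2"]  # whatever subset the caller asked for

    # My mental model only: files are read one after another, with
    # column-level parallelism inside each file from use_threads=True.
    pieces = []
    for key in ["00001.parquet", "00002.parquet"]:
        pieces.append(pq.read_table(f"s3://data/vertical_shard=1/{key}",
                                    columns=columns, use_threads=True,
                                    filesystem=s3fs))
    result = pa.concat_tables(pieces)  # single table returned to the caller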

*My first question:*
Ignoring for a moment whether this is a good idea for performance: if I were to implement my own version of read_table() that concurrently reads each parquet file within a vertical shard into a table (e.g. using multiprocessing) and then concatenates them in the parent process, does PyArrow provide any primitives that make this easier to do without the typical serialization overhead/limits? From the docs it seems there might be some APIs that would ease sharing memory between subprocesses, but I'm not sure where to start.
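
To make the question concrete, this is the general shape of what I am imagining; it is only a sketch (the worker function and /dev/shm paths are made up, and reading from S3 inside the workers is elided for brevity):

    import multiprocessing as mp
    import pyarrow as pa
    import pyarrow.ipc as ipc
    import pyarrow.parquet as pq

    def read_to_ipc(args):
        # Worker: read one parquet file, then dump the table to an Arrow
        # IPC file on tmpfs so the parent can memory-map it without copying.
        key, out_path = args
        table = pq.read_table(key, use_threads=True)
        with pa.OSFile(out_path, "wb") as sink:
            with ipc.new_file(sink, table.schema) as writer:
                writer.write_table(table)
        return out_path

    if __name__ == "__main__":
        jobs = [("00001.parquet", "/dev/shm/shard1_0.arrow"),
                ("00002.parquet", "/dev/shm/shard1_1.arrow")]
        with mp.Pool(len(jobs)) as pool:
            paths = pool.map(read_to_ipc, jobs)
        # Memory-map each IPC file in the parent; read_all() should be
        # zero-copy against the mapped region.
        pieces = [ipc.open_file(pa.memory_map(p, "r")).read_all()
                  for p in paths]
        result = pa.concat_tables(pieces)

Is something along these lines the intended pattern, or is there a better primitive for this?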

*My second question:*
Right now, our client handles requests for data distributed across vertical shards by doing the following, in sequence, for each shard (sketched in code after the list):
 - read the shard using read_table()
 - convert to a pandas DataFrame via to_pandas()
 - filter and post-process as needed (to some extent, a workaround for the lack of row-group predicate pushdown when we initially started using PyArrow)
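
Roughly, in code (apply_filters() stands in for our post-processing and is not a real helper; shard_prefixes, requested_columns, and s3fs are placeholders for what a request carries; combining the shard frames column-wise at the end is just our own convention):

    import pandas as pd
    import pyarrow.parquet as pq

    frames = []
    for shard_prefix in shard_prefixes:  # one S3 prefix per vertical shard
        table = pq.read_table(shard_prefix, columns=requested_columns,
                              use_threads=True, use_legacy_dataset=True,
                              filesystem=s3fs)
        frames.append(apply_filters(table.to_pandas()))
    result = pd.concat(frames, axis=1)  # shards are vertical: join column-wise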

If we were to push the reading + processing/filtering of each shard off into its own subprocess using multiprocessing, what is the best way to share each dataframe back to the parent process (minimizing copying/serialization overhead)? In particular, I wondered whether https://arrow.apache.org/docs/python/ipc.html#serializing-pandas-objects might prove useful, but I wasn't sure I fully understood the use cases from the documentation.
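
For example, is something like the following reasonable? (Just a sketch using Python 3.8's multiprocessing.shared_memory plus Arrow's IPC stream format; segment naming and cleanup are hand-waved.)

    from multiprocessing import shared_memory
    import pyarrow as pa
    import pyarrow.ipc as ipc

    def dataframe_to_shm(df, name):
        # Worker side: serialize the DataFrame to Arrow IPC bytes and
        # place them in a named shared-memory segment.
        table = pa.Table.from_pandas(df)
        sink = pa.BufferOutputStream()
        with ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table)
        buf = sink.getvalue()
        shm = shared_memory.SharedMemory(name=name, create=True,
                                         size=buf.size)
        shm.buf[:buf.size] = buf.to_pybytes()
        shm.close()
        return name, buf.size

    def dataframe_from_shm(name, size):
        # Parent side: wrap the segment with pa.py_buffer (no copy);
        # read_pandas() still copies once into pandas' memory layout.
        shm = shared_memory.SharedMemory(name=name)
        df = ipc.open_stream(pa.py_buffer(shm.buf[:size])).read_pandas()
        shm.close()
        shm.unlink()
        return df

Or is this the kind of use case the serialization docs linked above are meant to cover?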