arrow-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Antoine Pitrou <>
Subject Re: [DISCUSS][C++][Proposal] Threading engine for Arrow
Date Fri, 03 May 2019 08:48:59 GMT

Hi Anton,

Another possibility is to look at our C++ CSV reader and parser (in
src/arrow/csv).  It's the only piece of Arrow that uses non-trivial
multi-threading right now (with tasks spawning new tasks dynamically,
see InferringColumnBuilder).  It's based on the ThreadPool and TaskGroup
APIs (in src/arrow/util/).  These APIs are not set in stone, so you're
free to propose changes to make them fit better with a TBB-based



Le 03/05/2019 à 01:42, Malakhov, Anton a écrit :
> Thanks Wes!
> Sounds like a good way to go! We'll create a demo, as you suggested, implementing a parallel
execution model for a simple analytics pipeline that reads and processes the files. My only
concern is about adding more pipeline breaker nodes and compute intensive operations into
this demo because min/max are effectively no-ops fused into I/O scan node. What do you think
about adding group-by into this picture, effectively implementing NY taxi and/or mortgage
benchmarks? Ideally, I'd like to go even further and add sci-kit learn-like stuff for processing
that data in order to demonstrate the co-existence side of the story. What do you think?
> So, the idea of the prototype will be to validate the parallel execution model as the
first step. After that, it'll help to shape API for both - execution nodes and the threading
backend. Does it sound right to you?
> P.S. I can well understand your hesitation about using TBB directly and as non-optional
dependency, thus I'm suggesting threading layers approach here. Please let me clarify myself,
using TBB and nested parallelism is non-goal by itself. The goal is to build components of
efficient execution model, which coexist well with each other and with all the other, external
to Arrow, components of an applications. However, without a rich, composable, and mature parallel
toolkit, it is hard to achieve and to focus on this goal. Thus, I wanted to check with the
community if it is an acceptable way at all and what's the roadmap.
> Thanks,
> // Anton
> -----Original Message-----
> From: Wes McKinney [] 
> Sent: Thursday, May 2, 2019 13:52
> To:
> Subject: Re: [DISCUSS][C++][Proposal] Threading engine for Arrow
> hi Anton,
> Thank you for bringing your expertise to the project -- this is a very useful discussion
to have.
> Partly why our threading capabilities in the project are not further developed is that
there is not much that needs to be parallelized. It would be like designing a supercharger
when you don't have a car yet.
> That being said, it is worthwhile to plan ahead so we aren't trying to retrofit significant
pieces of software to be able to take advantage of a more advanced task scheduler.
> From my perspective, we have a few key practical areas of consideration:
> * Computational tasks that may offer nested parallelism (e.g. an Aggregation or Projection
task may be able to execution in multiple
> threads)
> * IO operations performed from within tasks that appear to be computational in nature
(example: in the course of reading a Parquet file, both computation -- decoding, decompression
-- and IO -- local or remote filesystem operations -- must be performed). The status quo right
now is that IO performed inside a task in the thread pool is not releasing any resources to
other tasks.
> I believe that we should design and develop a sane programming model / API for implementing
our software in the presence of these challenges.
> If the backend / implementation of this API uses TBB and that makes things more efficient
than other approaches, then that sounds great to me. I would be hesitant to use TBB APIs directly
in Arrow application code unless it can be clearly demonstrated by that is a superior option
to alternatives.
> It seems useful to validate the implementation approach by starting with some practical
problems. Suppose, for the sake of argument, you want to read 10 Parquet files (constituting
a single logical dataset) as fast as possible and perform some simple analytics on them --
let's take something very simple like computing the maximum and minimum values of each column
in the dataset. This problem features both problems listed above:
> * Reading a single Parquet file can be parallelized (by columns -- since columns can
be decoded in parallel) on the global thread pool, so reading multiple files in parallel would
cause nested parallelism
> * Within the context of reading a single Parquet file column, IO calls are performed.
CPU threads sit idle while this IO is taking place, particularly if the file system is high
latency (e.g. HDFS)
> What do you think about -- as a way of moving this project forward -- developing a prototype
threading backend and developer API (for people like me to use to develop libraries like the
Parquet library) that addresses these issues? I think it could be difficult to build consensus
around a threading backend developed in the abstract.
> Thanks
> Wes
> On Tue, Apr 30, 2019 at 9:28 PM Malakhov, Anton <> wrote:
>> Hi dear Arrow developers, Antoine,
>> I'd like to kick off the discussion of the threading engine that Arrow can use underneath
for implementing multicore parallelism for execution nodes, kernels, and/or all the functions,
which can be optimized this way.
>> I've documented some ideas on Arrow's Confluence Wiki: 
>> ngine The bottom line is that while Arrow is moving into the right 
>> direction introducing shared thread pool, there are some questions and concerns about
current implementation and the way how it is supposed to co-exist with other threaded libraries
("threading composability") while providing efficient nestable NUMA&cache-aware data and
data-flow parallelism.
>> I suggest to introduce threading layers like in other libraries like MKL and Numba,
starting with TBB-based layer. Or maybe even use TBB directly. In short, there are the following
arguments for it:
>> 1.      Designed for composability from day zero. Avoids mandatory parallelism. Provides
work stealing and FIFO scheduling. Compatible with parallel depth first scheduling (a better
composability research).
>> 2.      TBB Flow Graph. It fits nicely into data flow and execution nodes model of
SQL databases. Besides basic nodes needed for implementing an execution engine, it also provides
a foundation for heterogeneous and distributed computing (async_node, opencl_node, distributed_node)
>> 3.      Arrow's ThreadPool, TaskGroup, and ParallelFor have direct equivalent in
TBB: task_arena, task_group, and parallel_for while providing mature and performant implementation,
which solves many if not all of the XXX todo notes in the comments like exceptions, singletons
and time of initialization, lock-free.
>> 4.      Concurrent hash tables, queues, vector and other concurrent containers. Hash
tables are required for implementing parallel versions of joins, groupby, uniq, dictionary
operations. There is a contribution to integrate libcuckoo under TBB interface.
>> 5.      TBB scalable malloc and memory pools, which can use any user-provided memory
chunk for scalable allocation. Arrow uses jemalloc, which is slower in some cases than tbbmalloc
or tcmalloc.
>> 6.      OpenMP is good for NUMA with static schedule, however, there is no good answer
for dynamic tasks, graphs. TBB provides tools for implementing NUMA support: task_arena, task_scheduler_observer,
task affinity & priorities, committed to improve NUMA for its other customers in 2019.
>> 7.      TBB is licensed under Apache 2.0, has conda-forge feedstock, supports CMake,
it's adopted for CPU scheduling by other industry players, has multiple ports for other OSes
and CPU arches.
>> Full disclosure: I was TBB developer before its 1.0 version, responsible for multiple
core components like hash tables, adaptive partitioning, interfaces of memory pools and task_arena,
all of these are very relevant to Arrow. I've background in scalability and NUMA-aware performance
optimization like what we did for OpenCL runtime for CPU (TBB-based). I also was behind optimizations
for Intel Distribution for Python and its threading composability story<>.
Thus, I'm sincerely hope to reuse all these stuff in order to deliver the best performance
for Arrow.
>> Best regards,
>> Anton Malakhov<>
>> IAGS Scripting Analyzers & Tools
>> O: +1-512-3620-512
>> 1300 S. MoPac Expy
>> Office:  AN4-C1-D4
>> Austin, TX 78746
>> Intel Corporation |

View raw message