arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-428: [Python] Multithreaded conversion from Arrow table to pandas.DataFrame
Date Wed, 28 Dec 2016 12:49:15 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 1079a3206 -> ab5f66a2e


ARROW-428: [Python] Multithreaded conversion from Arrow table to pandas.DataFrame

This yields a substantial speedup on my laptop. On a 1GB numeric dataset, with 1 thread (the
default prior to this patch):

```
>>> %timeit df2 = table.to_pandas(nthreads=1)
1 loop, best of 3: 498 ms per loop
```

With 4 threads (this is a true quad-core machine):

```
>>> %timeit df2 = table.to_pandas(nthreads=4)
1 loop, best of 3: 151 ms per loop
```

The default number of cores used is `multiprocessing.cpu_count()` divided by 2 (since hyperthreading
doesn't help with this largely memory-bound operation).

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #252 from wesm/ARROW-428 and squashes the following commits:

da929bf [Wes McKinney] Factor out common compiler flag code between Arrow C++ and Python CMake
files. Add pyarrow.cpu_count/set_cpu_count functions per feedback
cad89e9 [Wes McKinney] Tweak pyarrow cmake flags
e70f16d [Wes McKinney] Add missing GIL acquisition. Do not spawn too many threads if few columns
bc4dff7 [Wes McKinney] Return errors from threaded conversion. Add doc about number of cpus
used
79f5fd9 [Wes McKinney] Implement multithreaded conversion from Arrow table to pandas.DataFrame.
Default to multiprocessing.cpu_count for now


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ab5f66a2
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ab5f66a2
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ab5f66a2

Branch: refs/heads/master
Commit: ab5f66a2e9a2b6af312ffdfa2f95c65b1d6f5739
Parents: 1079a32
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Wed Dec 28 07:49:06 2016 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Wed Dec 28 07:49:06 2016 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                          |  71 +------------
 cpp/cmake_modules/SetupCxxFlags.cmake       |  86 ++++++++++++++++
 python/CMakeLists.txt                       |  36 +------
 python/pyarrow/__init__.py                  |   1 +
 python/pyarrow/config.pyx                   |  23 +++++
 python/pyarrow/table.pyx                    |  38 +++----
 python/pyarrow/tests/test_convert_pandas.py |  42 ++++++--
 python/src/pyarrow/adapters/pandas.cc       | 121 ++++++++++++++++-------
 8 files changed, 250 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 93e9853..4507e67 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -105,76 +105,7 @@ endif()
 # Compiler flags
 ############################################################
 
-# Check if the target architecture and compiler supports some special
-# instruction sets that would boost performance.
-include(CheckCXXCompilerFlag)
-# x86/amd64 compiler flags
-CHECK_CXX_COMPILER_FLAG("-msse3" CXX_SUPPORTS_SSE3)
-# power compiler flags
-CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC)
-
-# compiler flags that are common across debug/release builds
-#  - Wall: Enable all warnings.
-set(CXX_COMMON_FLAGS "-std=c++11 -Wall")
-
-# Only enable additional instruction sets if they are supported
-if (CXX_SUPPORTS_SSE3 AND ARROW_SSE3)
-    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse3")
-endif()
-if (CXX_SUPPORTS_ALTIVEC AND ARROW_ALTIVEC)
-    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -maltivec")
-endif()
-
-if (APPLE)
-  # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be
-  # the default standard library which does not support C++11. libc++ is the
-  # default from 10.9 onward.
-  set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -stdlib=libc++")
-endif()
-
-# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
-# For all builds:
-# For CMAKE_BUILD_TYPE=Debug
-#   -ggdb: Enable gdb debugging
-# For CMAKE_BUILD_TYPE=FastDebug
-#   Same as DEBUG, except with some optimizations on.
-# For CMAKE_BUILD_TYPE=Release
-#   -O3: Enable all compiler optimizations
-#   -g: Enable symbols for profiler tools (TODO: remove for shipping)
-if (NOT MSVC)
-  set(CXX_FLAGS_DEBUG "-ggdb -O0")
-  set(CXX_FLAGS_FASTDEBUG "-ggdb -O1")
-  set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG")
-endif()
-
-set(CXX_FLAGS_PROFILE_GEN "${CXX_FLAGS_RELEASE} -fprofile-generate")
-set(CXX_FLAGS_PROFILE_BUILD "${CXX_FLAGS_RELEASE} -fprofile-use")
-
-# if no build build type is specified, default to debug builds
-if (NOT CMAKE_BUILD_TYPE)
-  set(CMAKE_BUILD_TYPE Debug)
-endif(NOT CMAKE_BUILD_TYPE)
-
-string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
-
-
-# Set compile flags based on the build type.
-message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})")
-if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_GEN")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_GEN}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_BUILD")
-  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_BUILD}")
-else()
-  message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
-endif ()
-
-message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")
+include(SetupCxxFlags)
 
 # Add common flags
 set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/cpp/cmake_modules/SetupCxxFlags.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
new file mode 100644
index 0000000..ee672bd
--- /dev/null
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Check if the target architecture and compiler supports some special
+# instruction sets that would boost performance.
+include(CheckCXXCompilerFlag)
+# x86/amd64 compiler flags
+CHECK_CXX_COMPILER_FLAG("-msse3" CXX_SUPPORTS_SSE3)
+# power compiler flags
+CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC)
+
+# compiler flags that are common across debug/release builds
+#  - Wall: Enable all warnings.
+set(CXX_COMMON_FLAGS "-std=c++11 -Wall")
+
+# Only enable additional instruction sets if they are supported
+if (CXX_SUPPORTS_SSE3 AND ARROW_SSE3)
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse3")
+endif()
+if (CXX_SUPPORTS_ALTIVEC AND ARROW_ALTIVEC)
+    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -maltivec")
+endif()
+
+if (APPLE)
+  # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be
+  # the default standard library which does not support C++11. libc++ is the
+  # default from 10.9 onward.
+  set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -stdlib=libc++")
+endif()
+
+# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
+# For all builds:
+# For CMAKE_BUILD_TYPE=Debug
+#   -ggdb: Enable gdb debugging
+# For CMAKE_BUILD_TYPE=FastDebug
+#   Same as DEBUG, except with some optimizations on.
+# For CMAKE_BUILD_TYPE=Release
+#   -O3: Enable all compiler optimizations
+#   -g: Enable symbols for profiler tools (TODO: remove for shipping)
+if (NOT MSVC)
+  set(CXX_FLAGS_DEBUG "-ggdb -O0")
+  set(CXX_FLAGS_FASTDEBUG "-ggdb -O1")
+  set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG")
+endif()
+
+set(CXX_FLAGS_PROFILE_GEN "${CXX_FLAGS_RELEASE} -fprofile-generate")
+set(CXX_FLAGS_PROFILE_BUILD "${CXX_FLAGS_RELEASE} -fprofile-use")
+
+# if no build build type is specified, default to debug builds
+if (NOT CMAKE_BUILD_TYPE)
+  set(CMAKE_BUILD_TYPE Debug)
+endif(NOT CMAKE_BUILD_TYPE)
+
+string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
+
+# Set compile flags based on the build type.
+message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})")
+if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_GEN")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_GEN}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_BUILD")
+  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_BUILD}")
+else()
+  message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
+endif ()
+
+message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 6ad55f8..6c24772 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -65,41 +65,7 @@ endif(CCACHE_FOUND)
 # Compiler flags
 ############################################################
 
-# compiler flags that are common across debug/release builds
-set(CXX_COMMON_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
-
-# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
-# For all builds:
-# For CMAKE_BUILD_TYPE=Debug
-#   -ggdb: Enable gdb debugging
-# For CMAKE_BUILD_TYPE=FastDebug
-#   Same as DEBUG, except with some optimizations on.
-# For CMAKE_BUILD_TYPE=Release
-#   -O3: Enable all compiler optimizations
-#   -g: Enable symbols for profiler tools (TODO: remove for shipping)
-#   -DNDEBUG: Turn off dchecks/asserts/debug only code.
-set(CXX_FLAGS_DEBUG "-ggdb -O0")
-set(CXX_FLAGS_FASTDEBUG "-ggdb -O1")
-set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG")
-
-# if no build build type is specified, default to debug builds
-if (NOT CMAKE_BUILD_TYPE)
-  set(CMAKE_BUILD_TYPE Debug)
-endif(NOT CMAKE_BUILD_TYPE)
-
-string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
-
-# Set compile flags based on the build type.
-message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})")
-if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_DEBUG})
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_FASTDEBUG})
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
-  set(CMAKE_CXX_FLAGS ${CXX_FLAGS_RELEASE})
-else()
-  message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
-endif ()
+include(SetupCxxFlags)
 
 # Add common flags
 set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 9ede934..6f81ef4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -26,6 +26,7 @@ except DistributionNotFound:
 
 
 import pyarrow.config
+from pyarrow.config import cpu_count, set_cpu_count
 
 from pyarrow.array import (Array,
                            from_pandas_series, from_pylist,

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/config.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/config.pyx b/python/pyarrow/config.pyx
index 778c15a..aa30f09 100644
--- a/python/pyarrow/config.pyx
+++ b/python/pyarrow/config.pyx
@@ -29,3 +29,26 @@ pyarrow_init()
 
 import numpy as np
 pyarrow_set_numpy_nan(np.nan)
+
+import multiprocessing
+import os
+cdef int CPU_COUNT = int(
+    os.environ.get('OMP_NUM_THREADS',
+                   max(multiprocessing.cpu_count() // 2, 1)))
+
+def cpu_count():
+    """
+    Returns
+    -------
+    count : Number of CPUs to use by default in parallel operations. Default is
+      max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
+      OMP_NUM_THREADS environment variable. For the default, we divide the CPU
+      count by 2 because most modern computers have hyperthreading turned on,
+      so doubling the CPU count beyond the number of physical cores does not
+      help.
+    """
+    return CPU_COUNT
+
+def set_cpu_count(count):
+    global CPU_COUNT
+    CPU_COUNT = max(int(count), 1)

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 9375557..20137e3 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -439,7 +439,9 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
     from pandas.core.internals import BlockManager, make_block
     from pandas import RangeIndex
 
-    check_status(pyarrow.ConvertTableToPandas(table, nthreads, &result_obj))
+    with nogil:
+        check_status(pyarrow.ConvertTableToPandas(table, nthreads,
+                                                  &result_obj))
 
     result = PyObject_to_object(result_obj)
 
@@ -610,36 +612,28 @@ cdef class Table:
         table.init(c_table)
         return table
 
-    def to_pandas(self, nthreads=1, block_based=True):
+    def to_pandas(self, nthreads=None):
         """
         Convert the arrow::Table to a pandas DataFrame
 
+        Parameters
+        ----------
+        nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
+            For the default, we divide the CPU count by 2 because most modern
+            computers have hyperthreading turned on, so doubling the CPU count
+            beyond the number of physical cores does not help
+
         Returns
         -------
         pandas.DataFrame
         """
-        cdef:
-            PyObject* arr
-            shared_ptr[CColumn] col
-            Column column
-
         import pandas as pd
 
-        if block_based:
-            mgr = table_to_blockmanager(self.sp_table, nthreads)
-            return pd.DataFrame(mgr)
-        else:
-            names = []
-            data = []
-            for i in range(self.table.num_columns()):
-                col = self.table.column(i)
-                column = self.column(i)
-                check_status(pyarrow.ConvertColumnToPandas(
-                    col, <PyObject*> column, &arr))
-                names.append(frombytes(col.get().name()))
-                data.append(PyObject_to_object(arr))
-
-            return pd.DataFrame(dict(zip(names, data)), columns=names)
+        if nthreads is None:
+            nthreads = pyarrow.config.cpu_count()
+
+        mgr = table_to_blockmanager(self.sp_table, nthreads)
+        return pd.DataFrame(mgr)
 
     @property
     def name(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index da34f85..863aa30 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -27,6 +27,29 @@ from pyarrow.compat import u
 import pyarrow as A
 
 
+def _alltypes_example(size=100):
+    return pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int16),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        # TODO(wesm): Pandas only support ns resolution, Arrow supports s, ms,
+        # us, ns
+        'datetime': np.arange("2016-01-01T00:00:00.001", size,
+                              dtype='datetime64[ms]'),
+        'str': [str(x) for x in range(size)],
+        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
+        'empty_str': [''] * size
+    })
+
+
 class TestPandasConversion(unittest.TestCase):
 
     def setUp(self):
@@ -35,10 +58,10 @@ class TestPandasConversion(unittest.TestCase):
     def tearDown(self):
         pass
 
-    def _check_pandas_roundtrip(self, df, expected=None,
+    def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
                                 timestamps_to_ms=False):
         table = A.from_pandas_dataframe(df, timestamps_to_ms=timestamps_to_ms)
-        result = table.to_pandas()
+        result = table.to_pandas(nthreads=nthreads)
         if expected is None:
             expected = df
         tm.assert_frame_equal(result, expected)
@@ -217,18 +240,21 @@ class TestPandasConversion(unittest.TestCase):
 
     def test_date(self):
         df = pd.DataFrame({
-            'date': [
-                datetime.date(2000, 1, 1),
-                None,
-                datetime.date(1970, 1, 1),
-                datetime.date(2040, 2, 26)
-        ]})
+            'date': [datetime.date(2000, 1, 1),
+                     None,
+                     datetime.date(1970, 1, 1),
+                     datetime.date(2040, 2, 26)]})
         table = A.from_pandas_dataframe(df)
         result = table.to_pandas()
         expected = df.copy()
         expected['date'] = pd.to_datetime(df['date'])
         tm.assert_frame_equal(result, expected)
 
+    def test_threaded_conversion(self):
+        df = _alltypes_example()
+        self._check_pandas_roundtrip(df, nthreads=2,
+                                     timestamps_to_ms=False)
+
     # def test_category(self):
     #     repeats = 1000
     #     values = [b'foo', None, u'bar', 'qux', np.nan]

http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 899eb55..5e5826b 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -19,15 +19,18 @@
 
 #include <Python.h>
 
-#include "pyarrow/numpy_interop.h"
-
 #include "pyarrow/adapters/pandas.h"
+#include "pyarrow/numpy_interop.h"
 
+#include <algorithm>
+#include <atomic>
 #include <cmath>
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <sstream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 
 #include "arrow/api.h"
@@ -1031,7 +1034,8 @@ class PandasBlock {
       : num_rows_(num_rows), num_columns_(num_columns) {}
 
   virtual Status Allocate() = 0;
-  virtual Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement)
= 0;
+  virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) = 0;
 
   PyObject* block_arr() { return block_arr_.obj(); }
 
@@ -1057,7 +1061,6 @@ class PandasBlock {
 
     block_arr_.reset(block_arr);
     placement_arr_.reset(placement_arr);
-    current_placement_index_ = 0;
 
     block_data_ = reinterpret_cast<uint8_t*>(
         PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr)));
@@ -1070,7 +1073,6 @@ class PandasBlock {
 
   int64_t num_rows_;
   int num_columns_;
-  int current_placement_index_;
 
   OwnedRef block_arr_;
   uint8_t* block_data_;
@@ -1088,11 +1090,12 @@ class ObjectBlock : public PandasBlock {
 
   Status Allocate() override { return AllocateNDArray(NPY_OBJECT); }
 
-  Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override
{
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
     Type::type type = col->type()->type;
 
     PyObject** out_buffer =
-        reinterpret_cast<PyObject**>(block_data_) + current_placement_index_ * num_rows_;
+        reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_;
 
     const ChunkedArray& data = *col->data().get();
 
@@ -1108,7 +1111,7 @@ class ObjectBlock : public PandasBlock {
       return Status::NotImplemented(ss.str());
     }
 
-    placement_data_[current_placement_index_++] = placement;
+    placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
 };
@@ -1122,18 +1125,19 @@ class IntBlock : public PandasBlock {
     return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type);
   }
 
-  Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override
{
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
     Type::type type = col->type()->type;
 
     C_TYPE* out_buffer =
-        reinterpret_cast<C_TYPE*>(block_data_) + current_placement_index_ * num_rows_;
+        reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_;
 
     const ChunkedArray& data = *col->data().get();
 
     if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString());
}
 
     ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer);
-    placement_data_[current_placement_index_++] = placement;
+    placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
 };
@@ -1153,16 +1157,16 @@ class Float32Block : public PandasBlock {
 
   Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); }
 
-  Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override
{
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
     Type::type type = col->type()->type;
 
     if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString());
}
 
-    float* out_buffer =
-        reinterpret_cast<float*>(block_data_) + current_placement_index_ * num_rows_;
+    float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_;
 
     ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer);
-    placement_data_[current_placement_index_++] = placement;
+    placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
 };
@@ -1173,11 +1177,12 @@ class Float64Block : public PandasBlock {
 
   Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); }
 
-  Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override
{
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
     Type::type type = col->type()->type;
 
     double* out_buffer =
-        reinterpret_cast<double*>(block_data_) + current_placement_index_ * num_rows_;
+        reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_;
 
     const ChunkedArray& data = *col->data().get();
 
@@ -1214,7 +1219,7 @@ class Float64Block : public PandasBlock {
 
 #undef INTEGER_CASE
 
-    placement_data_[current_placement_index_++] = placement;
+    placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
 };
@@ -1225,16 +1230,17 @@ class BoolBlock : public PandasBlock {
 
   Status Allocate() override { return AllocateNDArray(NPY_BOOL); }
 
-  Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override
{
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
     Type::type type = col->type()->type;
 
     if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString());
}
 
     uint8_t* out_buffer =
-        reinterpret_cast<uint8_t*>(block_data_) + current_placement_index_ * num_rows_;
+        reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_;
 
     ConvertBooleanNoNulls(*col->data().get(), out_buffer);
-    placement_data_[current_placement_index_++] = placement;
+    placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
 };
@@ -1253,11 +1259,12 @@ class DatetimeBlock : public PandasBlock {
     return Status::OK();
   }
 
-  Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override
{
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
     Type::type type = col->type()->type;
 
     int64_t* out_buffer =
-        reinterpret_cast<int64_t*>(block_data_) + current_placement_index_ * num_rows_;
+        reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_;
 
     const ChunkedArray& data = *col.get()->data();
 
@@ -1283,7 +1290,7 @@ class DatetimeBlock : public PandasBlock {
       return Status::NotImplemented(col->type()->ToString());
     }
 
-    placement_data_[current_placement_index_++] = placement;
+    placement_data_[rel_placement] = abs_placement;
     return Status::OK();
   }
 };
@@ -1333,6 +1340,7 @@ class DataFrameBlockCreator {
 
   Status Convert(int nthreads, PyObject** output) {
     column_types_.resize(table_->num_columns());
+    column_block_placement_.resize(table_->num_columns());
     type_counts_.clear();
     blocks_.clear();
 
@@ -1397,7 +1405,9 @@ class DataFrameBlockCreator {
       }
 
       auto it = type_counts_.find(output_type);
+      int block_placement = 0;
       if (it != type_counts_.end()) {
+        block_placement = it->second;
         // Increment count
         it->second += 1;
       } else {
@@ -1406,6 +1416,7 @@ class DataFrameBlockCreator {
       }
 
       column_types_[i] = output_type;
+      column_block_placement_[i] = block_placement;
     }
     return Status::OK();
   }
@@ -1421,22 +1432,61 @@ class DataFrameBlockCreator {
   }
 
   Status WriteTableToBlocks(int nthreads) {
-    if (nthreads > 1) {
-      return Status::NotImplemented("multithreading not yet implemented");
-    }
+    auto WriteColumn = [this](int i) {
+      std::shared_ptr<Column> col = this->table_->column(i);
+      PandasBlock::type output_type = this->column_types_[i];
 
-    for (int i = 0; i < table_->num_columns(); ++i) {
-      std::shared_ptr<Column> col = table_->column(i);
-      PandasBlock::type output_type = column_types_[i];
+      int rel_placement = this->column_block_placement_[i];
+
+      auto it = this->blocks_.find(output_type);
+      if (it == this->blocks_.end()) { return Status::KeyError("No block allocated");
}
+      return it->second->Write(col, i, rel_placement);
+    };
 
-      auto it = blocks_.find(output_type);
-      if (it == blocks_.end()) { return Status::KeyError("No block allocated"); }
-      RETURN_NOT_OK(it->second->WriteNext(col, i));
+    nthreads = std::min<int>(nthreads, table_->num_columns());
+
+    if (nthreads == 1) {
+      for (int i = 0; i < table_->num_columns(); ++i) {
+        RETURN_NOT_OK(WriteColumn(i));
+      }
+    } else {
+      std::vector<std::thread> thread_pool;
+      thread_pool.reserve(nthreads);
+      std::atomic<int> task_counter(0);
+
+      std::mutex error_mtx;
+      bool error_occurred = false;
+      Status error;
+
+      for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+        thread_pool.emplace_back(
+            [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]()
{
+              int column_num;
+              while (!error_occurred) {
+                column_num = task_counter.fetch_add(1);
+                if (column_num >= this->table_->num_columns()) { break; }
+                Status s = WriteColumn(column_num);
+                if (!s.ok()) {
+                  std::lock_guard<std::mutex> lock(error_mtx);
+                  error_occurred = true;
+                  error = s;
+                  break;
+                }
+              }
+            });
+      }
+      for (auto&& thread : thread_pool) {
+        thread.join();
+      }
+
+      if (error_occurred) { return error; }
     }
     return Status::OK();
   }
 
   Status GetResultList(PyObject** out) {
+    PyAcquireGIL lock;
+
     auto num_blocks = static_cast<Py_ssize_t>(blocks_.size());
     PyObject* result = PyList_New(num_blocks);
     RETURN_IF_PYERROR();
@@ -1463,8 +1513,13 @@ class DataFrameBlockCreator {
 
  private:
   std::shared_ptr<Table> table_;
+
+  // column num -> block type id
   std::vector<PandasBlock::type> column_types_;
 
+  // column num -> relative placement within internal block
+  std::vector<int> column_block_placement_;
+
   // block type -> type count
   std::unordered_map<int, int> type_counts_;
 


Mime
View raw message