arrow-commits mailing list archives

From w...@apache.org
Subject arrow git commit: ARROW-1594: [Python] Multithreaded conversions to Arrow in from_pandas
Date Sun, 08 Oct 2017 15:45:13 GMT
Repository: arrow
Updated Branches:
  refs/heads/master e31c2e376 -> 208e79812


ARROW-1594: [Python] Multithreaded conversions to Arrow in from_pandas

This results in nice speedups when column conversions do not require the GIL to be held:

```python
In [5]: import numpy as np

In [6]: import pandas as pd

In [7]: import pyarrow as pa

In [8]: NROWS = 1000000

In [9]: NCOLS = 50

In [10]: arr = np.random.randn(NCOLS, NROWS).T

In [11]: arr[::5] = np.nan

In [12]: df = pd.DataFrame(arr)

In [13]: %timeit rb = pa.RecordBatch.from_pandas(df, nthreads=1)
10 loops, best of 3: 179 ms per loop

In [14]: %timeit rb = pa.RecordBatch.from_pandas(df, nthreads=4)
10 loops, best of 3: 59.7 ms per loop
```

This introduces a dependency on `futures`, the Python 2.7 backport of `concurrent.futures` (PSF license).
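
For context, the backport exposes the same `concurrent.futures` module path as the Python 3 standard library, so the conversion code can use a single import on both versions. A minimal sketch of the pattern (not part of this commit):

```python
# On Python 3 this is the standard library; on Python 2.7 the same
# import works once the `futures` backport is installed.
from concurrent import futures

with futures.ThreadPoolExecutor(4) as executor:
    # map() fans work out to the pool and yields results in input order
    results = list(executor.map(abs, [-1, -2, -3]))

assert results == [1, 2, 3]
```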

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1186 from wesm/multithreaded-from-pandas and squashes the following commits:

a3072f0e [Wes McKinney] Only install futures on py2
c30e4735 [Wes McKinney] Add heuristic to use threadpool conversion only if nrows > ncols * 100
5a692085 [Wes McKinney] Only install concurrent.futures backport on py2, test serialize_pandas with nthreads
0afab342 [Wes McKinney] Add nthreads argument to serialize_pandas, make default for serialize/deserialize consistent
15841d13 [Wes McKinney] Default to cpu_count() for nthreads in from_pandas to conform with to_pandas default
6a58c038 [Wes McKinney] Add nthreads argument to RecordBatch/Table.from_pandas. Use concurrent.futures for parallel processing


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/208e7981
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/208e7981
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/208e7981

Branch: refs/heads/master
Commit: 208e79812b5d98f9cd31f4f9ed9e74e6f76f24fd
Parents: e31c2e3
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Sun Oct 8 11:45:06 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Oct 8 11:45:06 2017 -0400

----------------------------------------------------------------------
 ci/travis_script_python.sh                  |  4 +++
 python/pyarrow/ipc.py                       | 13 +++++---
 python/pyarrow/pandas_compat.py             | 42 +++++++++++++++++++-----
 python/pyarrow/table.pxi                    | 17 +++++++---
 python/pyarrow/tests/test_convert_pandas.py | 11 +++++--
 python/pyarrow/tests/test_ipc.py            |  2 +-
 python/setup.py                             | 12 +++++--
 7 files changed, 76 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 6941543..97bde1a 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -68,6 +68,10 @@ popd
 # Other stuff pip install
 pushd $ARROW_PYTHON_DIR
 
+if [ "$PYTHON_VERSION" == "2.7" ]; then
+  pip install futures
+fi
+
 pip install -r requirements.txt
 python setup.py build_ext --with-parquet --with-plasma \
        install --single-version-externally-managed --record=record.text

http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 6eb4979..8cb6cdd 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -142,19 +142,21 @@ def open_file(source, footer_offset=None):
     return RecordBatchFileReader(source, footer_offset=footer_offset)
 
 
-def serialize_pandas(df):
+def serialize_pandas(df, nthreads=None):
     """Serialize a pandas DataFrame into a buffer protocol compatible object.
 
     Parameters
     ----------
     df : pandas.DataFrame
+    nthreads : int, default None
+        Number of threads to use for conversion to Arrow, default all CPUs
 
     Returns
     -------
     buf : buffer
         An object compatible with the buffer protocol
     """
-    batch = pa.RecordBatch.from_pandas(df)
+    batch = pa.RecordBatch.from_pandas(df, nthreads=nthreads)
     sink = pa.BufferOutputStream()
     writer = pa.RecordBatchStreamWriter(sink, batch.schema)
     writer.write_batch(batch)
@@ -162,15 +164,16 @@ def serialize_pandas(df):
     return sink.get_result()
 
 
-def deserialize_pandas(buf, nthreads=1):
+def deserialize_pandas(buf, nthreads=None):
     """Deserialize a buffer protocol compatible object into a pandas DataFrame.
 
     Parameters
     ----------
     buf : buffer
         An object compatible with the buffer protocol
-    nthreads : int, optional
-        The number of threads to use to convert the buffer to a DataFrame.
+    nthreads : int, default None
+        The number of threads to use to convert the buffer to a DataFrame,
+        default all CPUs
 
     Returns
     -------
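
Taken together, the new keyword lets a serialize/deserialize round trip use threads in both directions; a quick sketch (the thread count here is illustrative):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'a': list(range(1000))})

# nthreads=None now defaults to all CPUs on both sides; an explicit
# value pins the pool size instead
buf = pa.serialize_pandas(df, nthreads=4)
result = pa.deserialize_pandas(buf, nthreads=4)

assert result.equals(df)
```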

http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 141b33f..a071e56 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -290,17 +290,17 @@ def _column_name_to_strings(name):
     return str(name)
 
 
-def dataframe_to_arrays(df, schema, preserve_index):
+def dataframe_to_arrays(df, schema, preserve_index, nthreads=1):
     names = []
-    arrays = []
     index_columns = []
-    types = []
     type = None
 
     if preserve_index:
         n = len(getattr(df.index, 'levels', [df.index]))
         index_columns.extend(df.index.get_level_values(i) for i in range(n))
 
+    columns_to_convert = []
+    convert_types = []
     for name in df.columns:
         col = df[name]
         if not isinstance(name, six.string_types):
@@ -310,16 +310,40 @@ def dataframe_to_arrays(df, schema, preserve_index):
             field = schema.field_by_name(name)
             type = getattr(field, "type", None)
 
-        array = pa.array(col, from_pandas=True, type=type)
-        arrays.append(array)
+        columns_to_convert.append(col)
+        convert_types.append(type)
         names.append(name)
-        types.append(array.type)
 
     for i, column in enumerate(index_columns):
-        array = pa.array(column)
-        arrays.append(array)
+        columns_to_convert.append(column)
+        convert_types.append(None)
         names.append(index_level_name(column, i))
-        types.append(array.type)
+
+    # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
+    # using a thread pool is worth it. Currently the heuristic is whether the
+    # nrows > 100 * ncols.
+    if nthreads is None:
+        nrows, ncols = len(df), len(df.columns)
+        if nrows > ncols * 100:
+            nthreads = pa.cpu_count()
+        else:
+            nthreads = 1
+
+    def convert_column(col, ty):
+        return pa.array(col, from_pandas=True, type=ty)
+
+    if nthreads == 1:
+        arrays = [convert_column(c, t)
+                  for c, t in zip(columns_to_convert,
+                                  convert_types)]
+    else:
+        from concurrent import futures
+        with futures.ThreadPoolExecutor(nthreads) as executor:
+            arrays = list(executor.map(convert_column,
+                                       columns_to_convert,
+                                       convert_types))
+
+    types = [x.type for x in arrays]
 
     metadata = construct_metadata(
         df, names, index_columns, preserve_index, types
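
The `nthreads=None` heuristic above is observable through the public API; a rough sketch of the two regimes (shapes are arbitrary, chosen only to land on either side of the `nrows > 100 * ncols` cutoff):

```python
import numpy as np
import pandas as pd
import pyarrow as pa

# 50 rows x 10 cols: nrows <= 100 * ncols, so conversion stays serial
wide = pd.DataFrame(np.random.randn(50, 10))
pa.Table.from_pandas(wide)

# 10000 rows x 5 cols: nrows > 100 * ncols, so a pool of
# pa.cpu_count() threads converts the columns in parallel
tall = pd.DataFrame(np.random.randn(10000, 5))
pa.Table.from_pandas(tall)
```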

http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index e5422a5..dd42cf2 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -556,7 +556,8 @@ cdef class RecordBatch:
         return Table.from_batches([self]).to_pandas(nthreads=nthreads)
 
     @classmethod
-    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True):
+    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True,
+                    nthreads=None):
         """
         Convert pandas.DataFrame to an Arrow RecordBatch
 
@@ -569,13 +570,16 @@ cdef class RecordBatch:
         preserve_index : bool, optional
             Whether to store the index as an additional column in the resulting
             ``RecordBatch``.
+        nthreads : int, default None (may use up to system CPU count threads)
+            If greater than 1, convert columns to Arrow in parallel using
+            the indicated number of threads
 
         Returns
         -------
         pyarrow.RecordBatch
         """
         names, arrays, metadata = pdcompat.dataframe_to_arrays(
-            df, schema, preserve_index
+            df, schema, preserve_index, nthreads=nthreads
         )
         return cls.from_arrays(arrays, names, metadata)
 
@@ -714,7 +718,8 @@ cdef class Table:
         return result
 
     @classmethod
-    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True):
+    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True,
+                    nthreads=None):
         """
         Convert pandas.DataFrame to an Arrow Table
 
@@ -727,6 +732,9 @@ cdef class Table:
         preserve_index : bool, optional
             Whether to store the index as an additional column in the resulting
             ``Table``.
+        nthreads : int, default None (may use up to system CPU count threads)
+            If greater than 1, convert columns to Arrow in parallel using
+            the indicated number of threads
 
         Returns
         -------
@@ -747,7 +755,8 @@ cdef class Table:
         names, arrays, metadata = pdcompat.dataframe_to_arrays(
             df,
             schema=schema,
-            preserve_index=preserve_index
+            preserve_index=preserve_index,
+            nthreads=nthreads
         )
         return cls.from_arrays(arrays, names=names, metadata=metadata)
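
Both `RecordBatch.from_pandas` and `Table.from_pandas` accept the new keyword, and an explicit value bypasses the heuristic; a short usage sketch:

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'x': [1.0, 2.0, None], 'y': ['a', 'b', 'c']})

# Explicit nthreads overrides the nthreads=None heuristic in both cases
batch = pa.RecordBatch.from_pandas(df, nthreads=1)
table = pa.Table.from_pandas(df, nthreads=2)
```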
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index d42aa0e..41ad201 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -72,9 +72,13 @@ class TestPandasConversion(unittest.TestCase):
     def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
                                 expected_schema=None,
                                 check_dtype=True, schema=None,
-                                check_index=False):
-        table = pa.Table.from_pandas(df,
-                                     schema=schema, preserve_index=check_index)
+                                check_index=False,
+                                as_batch=False):
+        klass = pa.RecordBatch if as_batch else pa.Table
+        table = klass.from_pandas(df, schema=schema,
+                                  preserve_index=check_index,
+                                  nthreads=nthreads)
+
         result = table.to_pandas(nthreads=nthreads)
         if expected_schema:
             assert table.schema.equals(expected_schema)
@@ -663,6 +667,7 @@ class TestPandasConversion(unittest.TestCase):
     def test_threaded_conversion(self):
         df = _alltypes_example()
         self._check_pandas_roundtrip(df, nthreads=2)
+        self._check_pandas_roundtrip(df, nthreads=2, as_batch=True)
 
     def test_category(self):
         repeats = 5

http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 6802c43..0d5b673 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -365,7 +365,7 @@ def test_get_record_batch_size():
 
 
 def _check_serialize_pandas_round_trip(df, nthreads=1):
-    buf = pa.serialize_pandas(df)
+    buf = pa.serialize_pandas(df, nthreads=nthreads)
     result = pa.deserialize_pandas(buf, nthreads=nthreads)
     assert_frame_equal(result, df)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/208e7981/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 8966fb8..edcf397 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -381,6 +381,13 @@ class BinaryDistribution(Distribution):
     def has_ext_modules(foo):
         return True
 
+
+install_requires = ['numpy >= 1.10', 'six >= 1.0.0']
+
+if sys.version_info.major == 2:
+    install_requires.append('futures')
+
+
 setup(
     name="pyarrow",
     packages=['pyarrow', 'pyarrow.tests'],
@@ -390,19 +397,18 @@ setup(
     distclass=BinaryDistribution,
     # Dummy extension to trigger build_ext
     ext_modules=[Extension('__dummy__', sources=[])],
-
     cmdclass={
         'clean': clean,
         'build_ext': build_ext
     },
-    entry_points = {
+    entry_points={
         'console_scripts': [
             'plasma_store = pyarrow:_plasma_store_entry_point'
         ]
     },
     use_scm_version={"root": "..", "relative_to": __file__},
     setup_requires=['setuptools_scm', 'cython >= 0.23'],
-    install_requires=['numpy >= 1.10', 'six >= 1.0.0'],
+    install_requires=install_requires,
     tests_require=['pytest'],
     description="Python library for Apache Arrow",
     long_description=long_description,

