arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1377: [Python] Add ParquetFile.scan_contents function to use for benchmarking
Date Tue, 05 Sep 2017 23:13:11 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 243328931 -> 2660dda40


ARROW-1377: [Python] Add ParquetFile.scan_contents function to use for benchmarking

Requires PARQUET-1087: https://github.com/apache/parquet-cpp/pull/387

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1039 from wesm/ARROW-1377 and squashes the following commits:

39caa344 [Wes McKinney] Add ParquetFile.scan_contents function to use for benchmarking


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2660dda4
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2660dda4
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2660dda4

Branch: refs/heads/master
Commit: 2660dda4082a5b74d82d773b1c35feb69ddbf447
Parents: 2433289
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Sep 5 19:13:06 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Sep 5 19:13:06 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_parquet.pxd          |  3 +++
 python/pyarrow/_parquet.pyx          | 19 +++++++++++++++++++
 python/pyarrow/parquet.py            | 27 +++++++++++++++++++++++++--
 python/pyarrow/tests/test_parquet.py | 19 +++++++++++++++++++
 4 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index ced6549..5094232 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -231,6 +231,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
         CStatus ReadTable(const vector[int]& column_indices,
                           shared_ptr[CTable]* out)
 
+        CStatus ScanContents(vector[int] columns, int32_t column_batch_size,
+                             int64_t* num_rows)
+
         const ParquetFileReader* parquet_reader()
 
         void set_num_threads(int num_threads)

http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index f3b7875..aea6fb6 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -467,6 +467,25 @@ cdef class ParquetReader:
                              .ReadTable(&ctable))
         return pyarrow_wrap_table(ctable)
 
+    def scan_contents(self, column_indices=None, batch_size=65536):
+        cdef:
+            vector[int] c_column_indices
+            int32_t c_batch_size
+            int64_t c_num_rows
+
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+        c_batch_size = batch_size
+
+        with nogil:
+            check_status(self.reader.get()
+                         .ScanContents(c_column_indices, c_batch_size,
+                                       &c_num_rows))
+
+        return c_num_rows
+
     def column_name_idx(self, column_name):
         """
         Find the matching index of a column in the schema.

http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 5dabca9..568aad4 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -118,6 +118,27 @@ class ParquetFile(object):
         return self.reader.read_all(column_indices=column_indices,
                                     nthreads=nthreads)
 
+    def scan_contents(self, columns=None, batch_size=65536):
+        """
+        Read contents of file with a single thread for indicated columns and
+        batch size. Number of rows in file is returned. This function is used
+        for benchmarking
+
+        Parameters
+        ----------
+        columns : list of integers, default None
+            If None, scan all columns
+        batch_size : int, default 64K
+            Number of rows to read at a time internally
+
+        Returns
+        -------
+        num_rows : number of rows in file
+        """
+        column_indices = self._get_column_indices(columns)
+        return self.reader.scan_contents(column_indices,
+                                         batch_size=batch_size)
+
     def _get_column_indices(self, column_names, use_pandas_metadata=False):
         if column_names is None:
             return None
@@ -648,12 +669,14 @@ class ParquetDataset(object):
 def _ensure_filesystem(fs):
     fs_type = type(fs)
 
-    # If the arrow filesystem was subclassed, assume it supports the full interface and return it
+    # If the arrow filesystem was subclassed, assume it supports the full
+    # interface and return it
     if not issubclass(fs_type, FileSystem):
         for mro in inspect.getmro(fs_type):
             if mro.__name__ is 'S3FileSystem':
                 return S3FSWrapper(fs)
-            # In case its a simple LocalFileSystem (e.g. dask) use native arrow FS
+            # In case its a simple LocalFileSystem (e.g. dask) use native arrow
+            # FS
             elif mro.__name__ is 'LocalFileSystem':
                 return LocalFileSystem.get_instance()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/2660dda4/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index de6f431..fa9455b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -676,6 +676,25 @@ def test_read_single_row_group_with_column_subset():
 
 
 @parquet
+def test_scan_contents():
+    import pyarrow.parquet as pq
+
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df)
+
+    buf = io.BytesIO()
+    _write_table(a_table, buf, row_group_size=N / K,
+                 compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
+    assert pf.scan_contents() == 10000
+    assert pf.scan_contents(df.columns[:4]) == 10000
+
+
+@parquet
 def test_parquet_piece_read(tmpdir):
     import pyarrow.parquet as pq
 


Mime
View raw message