arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [arrow] 06/09: ARROW-1684: [Python] Support selecting nested Parquet fields by any path prefix
Date Fri, 01 Dec 2017 16:50:46 GMT
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit b92c435b8f64f98593267fd72ecd61d26c23ffc0
Author: Wes McKinney <wes.mckinney@twosigma.com>
AuthorDate: Tue Nov 28 20:07:11 2017 -0500

    ARROW-1684: [Python] Support selecting nested Parquet fields by any path prefix
    
    Author: Wes McKinney <wes.mckinney@twosigma.com>
    
    Closes #1366 from wesm/ARROW-1684 and squashes the following commits:
    
    e63e42aa [Wes McKinney] Support selecting nested Parquet fields by any path prefix
---
 python/pyarrow/_parquet.pxd          |  1 +
 python/pyarrow/_parquet.pyx          | 29 ++++++++++++++++++++-----
 python/pyarrow/parquet.py            | 41 +++++++++++++++++++++++++++++++-----
 python/pyarrow/tests/test_parquet.py | 22 +++++++++++++++++++
 4 files changed, 83 insertions(+), 10 deletions(-)

diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 7e5e575..55b66b5 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -37,6 +37,7 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
 
     cdef cppclass ColumnPath:
         c_string ToDotString()
+        vector[c_string] ToDotVector()
 
 
 cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index eca6b20..147af21 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -600,9 +600,11 @@ cdef class ParquetReader:
         object source
         CMemoryPool* allocator
         unique_ptr[FileReader] reader
-        column_idx_map
         FileMetaData _metadata
 
+    cdef public:
+        _column_idx_map
+
     def __cinit__(self, MemoryPool memory_pool=None):
         self.allocator = maybe_unbox_memory_pool(memory_pool)
         self._metadata = None
@@ -624,6 +626,23 @@ cdef class ParquetReader:
             check_status(OpenFile(rd_handle, self.allocator, properties,
                                   c_metadata, &self.reader))
 
+    property column_paths:
+
+        def __get__(self):
+            cdef:
+                FileMetaData container = self.metadata
+                const CFileMetaData* metadata = container._metadata
+                vector[c_string] path
+                int i = 0
+
+            paths = []
+            for i in range(0, metadata.num_columns()):
+                path = (metadata.schema().Column(i)
+                        .path().get().ToDotVector())
+                paths.append([frombytes(x) for x in path])
+
+            return paths
+
     @property
     def metadata(self):
         cdef:
@@ -729,14 +748,14 @@ cdef class ParquetReader:
             const CFileMetaData* metadata = container._metadata
             int i = 0
 
-        if self.column_idx_map is None:
-            self.column_idx_map = {}
+        if self._column_idx_map is None:
+            self._column_idx_map = {}
             for i in range(0, metadata.num_columns()):
                 col_bytes = tobytes(metadata.schema().Column(i)
                                     .path().get().ToDotString())
-                self.column_idx_map[col_bytes] = i
+                self._column_idx_map[col_bytes] = i
 
-        return self.column_idx_map[tobytes(column_name)]
+        return self._column_idx_map[tobytes(column_name)]
 
     def read_column(self, int column_index):
         cdef:
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 37da662..9fb890c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from collections import defaultdict
 import os
 import inspect
 import json
@@ -54,6 +55,24 @@ class ParquetFile(object):
         self.reader = ParquetReader()
         self.reader.open(source, metadata=metadata)
         self.common_metadata = common_metadata
+        self._nested_paths_by_prefix = self._build_nested_paths()
+
+    def _build_nested_paths(self):
+        paths = self.reader.column_paths
+
+        result = defaultdict(list)
+
+        def _visit_piece(i, key, rest):
+            result[key].append(i)
+
+            if len(rest) > 0:
+                nested_key = '.'.join((key, rest[0]))
+                _visit_piece(i, nested_key, rest[1:])
+
+        for i, path in enumerate(paths):
+            _visit_piece(i, path[0], path[1:])
+
+        return result
 
     @property
     def metadata(self):
@@ -75,7 +94,9 @@ class ParquetFile(object):
         Parameters
         ----------
         columns: list
-            If not None, only these columns will be read from the row group.
+            If not None, only these columns will be read from the row group. A
+            column name may be a prefix of a nested field, e.g. 'a' will select
+            'a.b', 'a.c', and 'a.d.e'
         nthreads : int, default 1
             Number of columns to read in parallel. If > 1, requires that the
             underlying file source is threadsafe
@@ -100,7 +121,9 @@ class ParquetFile(object):
         Parameters
         ----------
         columns: list
-            If not None, only these columns will be read from the file.
+            If not None, only these columns will be read from the file. A
+            column name may be a prefix of a nested field, e.g. 'a' will select
+            'a.b', 'a.c', and 'a.d.e'
         nthreads : int, default 1
             Number of columns to read in parallel. If > 1, requires that the
             underlying file source is threadsafe
@@ -143,7 +166,11 @@ class ParquetFile(object):
         if column_names is None:
             return None
 
-        indices = list(map(self.reader.column_name_idx, column_names))
+        indices = []
+
+        for name in column_names:
+            if name in self._nested_paths_by_prefix:
+                indices.extend(self._nested_paths_by_prefix[name])
 
         if use_pandas_metadata:
             file_keyvalues = self.metadata.metadata
@@ -837,7 +864,9 @@ def read_table(source, columns=None, nthreads=1, metadata=None,
         name or directory name. For passing Python file objects or byte
         buffers, see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
     columns: list
-        If not None, only these columns will be read from the file.
+        If not None, only these columns will be read from the file. A column
+        name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
+        'a.c', and 'a.d.e'
     nthreads : int, default 1
         Number of columns to read in parallel. Requires that the underlying
         file source is threadsafe
@@ -875,7 +904,9 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None):
         name. For passing Python file objects or byte buffers,
         see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
     columns: list
-        If not None, only these columns will be read from the file.
+        If not None, only these columns will be read from the file. A column
+        name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
+        'a.c', and 'a.d.e'
     nthreads : int, default 1
         Number of columns to read in parallel. Requires that the underlying
         file source is threadsafe
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 274ff45..9004fc0 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1469,6 +1469,28 @@ def test_index_column_name_duplicate(tmpdir):
 
 
 @parquet
+def test_parquet_nested_convenience(tmpdir):
+    # ARROW-1684
+    import pyarrow.parquet as pq
+
+    df = pd.DataFrame({
+        'a': [[1, 2, 3], None, [4, 5], []],
+        'b': [[1.], None, None, [6., 7.]],
+    })
+
+    path = str(tmpdir / 'nested_convenience.parquet')
+
+    table = pa.Table.from_pandas(df, preserve_index=False)
+    _write_table(table, path)
+
+    read = pq.read_table(path, columns=['a'])
+    tm.assert_frame_equal(read.to_pandas(), df[['a']])
+
+    read = pq.read_table(path, columns=['a', 'b'])
+    tm.assert_frame_equal(read.to_pandas(), df)
+
+
+@parquet
 def test_backwards_compatible_index_naming():
     expected_string = b"""\
 carat        cut  color  clarity  depth  table  price     x     y     z

-- 
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <commits@arrow.apache.org>.

Mime
View raw message