From: wesm@apache.org
To: commits@arrow.apache.org
Subject: arrow git commit: ARROW-539: [Python] Add support for reading partitioned Parquet files with Hive-like directory schemes
Date: Wed, 12 Apr 2017 17:05:27 +0000 (UTC)

Repository: arrow
Updated Branches:
  refs/heads/master 9db96fea4 -> 9d532c49d


ARROW-539: [Python] Add support for reading partitioned Parquet files with Hive-like directory schemes

I probably didn't get all the use cases, but this should be a good start. First, the directory structure is walked to determine the distinct partition keys. These keys are later used as the dictionaries for the `arrow::DictionaryArray` objects that are constructed. I also created the `ParquetDatasetPiece` class to enable distributed processing of file components in frameworks like Dask. We may need to address pickling of the `ParquetPartitions` object (which must be passed to `ParquetDatasetPiece.read` so that the right array metadata can be constructed).

Author: Wes McKinney
Author: Miki Tebeka

Closes #529 from wesm/ARROW-539 and squashes the following commits:

a0451fa [Wes McKinney] Code review comments
deb6d82 [Wes McKinney] Don't make file-like Python object on LocalFilesystem
04dc691 [Wes McKinney] Complete initial partitioned reads, supporting unit tests. Expose arrow::Table::AddColumn
7d33755 [Wes McKinney] Untested draft of ParquetManifest for partitioned directory structures.
Get test suite passing again
ba8825f [Wes McKinney] Prototyping
18fe639 [Wes McKinney] Refactoring, add ParquetDataset, ParquetDatasetPiece
016b445 [Miki Tebeka] [ARROW-539] [Python] Support reading Parquet datasets with standard partition directory schemes


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/9d532c49
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/9d532c49
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/9d532c49

Branch: refs/heads/master
Commit: 9d532c49d563ec22f73af3cc49549eb2e5cb6898
Parents: 9db96fe
Author: Wes McKinney
Authored: Wed Apr 12 13:05:21 2017 -0400
Committer: Wes McKinney
Committed: Wed Apr 12 13:05:21 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/filesystem.py         |  25 +-
 python/pyarrow/includes/libarrow.pxd |   2 +
 python/pyarrow/parquet.py            | 547 ++++++++++++++++++++++++++----
 python/pyarrow/table.pxd             |   1 +
 python/pyarrow/table.pyx             |  40 ++-
 python/pyarrow/tests/test_parquet.py | 156 +++++++--
 python/pyarrow/tests/test_table.py   |  31 ++
 7 files changed, 692 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index e820806..269cf1c 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -87,20 +87,10 @@ class Filesystem(object):
         -------
         table : pyarrow.Table
         """
-        from pyarrow.parquet import read_multiple_files
-
-        if self.isdir(path):
-            paths_to_read = []
-            for path in self.ls(path):
-                if path.endswith('parq') or path.endswith('parquet'):
-                    paths_to_read.append(path)
-        else:
-            paths_to_read = [path]
-
-        return read_multiple_files(paths_to_read, columns=columns,
-                                   filesystem=self, schema=schema,
-                                   metadata=metadata,
-                                   nthreads=nthreads)
+        from pyarrow.parquet import ParquetDataset
+        dataset = ParquetDataset(path, schema=schema, metadata=metadata,
+                                 filesystem=self)
+        return dataset.read(columns=columns, nthreads=nthreads)
 
 
 class LocalFilesystem(Filesystem):
@@ -117,6 +107,13 @@ class LocalFilesystem(Filesystem):
     def ls(self, path):
         return sorted(pjoin(path, x) for x in os.listdir(path))
 
+    @implements(Filesystem.mkdir)
+    def mkdir(self, path, create_parents=True):
+        if create_parents:
+            os.makedirs(path)
+        else:
+            os.mkdir(path)
+
     @implements(Filesystem.isdir)
     def isdir(self, path):
         return os.path.isdir(path)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 40dd837..ae2b45f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -291,6 +291,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
         shared_ptr[CSchema] schema()
         shared_ptr[CColumn] column(int i)
+        CStatus AddColumn(int i, const shared_ptr[CColumn]& column,
+                          shared_ptr[CTable]* out)
         CStatus RemoveColumn(int i, shared_ptr[CTable]* out)
 
     cdef cppclass CTensor" arrow::Tensor":

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d95c3b3..f81b6c2
100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -17,18 +17,23 @@ import six +import numpy as np + +from pyarrow.filesystem import LocalFilesystem from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa RowGroupMetaData, Schema, ParquetWriter) import pyarrow._parquet as _parquet # noqa -from pyarrow.table import concat_tables +import pyarrow.array as _array +import pyarrow.table as _table -EXCLUDED_PARQUET_PATHS = {'_metadata', '_common_metadata', '_SUCCESS'} +# ---------------------------------------------------------------------- +# Reading a single Parquet file class ParquetFile(object): """ - Open a Parquet binary file for reading + Reader interface for a single Parquet file Parameters ---------- @@ -72,7 +77,8 @@ class ParquetFile(object): Content of the row group as a table (of columns) """ column_indices = self._get_column_indices(columns) - self.reader.set_num_threads(nthreads) + if nthreads is not None: + self.reader.set_num_threads(nthreads) return self.reader.read_row_group(i, column_indices=column_indices) def read(self, columns=None, nthreads=1): @@ -93,7 +99,8 @@ class ParquetFile(object): Content of the file as a table (of columns) """ column_indices = self._get_column_indices(columns) - self.reader.set_num_threads(nthreads) + if nthreads is not None: + self.reader.set_num_threads(nthreads) return self.reader.read_all(column_indices=column_indices) def _get_column_indices(self, column_names): @@ -104,6 +111,463 @@ class ParquetFile(object): for column in column_names] +# ---------------------------------------------------------------------- +# Metadata container providing instructions about reading a single Parquet +# file, possibly part of a partitioned dataset + + +class ParquetDatasetPiece(object): + """ + A single chunk of a potentially larger Parquet dataset to read. The + arguments will indicate to read either a single row group or all row + groups, and whether to add partition keys to the resulting pyarrow.Table + + Parameters + ---------- + path : str + Path to file in the file system where this piece is located + partition_keys : list of tuples + [(column name, ordinal index)] + row_group : int, default None + Row group to load. 
By default, reads all row groups + """ + + def __init__(self, path, row_group=None, partition_keys=None): + self.path = path + self.row_group = row_group + self.partition_keys = partition_keys or [] + + def __eq__(self, other): + if not isinstance(other, ParquetDatasetPiece): + return False + return (self.path == other.path and + self.row_group == other.row_group and + self.partition_keys == other.partition_keys) + + def __ne__(self, other): + return not (self == other) + + def __repr__(self): + return ('{0}({1!r}, row_group={2!r}, partition_keys={3!r})' + .format(type(self).__name__, self.path, + self.row_group, + self.partition_keys)) + + def __str__(self): + result = '' + + if len(self.partition_keys) > 0: + partition_str = ', '.join('{0}={1}'.format(name, index) + for name, index in self.partition_keys) + result += 'partition[{0}] '.format(partition_str) + + result += self.path + + if self.row_group is not None: + result += ' | row_group={0}'.format(self.row_group) + + return result + + def get_metadata(self, open_file_func=None): + """ + Given a function that can create an open ParquetFile object, return the + file's metadata + """ + return self._open(open_file_func).metadata + + def _open(self, open_file_func=None): + """ + Returns instance of ParquetFile + """ + if open_file_func is None: + def simple_opener(path): + return ParquetFile(path) + open_file_func = simple_opener + return open_file_func(self.path) + + def read(self, columns=None, nthreads=1, partitions=None, + open_file_func=None): + """ + Read this piece as a pyarrow.Table + + Parameters + ---------- + columns : list of column names, default None + nthreads : int, default 1 + For multithreaded file reads + partitions : ParquetPartitions, default None + open_file_func : function, default None + A function that knows how to construct a ParquetFile object given + the file path in this piece + + Returns + ------- + table : pyarrow.Table + """ + reader = self._open(open_file_func) + + if self.row_group is not None: + table = reader.read_row_group(self.row_group, columns=columns, + nthreads=nthreads) + else: + table = reader.read(columns=columns, nthreads=nthreads) + + if len(self.partition_keys) > 0: + if partitions is None: + raise ValueError('Must pass partition sets') + + # Here, the index is the categorical code of the partition where + # this piece is located. Suppose we had + # + # /foo=a/0.parq + # /foo=b/0.parq + # /foo=c/0.parq + # + # Then we assign a=0, b=1, c=2. And the resulting Table pieces will + # have a DictionaryArray column named foo having the constant index + # value as indicated. The distinct categories of the partition have + # been computed in the ParquetManifest + for i, (name, index) in enumerate(self.partition_keys): + # The partition code is the same for all values in this piece + indices = np.array([index], dtype='i4').repeat(len(table)) + + # This is set of all partition values, computed as part of the + # manifest, so ['a', 'b', 'c'] as in our example above. + dictionary = partitions.levels[i].dictionary + + arr = _array.DictionaryArray.from_arrays(indices, dictionary) + col = _table.Column.from_array(name, arr) + table = table.append_column(col) + + return table + + +def _is_parquet_file(path): + return path.endswith('parq') or path.endswith('parquet') + + +class PartitionSet(object): + """A data structure for cataloguing the observed Parquet partitions at a + particular level. 
So if we have + + /foo=a/bar=0 + /foo=a/bar=1 + /foo=a/bar=2 + /foo=b/bar=0 + /foo=b/bar=1 + /foo=b/bar=2 + + Then we have two partition sets, one for foo, another for bar. As we visit + levels of the partition hierarchy, a PartitionSet tracks the distinct + values and assigns categorical codes to use when reading the pieces + """ + + def __init__(self, name, keys=None): + self.name = name + self.keys = keys or [] + self.key_indices = {k: i for i, k in enumerate(self.keys)} + self._dictionary = None + + def get_index(self, key): + """ + Get the index of the partition value if it is known, otherwise assign + one + """ + if key in self.key_indices: + return self.key_indices[key] + else: + index = len(self.key_indices) + self.keys.append(key) + self.key_indices[key] = index + return index + + @property + def dictionary(self): + if self._dictionary is not None: + return self._dictionary + + if len(self.keys) == 0: + raise ValueError('No known partition keys') + + # Only integer and string partition types are supported right now + try: + integer_keys = [int(x) for x in self.keys] + dictionary = _array.from_pylist(integer_keys) + except ValueError: + dictionary = _array.from_pylist(self.keys) + + self._dictionary = dictionary + return dictionary + + @property + def is_sorted(self): + return list(self.keys) == sorted(self.keys) + + +class ParquetPartitions(object): + + def __init__(self): + self.levels = [] + self.partition_names = set() + + def __len__(self): + return len(self.levels) + + def __getitem__(self, i): + return self.levels[i] + + def get_index(self, level, name, key): + """ + Record a partition value at a particular level, returning the distinct + code for that value at that level. Example: + + partitions.get_index(1, 'foo', 'a') returns 0 + partitions.get_index(1, 'foo', 'b') returns 1 + partitions.get_index(1, 'foo', 'c') returns 2 + partitions.get_index(1, 'foo', 'a') returns 0 + + Parameters + ---------- + level : int + The nesting level of the partition we are observing + name : string + The partition name + key : string or int + The partition value + """ + if level == len(self.levels): + if name in self.partition_names: + raise ValueError('{0} was the name of the partition in ' + 'another level'.format(name)) + + part_set = PartitionSet(name) + self.levels.append(part_set) + self.partition_names.add(name) + + return self.levels[level].get_index(key) + + +def is_string(x): + return isinstance(x, six.string_types) + + +class ParquetManifest(object): + """ + + """ + def __init__(self, dirpath, filesystem=None, pathsep='/', + partition_scheme='hive'): + self.filesystem = filesystem or LocalFilesystem.get_instance() + self.pathsep = pathsep + self.dirpath = dirpath + self.partition_scheme = partition_scheme + self.partitions = ParquetPartitions() + self.pieces = [] + + self.common_metadata_path = None + self.metadata_path = None + + self._visit_level(0, self.dirpath, []) + + def _visit_level(self, level, base_path, part_keys): + directories = [] + files = [] + fs = self.filesystem + + if not fs.isdir(base_path): + raise ValueError('"{0}" is not a directory'.format(base_path)) + + for path in sorted(fs.ls(base_path)): + if fs.isfile(path): + if _is_parquet_file(path): + files.append(path) + elif path.endswith('_common_metadata'): + self.common_metadata_path = path + elif path.endswith('_metadata'): + self.metadata_path = path + elif not self._should_silently_exclude(path): + print('Ignoring path: {0}'.format(path)) + elif fs.isdir(path): + directories.append(path) + + if len(files) > 
0 and len(directories) > 0: + raise ValueError('Found files in an intermediate ' + 'directory: {0}'.format(base_path)) + elif len(directories) > 0: + self._visit_directories(level, directories, part_keys) + else: + self._push_pieces(files, part_keys) + + def _should_silently_exclude(self, path): + _, tail = path.rsplit(self.pathsep, 1) + return tail.endswith('.crc') or tail in EXCLUDED_PARQUET_PATHS + + def _visit_directories(self, level, directories, part_keys): + for path in directories: + head, tail = _path_split(path, self.pathsep) + name, key = _parse_hive_partition(tail) + + index = self.partitions.get_index(level, name, key) + dir_part_keys = part_keys + [(name, index)] + self._visit_level(level + 1, path, dir_part_keys) + + def _parse_partition(self, dirname): + if self.partition_scheme == 'hive': + return _parse_hive_partition(dirname) + else: + raise NotImplementedError('partition schema: {0}' + .format(self.partition_scheme)) + + def _push_pieces(self, files, part_keys): + self.pieces.extend([ + ParquetDatasetPiece(path, partition_keys=part_keys) + for path in files + ]) + + +def _parse_hive_partition(value): + if '=' not in value: + raise ValueError('Directory name did not appear to be a ' + 'partition: {0}'.format(value)) + return value.split('=', 1) + + +def _path_split(path, sep): + i = path.rfind(sep) + 1 + head, tail = path[:i], path[i:] + head = head.rstrip(sep) + return head, tail + + +EXCLUDED_PARQUET_PATHS = {'_SUCCESS'} + + +class ParquetDataset(object): + """ + Encapsulates details of reading a complete Parquet dataset possibly + consisting of multiple files and partitions in subdirectories + + Parameters + ---------- + path_or_paths : str or List[str] + A directory name, single file name, or list of file names + filesystem : Filesystem, default None + If nothing passed, paths assumed to be found in the local on-disk + filesystem + metadata : pyarrow.parquet.FileMetaData + Use metadata obtained elsewhere to validate file schemas + schema : pyarrow.parquet.Schema + Use schema obtained elsewhere to validate file schemas. Alternative to + metadata parameter + split_row_groups : boolean, default False + Divide files into pieces for each row group in the file + validate_schema : boolean, default True + Check that individual file schemas are all the same / compatible + """ + def __init__(self, path_or_paths, filesystem=None, schema=None, + metadata=None, split_row_groups=False, validate_schema=True): + if filesystem is None: + self.fs = LocalFilesystem.get_instance() + else: + self.fs = filesystem + + self.pieces, self.partitions = _make_manifest(path_or_paths, self.fs) + + self.metadata = metadata + self.schema = schema + + self.split_row_groups = split_row_groups + + if split_row_groups: + raise NotImplementedError("split_row_groups not yet implemented") + + if validate_schema: + self.validate_schemas() + + def validate_schemas(self): + open_file = self._get_open_file_func() + + if self.metadata is None and self.schema is None: + self.schema = self.pieces[0].get_metadata(open_file).schema + elif self.schema is None: + self.schema = self.metadata.schema + + # Verify schemas are all equal + for piece in self.pieces: + file_metadata = piece.get_metadata(open_file) + if not self.schema.equals(file_metadata.schema): + raise ValueError('Schema in {0!s} was different. 
' + '{1!s} vs {2!s}' + .format(piece, file_metadata.schema, + self.schema)) + + def read(self, columns=None, nthreads=1): + """ + Read multiple Parquet files as a single pyarrow.Table + + Parameters + ---------- + columns : List[str] + Names of columns to read from the file + nthreads : int, default 1 + Number of columns to read in parallel. Requires that the underlying + file source is threadsafe + + Returns + ------- + pyarrow.Table + Content of the file as a table (of columns) + """ + open_file = self._get_open_file_func() + + tables = [] + for piece in self.pieces: + table = piece.read(columns=columns, nthreads=nthreads, + partitions=self.partitions, + open_file_func=open_file) + tables.append(table) + + all_data = _table.concat_tables(tables) + return all_data + + def _get_open_file_func(self): + if self.fs is None or isinstance(self.fs, LocalFilesystem): + def open_file(path, meta=None): + return ParquetFile(path, metadata=meta) + else: + def open_file(path, meta=None): + return ParquetFile(self.fs.open(path, mode='rb'), + metadata=meta) + return open_file + + +def _make_manifest(path_or_paths, fs, pathsep='/'): + partitions = None + + if is_string(path_or_paths) and fs.isdir(path_or_paths): + manifest = ParquetManifest(path_or_paths, filesystem=fs, + pathsep=pathsep) + pieces = manifest.pieces + partitions = manifest.partitions + else: + if not isinstance(path_or_paths, list): + path_or_paths = [path_or_paths] + + # List of paths + if len(path_or_paths) == 0: + raise ValueError('Must pass at least one file path') + + pieces = [] + for path in path_or_paths: + if not fs.isfile(path): + raise IOError('Passed non-file path: {0}' + .format(path)) + piece = ParquetDatasetPiece(path) + pieces.append(piece) + + return pieces, partitions + + def read_table(source, columns=None, nthreads=1, metadata=None): """ Read a Table from Parquet format @@ -127,9 +591,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None): pyarrow.Table Content of the file as a table (of columns) """ - from pyarrow.filesystem import LocalFilesystem - - if isinstance(source, six.string_types): + if is_string(source): fs = LocalFilesystem.get_instance() if fs.isdir(source): return fs.read_parquet(source, columns=columns, @@ -139,70 +601,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None): return pf.read(columns=columns, nthreads=nthreads) -def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1, - metadata=None, schema=None): - """ - Read multiple Parquet files as a single pyarrow.Table - - Parameters - ---------- - paths : List[str] - List of file paths - columns : List[str] - Names of columns to read from the file - filesystem : Filesystem, default None - If nothing passed, paths assumed to be found in the local on-disk - filesystem - nthreads : int, default 1 - Number of columns to read in parallel. Requires that the underlying - file source is threadsafe - metadata : pyarrow.parquet.FileMetaData - Use metadata obtained elsewhere to validate file schemas - schema : pyarrow.parquet.Schema - Use schema obtained elsewhere to validate file schemas. 
Alternative to - metadata parameter - - Returns - ------- - pyarrow.Table - Content of the file as a table (of columns) - """ - if filesystem is None: - def open_file(path, meta=None): - return ParquetFile(path, metadata=meta) - else: - def open_file(path, meta=None): - return ParquetFile(filesystem.open(path, mode='rb'), metadata=meta) - - if len(paths) == 0: - raise ValueError('Must pass at least one file path') - - if metadata is None and schema is None: - schema = open_file(paths[0]).schema - elif schema is None: - schema = metadata.schema - - # Verify schemas are all equal - all_file_metadata = [] - for path in paths: - file_metadata = open_file(path).metadata - if not schema.equals(file_metadata.schema): - raise ValueError('Schema in {0} was different. {1!s} vs {2!s}' - .format(path, file_metadata.schema, schema)) - all_file_metadata.append(file_metadata) - - # Read the tables - tables = [] - for path, path_metadata in zip(paths, all_file_metadata): - reader = open_file(path, meta=path_metadata) - table = reader.read(columns=columns, nthreads=nthreads) - tables.append(table) - - all_data = concat_tables(tables) - return all_data - - -def write_table(table, sink, row_group_size=None, version='1.0', +def write_table(table, where, row_group_size=None, version='1.0', use_dictionary=True, compression='snappy', **kwargs): """ Write a Table to Parquet format @@ -210,7 +609,7 @@ def write_table(table, sink, row_group_size=None, version='1.0', Parameters ---------- table : pyarrow.Table - sink: string or pyarrow.io.NativeFile + where: string or pyarrow.io.NativeFile row_group_size : int, default None The maximum number of rows in each Parquet RowGroup. As a default, we will write a single RowGroup per file. @@ -223,7 +622,7 @@ def write_table(table, sink, row_group_size=None, version='1.0', Specify the compression codec, either on a general basis or per-column. 
""" row_group_size = kwargs.get('chunk_size', row_group_size) - writer = ParquetWriter(sink, use_dictionary=use_dictionary, + writer = ParquetWriter(where, use_dictionary=use_dictionary, compression=compression, version=version) writer.write_table(table, row_group_size=row_group_size) http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/table.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd index 389727b..f564042 100644 --- a/python/pyarrow/table.pxd +++ b/python/pyarrow/table.pxd @@ -58,5 +58,6 @@ cdef class RecordBatch: cdef init(self, const shared_ptr[CRecordBatch]& table) cdef _check_nullptr(self) +cdef object box_column(const shared_ptr[CColumn]& ccolumn) cdef api object table_from_ctable(const shared_ptr[CTable]& ctable) cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch) http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 94389a7..3972bda 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -30,8 +30,9 @@ import pyarrow.config from pyarrow.array cimport Array, box_array, wrap_array_output from pyarrow.error import ArrowException from pyarrow.error cimport check_status -from pyarrow.schema cimport box_data_type, box_schema, DataType +from pyarrow.schema cimport box_data_type, box_schema, DataType, Field +from pyarrow.schema import field from pyarrow.compat import frombytes, tobytes cimport cpython @@ -141,6 +142,19 @@ cdef class Column: self.sp_column = column self.column = column.get() + @staticmethod + def from_array(object field_or_name, Array arr): + cdef Field boxed_field + + if isinstance(field_or_name, Field): + boxed_field = field_or_name + else: + boxed_field = field(field_or_name, arr.type) + + cdef shared_ptr[CColumn] sp_column + sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array)) + return box_column(sp_column) + def to_pandas(self): """ Convert the arrow::Column to a pandas.Series @@ -828,6 +842,24 @@ cdef class Table: """ return (self.num_rows, self.num_columns) + def add_column(self, int i, Column column): + """ + Add column to Table at position. Returns new table + """ + cdef: + shared_ptr[CTable] c_table + + with nogil: + check_status(self.table.AddColumn(i, column.sp_column, &c_table)) + + return table_from_ctable(c_table) + + def append_column(self, Column column): + """ + Append column at end of columns. Returns new table + """ + return self.add_column(self.num_columns, column) + def remove_column(self, int i): """ Create new Table with the indicated column removed @@ -865,6 +897,12 @@ def concat_tables(tables): return table_from_ctable(c_result) +cdef object box_column(const shared_ptr[CColumn]& ccolumn): + cdef Column column = Column() + column.init(ccolumn) + return column + + cdef api object table_from_ctable(const shared_ptr[CTable]& ctable): cdef Table table = Table() table.init(ctable) http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 86165be..de1b148 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -16,11 +16,13 @@ # under the License. 
from os.path import join as pjoin +import datetime import io import os import pytest -from pyarrow.compat import guid +from pyarrow.compat import guid, u +from pyarrow.filesystem import LocalFilesystem import pyarrow as pa import pyarrow.io as paio from .pandas_examples import dataframe_with_arrays, dataframe_with_lists @@ -28,7 +30,7 @@ from .pandas_examples import dataframe_with_arrays, dataframe_with_lists import numpy as np import pandas as pd -import pandas.util.testing as pdt +import pandas.util.testing as tm try: import pyarrow.parquet as pq @@ -93,7 +95,7 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): pq.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) @parquet @@ -125,7 +127,7 @@ def test_pandas_parquet_1_0_rountrip(tmpdir): # We pass uint32_t as int64_t if we write Parquet version 1.0 df['uint32'] = df['uint32'].values.astype(np.int64) - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) @parquet @@ -142,7 +144,7 @@ def test_pandas_column_selection(tmpdir): table_read = pq.read_table(filename.strpath, columns=['uint8']) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df[['uint8']], df_read) + tm.assert_frame_equal(df[['uint8']], df_read) def _random_integers(size, dtype): @@ -169,7 +171,7 @@ def _test_dataframe(size=10000, seed=0): 'float64': np.random.randn(size), 'float64': np.arange(size, dtype=np.float64), 'bool': np.random.randn(size) > 0, - 'strings': [pdt.rands(10) for i in range(size)] + 'strings': [tm.rands(10) for i in range(size)] }) return df @@ -183,7 +185,7 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir): buf = imos.get_result() reader = paio.BufferReader(buf) df_read = pq.read_table(reader).to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) @parquet @@ -207,7 +209,7 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir): table_read = pq.read_table(data) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) @parquet @@ -236,7 +238,7 @@ def test_pandas_parquet_configuration_options(tmpdir): use_dictionary=use_dictionary) table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) for compression in ['NONE', 'SNAPPY', 'GZIP']: pq.write_table(arrow_table, filename.strpath, @@ -244,7 +246,7 @@ def test_pandas_parquet_configuration_options(tmpdir): compression=compression) table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) def make_sample_file(df): @@ -331,7 +333,7 @@ def test_column_of_arrays(tmpdir): pq.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) @parquet @@ -344,7 +346,7 @@ def test_column_of_lists(tmpdir): pq.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() - pdt.assert_frame_equal(df, df_read) + tm.assert_frame_equal(df, df_read) @parquet @@ -399,7 +401,7 @@ def test_pass_separate_metadata(): fileh = pq.ParquetFile(buf, metadata=metadata) - pdt.assert_frame_equal(df, fileh.read().to_pandas()) + tm.assert_frame_equal(df, 
fileh.read().to_pandas()) @parquet @@ -422,13 +424,121 @@ def test_read_single_row_group(): row_groups = [pf.read_row_group(i) for i in range(K)] result = pa.concat_tables(row_groups) - pdt.assert_frame_equal(df, result.to_pandas()) + tm.assert_frame_equal(df, result.to_pandas()) cols = df.columns[:2] row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)] result = pa.concat_tables(row_groups) - pdt.assert_frame_equal(df[cols], result.to_pandas()) + tm.assert_frame_equal(df[cols], result.to_pandas()) + + +@parquet +def test_parquet_piece_basics(): + path = '/baz.parq' + + piece1 = pq.ParquetDatasetPiece(path) + piece2 = pq.ParquetDatasetPiece(path, row_group=1) + piece3 = pq.ParquetDatasetPiece( + path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)]) + + assert str(piece1) == path + assert str(piece2) == '/baz.parq | row_group=1' + assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1' + + assert piece1 == piece1 + assert piece2 == piece2 + assert piece3 == piece3 + assert piece1 != piece3 + + +@parquet +def test_partition_set_dictionary_type(): + set1 = pq.PartitionSet('key1', [u('foo'), u('bar'), u('baz')]) + set2 = pq.PartitionSet('key2', [2007, 2008, 2009]) + + assert isinstance(set1.dictionary, pa.StringArray) + assert isinstance(set2.dictionary, pa.IntegerArray) + + set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)]) + with pytest.raises(TypeError): + set3.dictionary + + +@parquet +def test_read_partitioned_directory(tmpdir): + foo_keys = [0, 1] + bar_keys = ['a', 'b', 'c'] + partition_spec = [ + ['foo', foo_keys], + ['bar', bar_keys] + ] + N = 30 + + df = pd.DataFrame({ + 'index': np.arange(N), + 'foo': np.array(foo_keys, dtype='i4').repeat(15), + 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2), + 'values': np.random.randn(N) + }, columns=['index', 'foo', 'bar', 'values']) + + base_path = str(tmpdir) + _generate_partition_directories(base_path, partition_spec, df) + + dataset = pq.ParquetDataset(base_path) + table = dataset.read() + result_df = (table.to_pandas() + .sort_values(by='index') + .reset_index(drop=True)) + + expected_df = (df.sort_values(by='index') + .reset_index(drop=True) + .reindex(columns=result_df.columns)) + expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys) + expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys) + + assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all() + + tm.assert_frame_equal(result_df, expected_df) + + +def _generate_partition_directories(base_dir, partition_spec, df): + # partition_spec : list of lists, e.g. 
[['foo', [0, 1, 2], + # ['bar', ['a', 'b', 'c']] + # part_table : a pyarrow.Table to write to each partition + DEPTH = len(partition_spec) + fs = LocalFilesystem.get_instance() + + def _visit_level(base_dir, level, part_keys): + name, values = partition_spec[level] + for value in values: + this_part_keys = part_keys + [(name, value)] + + level_dir = pjoin(base_dir, '{0}={1}'.format(name, value)) + fs.mkdir(level_dir) + + if level == DEPTH - 1: + # Generate example data + file_path = pjoin(level_dir, 'data.parq') + + filtered_df = _filter_partition(df, this_part_keys) + part_table = pa.Table.from_pandas(filtered_df) + pq.write_table(part_table, file_path) + else: + _visit_level(level_dir, level + 1, this_part_keys) + + _visit_level(base_dir, 0, []) + + +def _filter_partition(df, part_keys): + predicate = np.ones(len(df), dtype=bool) + + to_drop = [] + for name, value in part_keys: + to_drop.append(name) + predicate &= df[name] == value + + return df[predicate].drop(to_drop, axis=1) @parquet @@ -459,7 +569,11 @@ def test_read_multiple_files(tmpdir): with open(pjoin(dirpath, '_SUCCESS.crc'), 'wb') as f: f.write(b'0') - result = pq.read_multiple_files(paths) + def read_multiple_files(paths, columns=None, nthreads=None, **kwargs): + dataset = pq.ParquetDataset(paths, **kwargs) + return dataset.read(columns=columns, nthreads=nthreads) + + result = read_multiple_files(paths) expected = pa.concat_tables(test_data) assert result.equals(expected) @@ -467,7 +581,7 @@ def test_read_multiple_files(tmpdir): # Read with provided metadata metadata = pq.ParquetFile(paths[0]).metadata - result2 = pq.read_multiple_files(paths, metadata=metadata) + result2 = read_multiple_files(paths, metadata=metadata) assert result2.equals(expected) result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema) @@ -493,15 +607,15 @@ def test_read_multiple_files(tmpdir): bad_meta = pq.ParquetFile(bad_apple_path).metadata with pytest.raises(ValueError): - pq.read_multiple_files(paths + [bad_apple_path]) + read_multiple_files(paths + [bad_apple_path]) with pytest.raises(ValueError): - pq.read_multiple_files(paths, metadata=bad_meta) + read_multiple_files(paths, metadata=bad_meta) mixed_paths = [bad_apple_path, paths[0]] with pytest.raises(ValueError): - pq.read_multiple_files(mixed_paths, schema=bad_meta.schema) + read_multiple_files(mixed_paths, schema=bad_meta.schema) with pytest.raises(ValueError): - pq.read_multiple_files(mixed_paths) + read_multiple_files(mixed_paths) http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/tests/test_table.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 548f478..79b4c15 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -39,6 +39,14 @@ class TestColumn(unittest.TestCase): assert column.shape == (5,) assert column.to_pylist() == [-10, -5, 0, 5, 10] + def test_from_array(self): + arr = pa.from_pylist([0, 1, 2, 3, 4]) + + col1 = pa.Column.from_array('foo', arr) + col2 = pa.Column.from_array(pa.field('foo', arr.type), arr) + + assert col1.equals(col2) + def test_pandas(self): data = [ pa.from_pylist([-10, -5, 0, 5, 10]) @@ -169,6 +177,29 @@ def test_table_basics(): assert chunk is not None +def test_table_add_column(): + data = [ + pa.from_pylist(range(5)), + pa.from_pylist([-10, -5, 0, 5, 10]), + pa.from_pylist(range(5, 10)) + ] + table = pa.Table.from_arrays(data, names=('a', 'b', 'c')) + + col = 
pa.Column.from_array('d', data[1]) + t2 = table.add_column(3, col) + t3 = table.append_column(col) + + expected = pa.Table.from_arrays(data + [data[1]], + names=('a', 'b', 'c', 'd')) + assert t2.equals(expected) + assert t3.equals(expected) + + t4 = table.add_column(0, col) + expected = pa.Table.from_arrays([data[1]] + data, + names=('d', 'a', 'b', 'c')) + assert t4.equals(expected) + + def test_table_remove_column(): data = [ pa.from_pylist(range(5)),
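
----------------------------------------------------------------------
A minimal usage sketch of the API this commit adds (not taken from the patch
above; the dataset path and the partition/column names 'foo', 'bar', 'values'
are hypothetical and assume a Hive-style layout such as
/path/to/dataset/foo=1/bar=a/data.parq has already been written):

import pyarrow as pa
import pyarrow.parquet as pq

# Read the whole partitioned directory back as one pyarrow.Table. The
# partition columns ('foo', 'bar') are appended automatically as
# dictionary-encoded columns whose dictionaries were collected while
# walking the directory tree.
dataset = pq.ParquetDataset('/path/to/dataset')  # hypothetical path
table = dataset.read(nthreads=4)
df = table.to_pandas()

# Pieces can also be read one at a time, e.g. from a distributed scheduler
# such as Dask. The ParquetPartitions object must be passed along so each
# piece can rebuild the dictionary metadata for its partition keys.
for piece in dataset.pieces:
    part_table = piece.read(columns=['values'],
                            partitions=dataset.partitions)

# The Table.add_column / Table.append_column and Column.from_array helpers
# introduced for attaching partition columns can also be used directly.
arr = pa.from_pylist([0, 0, 1, 1])
tbl = pa.Table.from_arrays([pa.from_pylist(range(4))], names=('values',))
col = pa.Column.from_array('foo', arr)
tbl2 = tbl.append_column(col)   # new Table with 'foo' appended at the end
tbl3 = tbl.add_column(0, col)   # or inserted at an explicit position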