arrow-commits mailing list archives

From w...@apache.org
Subject arrow git commit: ARROW-305: Add compression and use_dictionary options to Parquet
Date Thu, 29 Sep 2016 01:45:55 GMT
Repository: arrow
Updated Branches:
  refs/heads/master bf30235fa -> 30f60832a


ARROW-305: Add compression and use_dictionary options to Parquet

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #148 from xhochy/arrow-305 and squashes the following commits:

93d653b [Uwe L. Korn] ARROW-305: Add compression and use_dictionary options to Parquet interface

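For context, the extended write_table interface can be exercised as follows. This is a minimal usage sketch, not part of the commit: df is any pandas DataFrame, and the module aliases follow the test file in this diff.

    import pandas as pd
    import pyarrow as A
    import pyarrow.parquet as pq

    df = pd.DataFrame({'x': list(range(10))})   # illustrative data
    tbl = A.from_pandas_dataframe(df)

    # Dictionary-encode all columns and compress every column with Snappy.
    A.parquet.write_table(tbl, 'data.parquet', version='2.0',
                          use_dictionary=True, compression='SNAPPY')

    # Round-trip check, mirroring the tests below.
    assert pq.read_table('data.parquet').to_pandas().equals(df)
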

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/30f60832
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/30f60832
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/30f60832

Branch: refs/heads/master
Commit: 30f60832a5f4bd3063699061796d2107fb7a9738
Parents: bf30235
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Wed Sep 28 21:45:46 2016 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Wed Sep 28 21:45:46 2016 -0400

----------------------------------------------------------------------
 python/pyarrow/includes/parquet.pxd  | 12 ++++++++
 python/pyarrow/parquet.pyx           | 47 +++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py | 40 +++++++++++++++++++++++++
 3 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/30f60832/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 9085b0b..754eecc 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -37,6 +37,13 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
       PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
       PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0"
 
+  enum Compression" parquet::Compression::type":
+      UNCOMPRESSED" parquet::Compression::UNCOMPRESSED"
+      SNAPPY" parquet::Compression::SNAPPY"
+      GZIP" parquet::Compression::GZIP"
+      LZO" parquet::Compression::LZO"
+      BROTLI" parquet::Compression::BROTLI"
+
   cdef cppclass SchemaDescriptor:
     shared_ptr[Node] schema()
     GroupNode* group()
@@ -90,6 +97,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
     cdef cppclass WriterProperties:
         cppclass Builder:
             Builder* version(ParquetVersion version)
+            Builder* compression(Compression codec)
+            Builder* compression(const c_string& path, Compression codec)
+            Builder* disable_dictionary()
+            Builder* enable_dictionary()
+            Builder* enable_dictionary(const c_string& path)
             shared_ptr[WriterProperties] build()
 
 

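The declarations above mirror parquet-cpp's WriterProperties builder API. Driven from Cython, they might be used as in the following sketch (illustrative only, assuming these names are cimported from pyarrow/includes/parquet.pxd and that the code runs inside a function in a .pyx module; "col_a" is a made-up column path):

    cdef WriterProperties.Builder properties_builder
    properties_builder.version(PARQUET_2_0)
    properties_builder.enable_dictionary()          # dictionary-encode all columns
    properties_builder.compression(SNAPPY)          # default codec for every column
    properties_builder.compression(b"col_a", GZIP)  # per-column override (hypothetical column)
    cdef shared_ptr[WriterProperties] props = properties_builder.build()
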
http://git-wip-us.apache.org/repos/asf/arrow/blob/30f60832/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index fb36b29..099e148 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -90,7 +90,8 @@ def read_table(source, columns=None):
     return reader.read_all()
 
 
-def write_table(table, filename, chunk_size=None, version=None):
+def write_table(table, filename, chunk_size=None, version=None,
+                use_dictionary=True, compression=None):
     """
     Write a Table to Parquet format
 
@@ -102,6 +103,11 @@ def write_table(table, filename, chunk_size=None, version=None):
         The maximum number of rows in each Parquet RowGroup
     version : {"1.0", "2.0"}, default "1.0"
         The Parquet format version, defaults to 1.0
+    use_dictionary : bool or list, default True
+        Enable dictionary encoding for all or only the listed columns.
+    compression : str or dict, default None
+        Codec name ("NONE", "SNAPPY", "GZIP", "LZO", "BROTLI") applied to
+        all columns, or a dict mapping column names to codec names.
     """
     cdef Table table_ = table
     cdef CTable* ctable_ = table_.table
@@ -121,6 +127,45 @@
         else:
             raise ArrowException("Unsupported Parquet format version")
 
+    if isinstance(use_dictionary, bool):
+        if use_dictionary:
+            properties_builder.enable_dictionary()
+        else:
+            properties_builder.disable_dictionary()
+    else:
+        # Enable dictionary encoding only for the listed columns
+        properties_builder.disable_dictionary()
+        for column in use_dictionary:
+            properties_builder.enable_dictionary(column)
+
+    if isinstance(compression, basestring):
+        if compression == "NONE":
+            properties_builder.compression(UNCOMPRESSED)
+        elif compression == "SNAPPY":
+            properties_builder.compression(SNAPPY)
+        elif compression == "GZIP":
+            properties_builder.compression(GZIP)
+        elif compression == "LZO":
+            properties_builder.compression(LZO)
+        elif compression == "BROTLI":
+            properties_builder.compression(BROTLI)
+        else:
+            raise ArrowException("Unsupported compression codec")
+    elif compression is not None:
+        for column, codec in compression.items():
+            if codec == "NONE":
+                properties_builder.compression(column, UNCOMPRESSED)
+            elif codec == "SNAPPY":
+                properties_builder.compression(column, SNAPPY)
+            elif codec == "GZIP":
+                properties_builder.compression(column, GZIP)
+            elif codec == "LZO":
+                properties_builder.compression(column, LZO)
+            elif codec == "BROTLI":
+                properties_builder.compression(column, BROTLI)
+            else:
+                raise ArrowException("Unsupported compression codec")
+
     sink.reset(new LocalFileOutputStream(tobytes(filename)))
     with nogil:
         check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink,

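Both options also have per-column forms, a list of column names for use_dictionary and a dict for compression, as implemented above. A sketch continuing the example near the top of this message (the column names 'a' and 'b' are illustrative):

    # Dictionary-encode only column 'a'; Snappy for 'a', leave 'b' uncompressed.
    A.parquet.write_table(tbl, 'data.parquet', version='2.0',
                          use_dictionary=['a'],
                          compression={'a': 'SNAPPY', 'b': 'NONE'})
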
http://git-wip-us.apache.org/repos/asf/arrow/blob/30f60832/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 8a2d8ca..0f9f2e4 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -110,3 +110,43 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
     df['uint32'] = df['uint32'].values.astype(np.int64)
 
     pdt.assert_frame_equal(df, df_read)
+
+@parquet
+def test_pandas_parquet_configuration_options(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int8),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    filename = tmpdir.join('pandas_roundtrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+
+    for use_dictionary in [True, False]:
+        A.parquet.write_table(
+                arrow_table,
+                filename.strpath,
+                version="2.0",
+                use_dictionary=use_dictionary)
+        table_read = pq.read_table(filename.strpath)
+        df_read = table_read.to_pandas()
+        pdt.assert_frame_equal(df, df_read)
+
+    for compression in ['NONE', 'SNAPPY', 'GZIP']:
+        A.parquet.write_table(
+                arrow_table,
+                filename.strpath,
+                version="2.0",
+                compression=compression)
+        table_read = pq.read_table(filename.strpath)
+        df_read = table_read.to_pandas()
+        pdt.assert_frame_equal(df, df_read)

