parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [3/3] parquet-cpp git commit: PARQUET-844: Schema, compression consolidation / flattening
Date Thu, 26 Jan 2017 17:02:52 GMT
PARQUET-844: Schema, compression consolidation / flattening

Will look at `encodings/`, `file/`, and `column/` directories later

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #228 from wesm/PARQUET-844 and squashes the following commits:

45b2887 [Wes McKinney] Fix include rename
88f0afe [Wes McKinney] Consolidate schema code and tests into schema.h/schema-internal.h
0385381 [Wes McKinney] Consolidate compression code into a single header


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/13da51d3
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/13da51d3
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/13da51d3

Branch: refs/heads/master
Commit: 13da51d3f3a4da1417beddf409eba90ce3657a68
Parents: 257e65b
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Thu Jan 26 12:02:42 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Thu Jan 26 12:02:42 2017 -0500

----------------------------------------------------------------------
 CMakeLists.txt                               |  12 +-
 benchmarks/decode_benchmark.cc               |   2 +-
 src/parquet/CMakeLists.txt                   |   4 +
 src/parquet/api/schema.h                     |   4 +-
 src/parquet/column/column-reader-test.cc     |   3 +-
 src/parquet/column/properties.h              |   2 +-
 src/parquet/column/reader.h                  |   2 +-
 src/parquet/column/scanner-test.cc           |   3 +-
 src/parquet/column/scanner.h                 |   2 +-
 src/parquet/column/statistics-test.cc        |   2 +-
 src/parquet/column/statistics.h              |   2 +-
 src/parquet/column/writer.h                  |   2 +-
 src/parquet/compression-test.cc              |  84 +++
 src/parquet/compression.cc                   | 259 ++++++++
 src/parquet/compression.h                    | 125 ++++
 src/parquet/compression/CMakeLists.txt       |  23 -
 src/parquet/compression/brotli-codec.cc      |  53 --
 src/parquet/compression/codec-test.cc        |  84 ---
 src/parquet/compression/codec.cc             |  50 --
 src/parquet/compression/codec.h              | 125 ----
 src/parquet/compression/gzip-codec.cc        | 175 ------
 src/parquet/compression/snappy-codec.cc      |  48 --
 src/parquet/encodings/encoding-test.cc       |   3 +-
 src/parquet/encodings/plain-encoding.h       |   2 +-
 src/parquet/file/file-deserialize-test.cc    |   2 +-
 src/parquet/file/file-metadata-test.cc       |   3 +-
 src/parquet/file/metadata.cc                 |   3 +-
 src/parquet/file/metadata.h                  |   4 +-
 src/parquet/file/reader-internal.cc          |   6 +-
 src/parquet/file/reader-internal.h           |   2 +-
 src/parquet/file/reader.h                    |   2 +-
 src/parquet/file/writer-internal.cc          |   3 +-
 src/parquet/file/writer-internal.h           |   2 +-
 src/parquet/file/writer.h                    |   3 +-
 src/parquet/schema-internal.h                |  83 +++
 src/parquet/schema-test.cc                   | 703 ++++++++++++++++++++++
 src/parquet/schema.cc                        | 655 ++++++++++++++++++++
 src/parquet/schema.h                         | 405 +++++++++++++
 src/parquet/schema/CMakeLists.txt            |  28 -
 src/parquet/schema/converter.cc              | 124 ----
 src/parquet/schema/converter.h               |  91 ---
 src/parquet/schema/descriptor.cc             | 138 -----
 src/parquet/schema/descriptor.h              | 142 -----
 src/parquet/schema/printer.cc                | 159 -----
 src/parquet/schema/printer.h                 |  40 --
 src/parquet/schema/schema-converter-test.cc  | 222 -------
 src/parquet/schema/schema-descriptor-test.cc | 190 ------
 src/parquet/schema/schema-printer-test.cc    |  76 ---
 src/parquet/schema/schema-types-test.cc      | 311 ----------
 src/parquet/schema/test-util.h               |  63 --
 src/parquet/schema/types.cc                  | 315 ----------
 src/parquet/schema/types.h                   | 292 ---------
 src/parquet/util/comparison-test.cc          |   2 +-
 src/parquet/util/comparison.h                |   2 +-
 54 files changed, 2348 insertions(+), 2794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ac9d515..8ff1421 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -486,10 +486,7 @@ set(LIBPARQUET_SRCS
   src/parquet/column/scan-all.cc
   src/parquet/column/statistics.cc
 
-  src/parquet/compression/codec.cc
-  src/parquet/compression/brotli-codec.cc
-  src/parquet/compression/snappy-codec.cc
-  src/parquet/compression/gzip-codec.cc
+  src/parquet/compression.cc
 
   src/parquet/file/metadata.cc
   src/parquet/file/reader.cc
@@ -497,10 +494,7 @@ set(LIBPARQUET_SRCS
   src/parquet/file/writer.cc
   src/parquet/file/writer-internal.cc
 
-  src/parquet/schema/converter.cc
-  src/parquet/schema/descriptor.cc
-  src/parquet/schema/printer.cc
-  src/parquet/schema/types.cc
+  src/parquet/schema.cc
 
   src/parquet/util/cpu-info.cc
   src/parquet/util/memory.cc
@@ -578,10 +572,8 @@ endif()
 add_subdirectory(src/parquet)
 add_subdirectory(src/parquet/api)
 add_subdirectory(src/parquet/column)
-add_subdirectory(src/parquet/compression)
 add_subdirectory(src/parquet/encodings)
 add_subdirectory(src/parquet/file)
-add_subdirectory(src/parquet/schema)
 add_subdirectory(src/parquet/thrift)
 add_subdirectory(src/parquet/util)
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/benchmarks/decode_benchmark.cc
----------------------------------------------------------------------
diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc
index 7659234..d767622 100644
--- a/benchmarks/decode_benchmark.cc
+++ b/benchmarks/decode_benchmark.cc
@@ -19,7 +19,7 @@
 #include <random>
 #include <stdio.h>
 
-#include "parquet/compression/codec.h"
+#include "parquet/compression.h"
 #include "parquet/encodings/delta-bit-pack-encoding.h"
 #include "parquet/encodings/delta-byte-array-encoding.h"
 #include "parquet/encodings/delta-length-byte-array-encoding.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index a2ebbad..f49a54d 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -17,10 +17,14 @@
 
 # Headers: top level
 install(FILES
+  compression.h
   exception.h
+  schema.h
   types.h
   DESTINATION include/parquet)
 
+ADD_PARQUET_TEST(compression-test)
 ADD_PARQUET_TEST(public-api-test)
 ADD_PARQUET_TEST(types-test)
 ADD_PARQUET_TEST(reader-test)
+ADD_PARQUET_TEST(schema-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/api/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/api/schema.h b/src/parquet/api/schema.h
index 523d046..2e6c3b3 100644
--- a/src/parquet/api/schema.h
+++ b/src/parquet/api/schema.h
@@ -19,8 +19,6 @@
 #define PARQUET_API_SCHEMA_H
 
 // Schemas
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/printer.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 
 #endif  // PARQUET_API_SCHEMA_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 5cf3084..d410b5f 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -28,8 +28,7 @@
 #include "parquet/column/page.h"
 #include "parquet/column/reader.h"
 #include "parquet/column/test-util.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/test-common.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index cf89226..b9b0bf3 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -23,7 +23,7 @@
 #include <unordered_map>
 
 #include "parquet/exception.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 3b6a971..90c0761 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -29,7 +29,7 @@
 #include "parquet/column/page.h"
 #include "parquet/encodings/decoder.h"
 #include "parquet/exception.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index 8eee191..5d137b7 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -28,8 +28,7 @@
 #include "parquet/column/scanner.h"
 #include "parquet/column/test-specialization.h"
 #include "parquet/column/test-util.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/test-common.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index 13fb01b..17340b7 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -27,7 +27,7 @@
 
 #include "parquet/column/reader.h"
 #include "parquet/exception.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
index 38cdc23..5eb7625 100644
--- a/src/parquet/column/statistics-test.cc
+++ b/src/parquet/column/statistics-test.cc
@@ -31,7 +31,7 @@
 #include "parquet/column/writer.h"
 #include "parquet/file/reader.h"
 #include "parquet/file/writer.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
index 68bd4c8..af5ada6 100644
--- a/src/parquet/column/statistics.h
+++ b/src/parquet/column/statistics.h
@@ -22,7 +22,7 @@
 #include <memory>
 #include <string>
 
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index b52b932..094c65b 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -26,7 +26,7 @@
 #include "parquet/column/statistics.h"
 #include "parquet/encodings/encoder.h"
 #include "parquet/file/metadata.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression-test.cc b/src/parquet/compression-test.cc
new file mode 100644
index 0000000..f4fd3ba
--- /dev/null
+++ b/src/parquet/compression-test.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <gtest/gtest.h>
+#include <string>
+#include <vector>
+
+#include "parquet/compression.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet {
+
+template <typename T>
+void CheckCodecRoundtrip(const vector<uint8_t>& data) {
+  // create multiple compressors to try to break them
+  T c1;
+  T c2;
+
+  int max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]);
+  std::vector<uint8_t> compressed(max_compressed_len);
+  std::vector<uint8_t> decompressed(data.size());
+
+  // compress with c1
+  int actual_size =
+      c1.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]);
+  compressed.resize(actual_size);
+
+  // decompress with c2
+  c2.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]);
+
+  ASSERT_TRUE(test::vector_equal(data, decompressed));
+
+  // compress with c2
+  int actual_size2 =
+      c2.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]);
+  ASSERT_EQ(actual_size2, actual_size);
+
+  // decompress with c1
+  c1.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]);
+
+  ASSERT_TRUE(test::vector_equal(data, decompressed));
+}
+
+template <typename T>
+void CheckCodec() {
+  int sizes[] = {10000, 100000};
+  for (int data_size : sizes) {
+    vector<uint8_t> data;
+    test::random_bytes(data_size, 1234, &data);
+    CheckCodecRoundtrip<T>(data);
+  }
+}
+
+TEST(TestCompressors, Snappy) {
+  CheckCodec<SnappyCodec>();
+}
+
+TEST(TestCompressors, Brotli) {
+  CheckCodec<BrotliCodec>();
+}
+
+TEST(TestCompressors, GZip) {
+  CheckCodec<GZipCodec>();
+}
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression.cc b/src/parquet/compression.cc
new file mode 100644
index 0000000..97b5c17
--- /dev/null
+++ b/src/parquet/compression.cc
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <brotli/decode.h>
+#include <brotli/encode.h>
+#include <snappy.h>
+
+#include "parquet/compression.h"
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) {
+  std::unique_ptr<Codec> result;
+  switch (codec_type) {
+    case Compression::UNCOMPRESSED:
+      break;
+    case Compression::SNAPPY:
+      result.reset(new SnappyCodec());
+      break;
+    case Compression::GZIP:
+      result.reset(new GZipCodec());
+      break;
+    case Compression::LZO:
+      ParquetException::NYI("LZO codec not implemented");
+      break;
+    case Compression::BROTLI:
+      result.reset(new BrotliCodec());
+      break;
+    default:
+      ParquetException::NYI("Unrecognized codec");
+      break;
+  }
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// gzip implementation
+
+// These are magic numbers from zlib.h.  Not clear why they are not defined
+// there.
+
+// Maximum window size
+static constexpr int WINDOW_BITS = 15;
+
+// Output Gzip.
+static constexpr int GZIP_CODEC = 16;
+
+// Determine if this is libz or gzip from header.
+static constexpr int DETECT_CODEC = 32;
+
+GZipCodec::GZipCodec(Format format)
+    : format_(format), compressor_initialized_(false), decompressor_initialized_(false) {}
+
+GZipCodec::~GZipCodec() {
+  EndCompressor();
+  EndDecompressor();
+}
+
+void GZipCodec::InitCompressor() {
+  EndDecompressor();
+  memset(&stream_, 0, sizeof(stream_));
+
+  int ret;
+  // Initialize to run specified format
+  int window_bits = WINDOW_BITS;
+  if (format_ == DEFLATE) {
+    window_bits = -window_bits;
+  } else if (format_ == GZIP) {
+    window_bits += GZIP_CODEC;
+  }
+  if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9,
+           Z_DEFAULT_STRATEGY)) != Z_OK) {
+    throw ParquetException("zlib deflateInit failed: " + std::string(stream_.msg));
+  }
+
+  compressor_initialized_ = true;
+}
+
+void GZipCodec::EndCompressor() {
+  if (compressor_initialized_) { (void)deflateEnd(&stream_); }
+  compressor_initialized_ = false;
+}
+
+void GZipCodec::InitDecompressor() {
+  EndCompressor();
+  memset(&stream_, 0, sizeof(stream_));
+  int ret;
+
+  // Initialize to run either deflate or zlib/gzip format
+  int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
+  if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
+    throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg));
+  }
+  decompressor_initialized_ = true;
+}
+
+void GZipCodec::EndDecompressor() {
+  if (decompressor_initialized_) { (void)inflateEnd(&stream_); }
+  decompressor_initialized_ = false;
+}
+
+void GZipCodec::Decompress(
+    int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) {
+  if (!decompressor_initialized_) { InitDecompressor(); }
+  if (output_length == 0) {
+    // The zlib library does not allow *output to be NULL, even when output_length
+    // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
+    // error, so bail early if no output is expected. Note that we don't signal
+    // an error if the input actually contains compressed data.
+    return;
+  }
+
+  // Reset the stream for this block
+  if (inflateReset(&stream_) != Z_OK) {
+    throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg));
+  }
+
+  int ret = 0;
+  // gzip can run in streaming mode or non-streaming mode.  We only
+  // support the non-streaming use case where we present it the entire
+  // compressed input and a buffer big enough to contain the entire
+  // compressed output.  In the case where we don't know the output,
+  // we just make a bigger buffer and try the non-streaming mode
+  // from the beginning again.
+  while (ret != Z_STREAM_END) {
+    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+    stream_.avail_in = input_length;
+    stream_.next_out = reinterpret_cast<Bytef*>(output);
+    stream_.avail_out = output_length;
+
+    // We know the output size.  In this case, we can use Z_FINISH
+    // which is more efficient.
+    ret = inflate(&stream_, Z_FINISH);
+    if (ret == Z_STREAM_END || ret != Z_OK) break;
+
+    // Failure, buffer was too small
+    std::stringstream ss;
+    ss << "Too small a buffer passed to GZipCodec. InputLength=" << input_length
+       << " OutputLength=" << output_length;
+    throw ParquetException(ss.str());
+  }
+
+  // Failure for some other reason
+  if (ret != Z_STREAM_END) {
+    std::stringstream ss;
+    ss << "GZipCodec failed: ";
+    if (stream_.msg != NULL) ss << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+}
+
+int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
+  // Must be in compression mode
+  if (!compressor_initialized_) { InitCompressor(); }
+  // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
+  return deflateBound(&stream_, static_cast<uLong>(input_length));
+}
+
+int64_t GZipCodec::Compress(
+    int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) {
+  if (!compressor_initialized_) { InitCompressor(); }
+  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+  stream_.avail_in = input_length;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = output_length;
+
+  int64_t ret = 0;
+  if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
+    if (ret == Z_OK) {
+      // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
+      // small
+      throw ParquetException("zlib deflate failed, output buffer to small");
+    }
+    std::stringstream ss;
+    ss << "zlib deflate failed: " << stream_.msg;
+    throw ParquetException(ss.str());
+  }
+
+  if (deflateReset(&stream_) != Z_OK) {
+    throw ParquetException("zlib deflateReset failed: " + std::string(stream_.msg));
+  }
+
+  // Actual output length
+  return output_length - stream_.avail_out;
+}
+
+// ----------------------------------------------------------------------
+// Snappy implementation
+
+void SnappyCodec::Decompress(
+    int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
+  if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
+          static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) {
+    throw parquet::ParquetException("Corrupt snappy compressed data.");
+  }
+}
+
+int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
+  return snappy::MaxCompressedLength(input_len);
+}
+
+int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output_buffer) {
+  size_t output_len;
+  snappy::RawCompress(reinterpret_cast<const char*>(input),
+      static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer),
+      &output_len);
+  return output_len;
+}
+
+// ----------------------------------------------------------------------
+// Brotli implementation
+
+void BrotliCodec::Decompress(
+    int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
+  size_t output_size = output_len;
+  if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) !=
+      BROTLI_DECODER_RESULT_SUCCESS) {
+    throw parquet::ParquetException("Corrupt brotli compressed data.");
+  }
+}
+
+int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
+  return BrotliEncoderMaxCompressedSize(input_len);
+}
+
+int64_t BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
+    int64_t output_buffer_len, uint8_t* output_buffer) {
+  size_t output_len = output_buffer_len;
+  // TODO: Make quality configurable. We use 8 as a default as it is the best
+  //       trade-off for Parquet workload
+  if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len,
+          input, &output_len, output_buffer) == BROTLI_FALSE) {
+    throw parquet::ParquetException("Brotli compression failure.");
+  }
+  return output_len;
+}
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression.h b/src/parquet/compression.h
new file mode 100644
index 0000000..abd4899
--- /dev/null
+++ b/src/parquet/compression.h
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COMPRESSION_CODEC_H
+#define PARQUET_COMPRESSION_CODEC_H
+
+#include <zlib.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+class Codec {
+ public:
+  virtual ~Codec() {}
+
+  static std::unique_ptr<Codec> Create(Compression::type codec);
+
+  virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+      uint8_t* output_buffer) = 0;
+
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer) = 0;
+
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0;
+
+  virtual const char* name() const = 0;
+};
+
+// Snappy codec.
+class SnappyCodec : public Codec {
+ public:
+  virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+      uint8_t* output_buffer);
+
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer);
+
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
+
+  virtual const char* name() const { return "snappy"; }
+};
+
+// Brotli codec.
+class BrotliCodec : public Codec {
+ public:
+  void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+      uint8_t* output_buffer) override;
+
+  int64_t Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len,
+      uint8_t* output_buffer) override;
+
+  int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
+
+  const char* name() const override { return "brotli"; }
+};
+
+// GZip codec.
+class GZipCodec : public Codec {
+ public:
+  /// Compression formats supported by the zlib library
+  enum Format {
+    ZLIB,
+    DEFLATE,
+    GZIP,
+  };
+
+  explicit GZipCodec(Format format = GZIP);
+  virtual ~GZipCodec();
+
+  virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+      uint8_t* output_buffer);
+
+  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
+      int64_t output_buffer_len, uint8_t* output_buffer);
+
+  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
+
+  virtual const char* name() const { return "gzip"; }
+
+ private:
+  // zlib is stateful and the z_stream state variable must be initialized
+  // before use
+  z_stream stream_;
+
+  // Realistically, this will always be GZIP, but we leave the option open to
+  // configure
+  Format format_;
+
+  // These variables are mutually exclusive. When the codec is in "compressor"
+  // state, compressor_initialized_ is true while decompressor_initialized_ is
+  // false. When it's decompressing, the opposite is true.
+  //
+  // Indeed, this is slightly hacky, but the alternative is having separate
+  // Compressor and Decompressor classes. If this ever becomes an issue, we can
+  // perform the refactoring then
+  void InitCompressor();
+  void InitDecompressor();
+  void EndCompressor();
+  void EndDecompressor();
+  bool compressor_initialized_;
+  bool decompressor_initialized_;
+};
+
+}  // namespace parquet
+
+#endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt
deleted file mode 100644
index 82ba522..0000000
--- a/src/parquet/compression/CMakeLists.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Headers: compression
-install(FILES
-  codec.h
-  DESTINATION include/parquet/compression)
-
-ADD_PARQUET_TEST(codec-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/brotli-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/brotli-codec.cc b/src/parquet/compression/brotli-codec.cc
deleted file mode 100644
index 8118206..0000000
--- a/src/parquet/compression/brotli-codec.cc
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <brotli/decode.h>
-#include <brotli/encode.h>
-#include <cstdint>
-#include <cstdlib>
-
-#include "parquet/compression/codec.h"
-#include "parquet/exception.h"
-
-namespace parquet {
-
-void BrotliCodec::Decompress(
-    int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
-  size_t output_size = output_len;
-  if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) !=
-      BROTLI_DECODER_RESULT_SUCCESS) {
-    throw parquet::ParquetException("Corrupt brotli compressed data.");
-  }
-}
-
-int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
-  return BrotliEncoderMaxCompressedSize(input_len);
-}
-
-int64_t BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
-    int64_t output_buffer_len, uint8_t* output_buffer) {
-  size_t output_len = output_buffer_len;
-  // TODO: Make quality configurable. We use 8 as a default as it is the best
-  //       trade-off for Parquet workload
-  if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len,
-          input, &output_len, output_buffer) == BROTLI_FALSE) {
-    throw parquet::ParquetException("Brotli compression failure.");
-  }
-  return output_len;
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/codec-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc
deleted file mode 100644
index f2be84b..0000000
--- a/src/parquet/compression/codec-test.cc
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <gtest/gtest.h>
-#include <string>
-#include <vector>
-
-#include "parquet/compression/codec.h"
-#include "parquet/util/test-common.h"
-
-using std::string;
-using std::vector;
-
-namespace parquet {
-
-template <typename T>
-void CheckCodecRoundtrip(const vector<uint8_t>& data) {
-  // create multiple compressors to try to break them
-  T c1;
-  T c2;
-
-  int max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]);
-  std::vector<uint8_t> compressed(max_compressed_len);
-  std::vector<uint8_t> decompressed(data.size());
-
-  // compress with c1
-  int actual_size =
-      c1.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]);
-  compressed.resize(actual_size);
-
-  // decompress with c2
-  c2.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]);
-
-  ASSERT_TRUE(test::vector_equal(data, decompressed));
-
-  // compress with c2
-  int actual_size2 =
-      c2.Compress(data.size(), &data[0], max_compressed_len, &compressed[0]);
-  ASSERT_EQ(actual_size2, actual_size);
-
-  // decompress with c1
-  c1.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]);
-
-  ASSERT_TRUE(test::vector_equal(data, decompressed));
-}
-
-template <typename T>
-void CheckCodec() {
-  int sizes[] = {10000, 100000};
-  for (int data_size : sizes) {
-    vector<uint8_t> data;
-    test::random_bytes(data_size, 1234, &data);
-    CheckCodecRoundtrip<T>(data);
-  }
-}
-
-TEST(TestCompressors, Snappy) {
-  CheckCodec<SnappyCodec>();
-}
-
-TEST(TestCompressors, Brotli) {
-  CheckCodec<BrotliCodec>();
-}
-
-TEST(TestCompressors, GZip) {
-  CheckCodec<GZipCodec>();
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.cc b/src/parquet/compression/codec.cc
deleted file mode 100644
index a7e5fba..0000000
--- a/src/parquet/compression/codec.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <memory>
-
-#include "parquet/compression/codec.h"
-#include "parquet/exception.h"
-#include "parquet/types.h"
-
-namespace parquet {
-
-std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) {
-  std::unique_ptr<Codec> result;
-  switch (codec_type) {
-    case Compression::UNCOMPRESSED:
-      break;
-    case Compression::SNAPPY:
-      result.reset(new SnappyCodec());
-      break;
-    case Compression::GZIP:
-      result.reset(new GZipCodec());
-      break;
-    case Compression::LZO:
-      ParquetException::NYI("LZO codec not implemented");
-      break;
-    case Compression::BROTLI:
-      result.reset(new BrotliCodec());
-      break;
-    default:
-      ParquetException::NYI("Unrecognized codec");
-      break;
-  }
-  return result;
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
deleted file mode 100644
index abd4899..0000000
--- a/src/parquet/compression/codec.h
+++ /dev/null
@@ -1,125 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COMPRESSION_CODEC_H
-#define PARQUET_COMPRESSION_CODEC_H
-
-#include <zlib.h>
-
-#include <cstdint>
-#include <memory>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
-
-namespace parquet {
-
-class Codec {
- public:
-  virtual ~Codec() {}
-
-  static std::unique_ptr<Codec> Create(Compression::type codec);
-
-  virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
-      uint8_t* output_buffer) = 0;
-
-  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
-      int64_t output_buffer_len, uint8_t* output_buffer) = 0;
-
-  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0;
-
-  virtual const char* name() const = 0;
-};
-
-// Snappy codec.
-class SnappyCodec : public Codec {
- public:
-  virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
-      uint8_t* output_buffer);
-
-  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
-      int64_t output_buffer_len, uint8_t* output_buffer);
-
-  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
-
-  virtual const char* name() const { return "snappy"; }
-};
-
-// Brotli codec.
-class BrotliCodec : public Codec {
- public:
-  void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
-      uint8_t* output_buffer) override;
-
-  int64_t Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len,
-      uint8_t* output_buffer) override;
-
-  int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
-
-  const char* name() const override { return "brotli"; }
-};
-
-// GZip codec.
-class GZipCodec : public Codec {
- public:
-  /// Compression formats supported by the zlib library
-  enum Format {
-    ZLIB,
-    DEFLATE,
-    GZIP,
-  };
-
-  explicit GZipCodec(Format format = GZIP);
-  virtual ~GZipCodec();
-
-  virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
-      uint8_t* output_buffer);
-
-  virtual int64_t Compress(int64_t input_len, const uint8_t* input,
-      int64_t output_buffer_len, uint8_t* output_buffer);
-
-  virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);
-
-  virtual const char* name() const { return "gzip"; }
-
- private:
-  // zlib is stateful and the z_stream state variable must be initialized
-  // before
-  z_stream stream_;
-
-  // Realistically, this will always be GZIP, but we leave the option open to
-  // configure
-  Format format_;
-
-  // These variables are mutually exclusive. When the codec is in "compressor"
-  // state, compressor_initialized_ is true while decompressor_initialized_ is
-  // false. When it's decompressing, the opposite is true.
-  //
-  // Indeed, this is slightly hacky, but the alternative is having separate
-  // Compressor and Decompressor classes. If this ever becomes an issue, we can
-  // perform the refactoring then
-  void InitCompressor();
-  void InitDecompressor();
-  void EndCompressor();
-  void EndDecompressor();
-  bool compressor_initialized_;
-  bool decompressor_initialized_;
-};
-
-}  // namespace parquet
-
-#endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/gzip-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc
deleted file mode 100644
index 6c27714..0000000
--- a/src/parquet/compression/gzip-codec.cc
+++ /dev/null
@@ -1,175 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstring>
-#include <sstream>
-#include <string>
-
-#include "parquet/compression/codec.h"
-#include "parquet/exception.h"
-
-namespace parquet {
-
-// These are magic numbers from zlib.h.  Not clear why they are not defined
-// there.
-
-// Maximum window size
-static constexpr int WINDOW_BITS = 15;
-
-// Output Gzip.
-static constexpr int GZIP_CODEC = 16;
-
-// Determine if this is libz or gzip from header.
-static constexpr int DETECT_CODEC = 32;
-
-GZipCodec::GZipCodec(Format format)
-    : format_(format), compressor_initialized_(false), decompressor_initialized_(false) {}
-
-GZipCodec::~GZipCodec() {
-  EndCompressor();
-  EndDecompressor();
-}
-
-void GZipCodec::InitCompressor() {
-  EndDecompressor();
-  memset(&stream_, 0, sizeof(stream_));
-
-  int ret;
-  // Initialize to run specified format
-  int window_bits = WINDOW_BITS;
-  if (format_ == DEFLATE) {
-    window_bits = -window_bits;
-  } else if (format_ == GZIP) {
-    window_bits += GZIP_CODEC;
-  }
-  if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9,
-           Z_DEFAULT_STRATEGY)) != Z_OK) {
-    throw ParquetException("zlib deflateInit failed: " + std::string(stream_.msg));
-  }
-
-  compressor_initialized_ = true;
-}
-
-void GZipCodec::EndCompressor() {
-  if (compressor_initialized_) { (void)deflateEnd(&stream_); }
-  compressor_initialized_ = false;
-}
-
-void GZipCodec::InitDecompressor() {
-  EndCompressor();
-  memset(&stream_, 0, sizeof(stream_));
-  int ret;
-
-  // Initialize to run either deflate or zlib/gzip format
-  int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
-  if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
-    throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg));
-  }
-  decompressor_initialized_ = true;
-}
-
-void GZipCodec::EndDecompressor() {
-  if (decompressor_initialized_) { (void)inflateEnd(&stream_); }
-  decompressor_initialized_ = false;
-}
-
-void GZipCodec::Decompress(
-    int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) {
-  if (!decompressor_initialized_) { InitDecompressor(); }
-  if (output_length == 0) {
-    // The zlib library does not allow *output to be NULL, even when output_length
-    // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
-    // error, so bail early if no output is expected. Note that we don't signal
-    // an error if the input actually contains compressed data.
-    return;
-  }
-
-  // Reset the stream for this block
-  if (inflateReset(&stream_) != Z_OK) {
-    throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg));
-  }
-
-  int ret = 0;
-  // gzip can run in streaming mode or non-streaming mode.  We only
-  // support the non-streaming use case where we present it the entire
-  // compressed input and a buffer big enough to contain the entire
-  // compressed output.  In the case where we don't know the output,
-  // we just make a bigger buffer and try the non-streaming mode
-  // from the beginning again.
-  while (ret != Z_STREAM_END) {
-    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
-    stream_.avail_in = input_length;
-    stream_.next_out = reinterpret_cast<Bytef*>(output);
-    stream_.avail_out = output_length;
-
-    // We know the output size.  In this case, we can use Z_FINISH
-    // which is more efficient.
-    ret = inflate(&stream_, Z_FINISH);
-    if (ret == Z_STREAM_END || ret != Z_OK) break;
-
-    // Failure, buffer was too small
-    std::stringstream ss;
-    ss << "Too small a buffer passed to GZipCodec. InputLength=" << input_length
-       << " OutputLength=" << output_length;
-    throw ParquetException(ss.str());
-  }
-
-  // Failure for some other reason
-  if (ret != Z_STREAM_END) {
-    std::stringstream ss;
-    ss << "GZipCodec failed: ";
-    if (stream_.msg != NULL) ss << stream_.msg;
-    throw ParquetException(ss.str());
-  }
-}
-
-int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
-  // Most be in compression mode
-  if (!compressor_initialized_) { InitCompressor(); }
-  // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
-  return deflateBound(&stream_, static_cast<uLong>(input_length));
-}
-
-int64_t GZipCodec::Compress(
-    int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) {
-  if (!compressor_initialized_) { InitCompressor(); }
-  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
-  stream_.avail_in = input_length;
-  stream_.next_out = reinterpret_cast<Bytef*>(output);
-  stream_.avail_out = output_length;
-
-  int64_t ret = 0;
-  if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
-    if (ret == Z_OK) {
-      // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
-      // small
-      throw ParquetException("zlib deflate failed, output buffer to small");
-    }
-    std::stringstream ss;
-    ss << "zlib deflate failed: " << stream_.msg;
-    throw ParquetException(ss.str());
-  }
-
-  if (deflateReset(&stream_) != Z_OK) {
-    throw ParquetException("zlib deflateReset failed: " + std::string(stream_.msg));
-  }
-
-  // Actual output length
-  return output_length - stream_.avail_out;
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
deleted file mode 100644
index 70f3a41..0000000
--- a/src/parquet/compression/snappy-codec.cc
+++ /dev/null
@@ -1,48 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdlib>
-#include <snappy.h>
-
-#include "parquet/compression/codec.h"
-#include "parquet/exception.h"
-
-namespace parquet {
-
-void SnappyCodec::Decompress(
-    int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) {
-  if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
-          static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) {
-    throw parquet::ParquetException("Corrupt snappy compressed data.");
-  }
-}
-
-int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) {
-  return snappy::MaxCompressedLength(input_len);
-}
-
-int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
-    int64_t output_buffer_len, uint8_t* output_buffer) {
-  size_t output_len;
-  snappy::RawCompress(reinterpret_cast<const char*>(input),
-      static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer),
-      &output_len);
-  return output_len;
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index a1aa448..1e9894d 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -24,8 +24,7 @@
 
 #include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/encodings/plain-encoding.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/bit-util.h"
 #include "parquet/util/memory.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index d2127ef..5e7e269 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -23,7 +23,7 @@
 
 #include "parquet/encodings/decoder.h"
 #include "parquet/encodings/encoder.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/util/bit-stream-utils.inline.h"
 #include "parquet/util/memory.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index fbb511a..30b3f6d 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -27,7 +27,7 @@
 #include <vector>
 
 #include "parquet/column/page.h"
-#include "parquet/compression/codec.h"
+#include "parquet/compression.h"
 #include "parquet/exception.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/thrift/parquet_types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 9b9fffd..0edc34f 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -17,8 +17,7 @@
 
 #include "parquet/column/statistics.h"
 #include "parquet/file/metadata.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include <gtest/gtest.h>
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 2f7f9d2..b4dffde 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -21,7 +21,8 @@
 
 #include "parquet/exception.h"
 #include "parquet/file/metadata.h"
-#include "parquet/schema/converter.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
 #include "parquet/thrift/util.h"
 #include "parquet/util/memory.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 7307ddf..1f8b09f 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -24,8 +24,8 @@
 
 #include "parquet/column/properties.h"
 #include "parquet/column/statistics.h"
-#include "parquet/compression/codec.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/compression.h"
+#include "parquet/schema.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index d6fcc48..425a001 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -25,11 +25,9 @@
 #include <vector>
 
 #include "parquet/column/page.h"
-#include "parquet/compression/codec.h"
+#include "parquet/compression.h"
 #include "parquet/exception.h"
-#include "parquet/schema/converter.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/thrift/util.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 7b21c2e..4ff065d 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -24,7 +24,7 @@
 
 #include "parquet/column/page.h"
 #include "parquet/column/properties.h"
-#include "parquet/compression/codec.h"
+#include "parquet/compression.h"
 #include "parquet/file/metadata.h"
 #include "parquet/file/reader.h"
 #include "parquet/thrift/parquet_types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 723595a..3cdfa9f 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -29,7 +29,7 @@
 #include "parquet/column/properties.h"
 #include "parquet/column/statistics.h"
 #include "parquet/file/metadata.h"
-#include "parquet/schema/descriptor.h"
+#include "parquet/schema.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 724635c..56f08f3 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -18,7 +18,8 @@
 #include "parquet/file/writer-internal.h"
 
 #include "parquet/column/writer.h"
-#include "parquet/schema/converter.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
 #include "parquet/thrift/util.h"
 #include "parquet/util/memory.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 12002e1..e711c44 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -22,7 +22,7 @@
 #include <vector>
 
 #include "parquet/column/page.h"
-#include "parquet/compression/codec.h"
+#include "parquet/compression.h"
 #include "parquet/file/metadata.h"
 #include "parquet/file/writer.h"
 #include "parquet/thrift/parquet_types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 6d7161b..07ccb51 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -22,8 +22,7 @@
 #include <memory>
 
 #include "parquet/column/properties.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/schema/types.h"
+#include "parquet/schema.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/schema-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema-internal.h b/src/parquet/schema-internal.h
new file mode 100644
index 0000000..5ceaa0c
--- /dev/null
+++ b/src/parquet/schema-internal.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module contains the logical parquet-cpp types (independent of Thrift
+// structures), schema nodes, and related type tools
+
+#ifndef PARQUET_SCHEMA_INTERNAL_H
+#define PARQUET_SCHEMA_INTERNAL_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "parquet/schema.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/types.h"
+#include "parquet/util/macros.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+namespace schema {
+
+// ----------------------------------------------------------------------
+// Conversion from Parquet Thrift metadata
+
+std::shared_ptr<SchemaDescriptor> FromParquet(
+    const std::vector<format::SchemaElement>& schema);
+
+class FlatSchemaConverter {
+ public:
+  FlatSchemaConverter(const format::SchemaElement* elements, int length)
+      : elements_(elements), length_(length), pos_(0), current_id_(0) {}
+
+  std::unique_ptr<Node> Convert();
+
+ private:
+  const format::SchemaElement* elements_;
+  int length_;
+  int pos_;
+  int current_id_;
+
+  int next_id() { return current_id_++; }
+
+  const format::SchemaElement& Next();
+
+  std::unique_ptr<Node> NextNode();
+};
+
+// ----------------------------------------------------------------------
+// Conversion to Parquet Thrift metadata
+
+void ToParquet(const GroupNode* schema, std::vector<format::SchemaElement>* out);
+
+// Converts nested parquet schema back to a flat vector of Thrift structs
+class SchemaFlattener {
+ public:
+  SchemaFlattener(const GroupNode* schema, std::vector<format::SchemaElement>* out);
+
+  void Flatten();
+
+ private:
+  const GroupNode* root_;
+  std::vector<format::SchemaElement>* elements_;
+};
+
+}  // namespace schema
+}  // namespace parquet
+
+#endif  // PARQUET_SCHEMA_INTERNAL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/13da51d3/src/parquet/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
new file mode 100644
index 0000000..b092b38
--- /dev/null
+++ b/src/parquet/schema-test.cc
@@ -0,0 +1,703 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <cstdlib>
+#include <iosfwd>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "parquet/exception.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/types.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet {
+
+using format::ConvertedType;
+using format::FieldRepetitionType;
+using format::SchemaElement;
+
+namespace schema {
+
+// Builds a Thrift SchemaElement describing a leaf (primitive) field.
+// NOTE: `id` is accepted for symmetry with call sites but is not recorded
+// in the returned element.
+static inline SchemaElement NewPrimitive(const std::string& name,
+    FieldRepetitionType::type repetition, format::Type::type type, int id = 0) {
+  SchemaElement elt;
+  elt.__set_type(type);
+  elt.__set_repetition_type(repetition);
+  elt.__set_name(name);
+  elt.__set_num_children(0);  // leaves have no children
+  return elt;
+}
+
+// Builds a Thrift SchemaElement describing a group (nested) field with the
+// given child count. As with NewPrimitive, `id` is accepted but unused.
+static inline SchemaElement NewGroup(const std::string& name,
+    FieldRepetitionType::type repetition, int num_children, int id = 0) {
+  SchemaElement elt;
+  elt.__set_num_children(num_children);
+  elt.__set_repetition_type(repetition);
+  elt.__set_name(name);
+  return elt;
+}
+
+// ----------------------------------------------------------------------
+// ColumnPath
+
+TEST(TestColumnPath, TestAttrs) {
+  // Direct construction from components renders as a dotted string.
+  ColumnPath direct(std::vector<std::string>({"toplevel", "leaf"}));
+  ASSERT_EQ(direct.ToDotString(), "toplevel.leaf");
+
+  // Round-trip through FromDotString.
+  std::shared_ptr<ColumnPath> parsed = ColumnPath::FromDotString("toplevel.leaf");
+  ASSERT_EQ(parsed->ToDotString(), "toplevel.leaf");
+
+  // extend() yields a new path with one extra component appended.
+  std::shared_ptr<ColumnPath> longer = parsed->extend("anotherlevel");
+  ASSERT_EQ(longer->ToDotString(), "toplevel.leaf.anotherlevel");
+}
+
+// ----------------------------------------------------------------------
+// Primitive node
+
+// Fixture for tests converting Thrift SchemaElements into PrimitiveNodes.
+class TestPrimitiveNode : public ::testing::Test {
+ public:
+  void SetUp() {
+    name_ = "name";
+    id_ = 5;  // arbitrary non-zero id, verified after conversion
+  }
+
+  // Converts `element` into node_, asserts it is primitive, and caches a
+  // typed view in prim_node_.
+  void Convert(const format::SchemaElement* element) {
+    node_ = PrimitiveNode::FromParquet(element, id_);
+    ASSERT_TRUE(node_->is_primitive());
+    prim_node_ = static_cast<const PrimitiveNode*>(node_.get());
+  }
+
+ protected:
+  std::string name_;
+  const PrimitiveNode* prim_node_;  // borrowed view of node_
+
+  int id_;
+  std::unique_ptr<Node> node_;  // owns the converted node
+};
+
+TEST_F(TestPrimitiveNode, Attrs) {
+  // Basic attribute accessors on freshly constructed nodes.
+  PrimitiveNode a("foo", Repetition::REPEATED, Type::INT32);
+  PrimitiveNode b("bar", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8);
+
+  ASSERT_EQ("foo", a.name());
+  ASSERT_EQ(Node::PRIMITIVE, a.node_type());
+  ASSERT_TRUE(a.is_primitive());
+  ASSERT_FALSE(a.is_group());
+
+  ASSERT_EQ(Repetition::REPEATED, a.repetition());
+  ASSERT_EQ(Repetition::OPTIONAL, b.repetition());
+
+  ASSERT_EQ(Type::INT32, a.physical_type());
+  ASSERT_EQ(Type::BYTE_ARRAY, b.physical_type());
+
+  // Logical type defaults to NONE unless supplied.
+  ASSERT_EQ(LogicalType::NONE, a.logical_type());
+  ASSERT_EQ(LogicalType::UTF8, b.logical_type());
+
+  // is_required / is_optional / is_repeated track the repetition.
+  a = PrimitiveNode("foo", Repetition::REQUIRED, Type::INT32);
+  b = PrimitiveNode("foo", Repetition::OPTIONAL, Type::INT32);
+  PrimitiveNode c("foo", Repetition::REPEATED, Type::INT32);
+
+  ASSERT_TRUE(a.is_required());
+
+  ASSERT_TRUE(b.is_optional());
+  ASSERT_FALSE(b.is_required());
+
+  ASSERT_TRUE(c.is_repeated());
+  ASSERT_FALSE(c.is_optional());
+}
+
+TEST_F(TestPrimitiveNode, FromParquet) {
+  // Plain INT32 without any logical-type annotation
+  SchemaElement elt =
+      NewPrimitive(name_, FieldRepetitionType::OPTIONAL, format::Type::INT32, 0);
+  Convert(&elt);
+  ASSERT_EQ(name_, prim_node_->name());
+  ASSERT_EQ(id_, prim_node_->id());  // id is supplied by Convert(), not the element
+  ASSERT_EQ(Repetition::OPTIONAL, prim_node_->repetition());
+  ASSERT_EQ(Type::INT32, prim_node_->physical_type());
+  ASSERT_EQ(LogicalType::NONE, prim_node_->logical_type());
+
+  // Test a logical type
+  elt = NewPrimitive(name_, FieldRepetitionType::REQUIRED, format::Type::BYTE_ARRAY, 0);
+  elt.__set_converted_type(ConvertedType::UTF8);
+
+  Convert(&elt);
+  ASSERT_EQ(Repetition::REQUIRED, prim_node_->repetition());
+  ASSERT_EQ(Type::BYTE_ARRAY, prim_node_->physical_type());
+  ASSERT_EQ(LogicalType::UTF8, prim_node_->logical_type());
+
+  // FIXED_LEN_BYTE_ARRAY: the declared byte width must carry through
+  elt = NewPrimitive(
+      name_, FieldRepetitionType::OPTIONAL, format::Type::FIXED_LEN_BYTE_ARRAY, 0);
+  elt.__set_type_length(16);
+
+  Convert(&elt);
+  ASSERT_EQ(name_, prim_node_->name());
+  ASSERT_EQ(id_, prim_node_->id());
+  ASSERT_EQ(Repetition::OPTIONAL, prim_node_->repetition());
+  ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, prim_node_->physical_type());
+  ASSERT_EQ(16, prim_node_->type_length());
+
+  // ConvertedType::Decimal: scale and precision must carry through as well
+  elt = NewPrimitive(
+      name_, FieldRepetitionType::OPTIONAL, format::Type::FIXED_LEN_BYTE_ARRAY, 0);
+  elt.__set_converted_type(ConvertedType::DECIMAL);
+  elt.__set_type_length(6);
+  elt.__set_scale(2);
+  elt.__set_precision(12);
+
+  Convert(&elt);
+  ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, prim_node_->physical_type());
+  ASSERT_EQ(LogicalType::DECIMAL, prim_node_->logical_type());
+  ASSERT_EQ(6, prim_node_->type_length());
+  ASSERT_EQ(2, prim_node_->decimal_metadata().scale);
+  ASSERT_EQ(12, prim_node_->decimal_metadata().precision);
+}
+
+TEST_F(TestPrimitiveNode, Equals) {
+  // Equality requires matching name, repetition, and physical type.
+  PrimitiveNode base("foo", Repetition::REQUIRED, Type::INT32);
+  PrimitiveNode other_type("foo", Repetition::REQUIRED, Type::INT64);
+  PrimitiveNode other_name("bar", Repetition::REQUIRED, Type::INT32);
+  PrimitiveNode other_rep("foo", Repetition::OPTIONAL, Type::INT32);
+  PrimitiveNode same("foo", Repetition::REQUIRED, Type::INT32);
+
+  ASSERT_TRUE(base.Equals(&base));
+  ASSERT_FALSE(base.Equals(&other_type));
+  ASSERT_FALSE(base.Equals(&other_name));
+  ASSERT_FALSE(base.Equals(&other_rep));
+  ASSERT_TRUE(base.Equals(&same));
+
+  // FIXED_LEN_BYTE_ARRAY nodes additionally compare the type length and
+  // decimal metadata.
+  PrimitiveNode d1("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+      LogicalType::DECIMAL, 12, 4, 2);
+
+  PrimitiveNode d2("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+      LogicalType::DECIMAL, 1, 4, 2);
+  d2.SetTypeLength(12);  // now matches d1's effective length
+
+  PrimitiveNode d3("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+      LogicalType::DECIMAL, 1, 4, 2);
+  d3.SetTypeLength(16);  // differs from d1 in length only
+
+  PrimitiveNode d4("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+      LogicalType::DECIMAL, 12, 4, 0);
+
+  PrimitiveNode d5("foo", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+      LogicalType::NONE, 12, 4, 0);
+
+  ASSERT_TRUE(d1.Equals(&d2));
+  ASSERT_FALSE(d1.Equals(&d3));
+  ASSERT_FALSE(d1.Equals(&d4));
+  ASSERT_FALSE(d1.Equals(&d5));
+}
+
+// Exercises physical-type / logical-type compatibility validation in
+// PrimitiveNode::Make(); illegal pairings must throw ParquetException.
+TEST_F(TestPrimitiveNode, PhysicalLogicalMapping) {
+  ASSERT_NO_THROW(
+      PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::INT32, LogicalType::INT_32));
+  ASSERT_NO_THROW(PrimitiveNode::Make(
+      "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::JSON));
+  ASSERT_THROW(
+      PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::INT32, LogicalType::JSON),
+      ParquetException);
+  ASSERT_NO_THROW(PrimitiveNode::Make(
+      "foo", Repetition::REQUIRED, Type::INT64, LogicalType::TIMESTAMP_MILLIS));
+  ASSERT_THROW(
+      PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::INT32, LogicalType::INT_64),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make(
+                   "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::INT_8),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make(
+                   "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::INTERVAL),
+      ParquetException);
+  // ENUM annotates BYTE_ARRAY only
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::ENUM),
+      ParquetException);
+  ASSERT_NO_THROW(PrimitiveNode::Make(
+      "foo", Repetition::REQUIRED, Type::BYTE_ARRAY, LogicalType::ENUM));
+  // DECIMAL parameter validation: the trailing args are (FLBA length,
+  // precision, scale); non-positive length or precision, negative scale,
+  // or scale greater than precision must all be rejected.
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 0, 2, 4),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED, Type::FLOAT,
+                   LogicalType::DECIMAL, 0, 2, 4),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 0, 4, 0),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 0, 4),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 4, -1),
+      ParquetException);
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 2, 4),
+      ParquetException);
+  ASSERT_NO_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 10, 6, 4));
+  // INTERVAL requires a FIXED_LEN_BYTE_ARRAY of exactly 12 bytes
+  ASSERT_NO_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12));
+  ASSERT_THROW(PrimitiveNode::Make("foo", Repetition::REQUIRED,
+                   Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 10),
+      ParquetException);
+}
+
+// ----------------------------------------------------------------------
+// Group node
+
+// Fixture providing a canonical three-field vector: a required int32 plus
+// an int64 and a double with the factories' default repetition.
+class TestGroupNode : public ::testing::Test {
+ public:
+  // Each call returns a fresh, independent vector.
+  NodeVector Fields1() {
+    return NodeVector(
+        {Int32("one", Repetition::REQUIRED), Int64("two"), Double("three")});
+  }
+};
+
+TEST_F(TestGroupNode, Attrs) {
+  NodeVector members = Fields1();
+
+  GroupNode plain("foo", Repetition::REPEATED, members);
+  GroupNode annotated("bar", Repetition::OPTIONAL, members, LogicalType::LIST);
+
+  ASSERT_EQ("foo", plain.name());
+  ASSERT_EQ(Node::GROUP, plain.node_type());
+  ASSERT_TRUE(plain.is_group());
+  ASSERT_FALSE(plain.is_primitive());
+  ASSERT_EQ(members.size(), plain.field_count());
+
+  // Repetition predicates and accessors.
+  ASSERT_TRUE(plain.is_repeated());
+  ASSERT_TRUE(annotated.is_optional());
+  ASSERT_EQ(Repetition::REPEATED, plain.repetition());
+  ASSERT_EQ(Repetition::OPTIONAL, annotated.repetition());
+
+  // Logical type defaults to NONE unless supplied.
+  ASSERT_EQ(LogicalType::NONE, plain.logical_type());
+  ASSERT_EQ(LogicalType::LIST, annotated.logical_type());
+}
+
+TEST_F(TestGroupNode, Equals) {
+  NodeVector fields_a = Fields1();
+  NodeVector fields_b = Fields1();
+
+  GroupNode g1("group", Repetition::REPEATED, fields_a);
+  GroupNode g2("group", Repetition::REPEATED, fields_b);
+  GroupNode g3("group2", Repetition::REPEATED, fields_b);
+
+  // Mutating fields_b after construction is safe: the GroupNode ctor
+  // copies the field vector.
+  fields_b.push_back(Float("four", Repetition::OPTIONAL));
+  GroupNode g4("group", Repetition::REPEATED, fields_b);
+  GroupNode g5("group", Repetition::REPEATED, Fields1());
+
+  ASSERT_TRUE(g1.Equals(&g1));
+  ASSERT_TRUE(g1.Equals(&g2));
+  ASSERT_FALSE(g1.Equals(&g3));  // name differs
+
+  ASSERT_FALSE(g1.Equals(&g4));  // extra field
+  ASSERT_FALSE(g5.Equals(&g4));
+}
+
+// ----------------------------------------------------------------------
+// Test convert group
+
+// Fixture for flat-Thrift -> node-tree conversion tests.
+class TestSchemaConverter : public ::testing::Test {
+ public:
+  // BUG FIX: this was previously named setUp(), which googletest never
+  // calls (the fixture hook is SetUp()), so name_ was silently left empty.
+  void SetUp() override { name_ = "parquet_schema"; }
+
+  // Runs the flat->tree conversion and asserts the root is a group.
+  void Convert(const parquet::format::SchemaElement* elements, int length) {
+    FlatSchemaConverter converter(elements, length);
+    node_ = converter.Convert();
+    ASSERT_TRUE(node_->is_group());
+    group_ = static_cast<const GroupNode*>(node_.get());
+  }
+
+ protected:
+  std::string name_;            // root schema name used by the tests
+  const GroupNode* group_;      // borrowed view of node_
+  std::unique_ptr<Node> node_;  // owns the converted schema tree
+};
+
+// Recursively verifies that every descendant of `node` reports `node` (or
+// the appropriate intermediate group) as its parent. Returns false at the
+// first inconsistency found.
+bool check_for_parent_consistency(const GroupNode* node) {
+  for (int i = 0; i < node->field_count(); ++i) {
+    const NodePtr& child = node->field(i);
+    if (child->parent() != node) { return false; }
+    if (!child->is_group()) { continue; }
+    const GroupNode* subgroup = static_cast<const GroupNode*>(child.get());
+    if (!check_for_parent_consistency(subgroup)) { return false; }
+  }
+  return true;
+}
+
+// Converts a hand-built flat Thrift schema -- including a manual 3-level
+// list encoding -- and checks it equals the same schema built directly
+// with the node factories.
+TEST_F(TestSchemaConverter, NestedExample) {
+  SchemaElement elt;
+  std::vector<SchemaElement> elements;
+  elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0));
+
+  // A primitive one
+  elements.push_back(
+      NewPrimitive("a", FieldRepetitionType::REQUIRED, format::Type::INT32, 1));
+
+  // A group
+  elements.push_back(NewGroup("bag", FieldRepetitionType::OPTIONAL, 1, 2));
+
+  // 3-level list encoding, by hand
+  elt = NewGroup("b", FieldRepetitionType::REPEATED, 1, 3);
+  elt.__set_converted_type(ConvertedType::LIST);
+  elements.push_back(elt);
+  elements.push_back(
+      NewPrimitive("item", FieldRepetitionType::OPTIONAL, format::Type::INT64, 4));
+
+  Convert(&elements[0], elements.size());
+
+  // Construct the expected schema
+  NodeVector fields;
+  fields.push_back(Int32("a", Repetition::REQUIRED));
+
+  // 3-level list encoding
+  NodePtr item = Int64("item");
+  NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, LogicalType::LIST));
+  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));
+  fields.push_back(bag);
+
+  NodePtr schema = GroupNode::Make(name_, Repetition::REPEATED, fields);
+
+  ASSERT_TRUE(schema->Equals(group_));
+
+  // Check that the parent relationship in each node is consistent
+  ASSERT_EQ(group_->parent(), nullptr);  // the root itself has no parent
+  ASSERT_TRUE(check_for_parent_consistency(group_));
+}
+
+TEST_F(TestSchemaConverter, InvalidRoot) {
+  // Per the Parquet specification the first SchemaElement must be a group
+  // whose children (and their descendants) contain every other flattened
+  // element; a primitive root marks a malformed file.
+  SchemaElement elems[2];
+  elems[0] =
+      NewPrimitive("not-a-group", FieldRepetitionType::REQUIRED, format::Type::INT32, 0);
+  ASSERT_THROW(Convert(elems, 2), ParquetException);
+
+  // The spec says the root group should be REPEATED, but some writers emit
+  // REQUIRED or OPTIONAL roots; as a practical matter both are accepted.
+  elems[0] = NewGroup("not-repeated", FieldRepetitionType::REQUIRED, 1, 0);
+  elems[1] = NewPrimitive("a", FieldRepetitionType::REQUIRED, format::Type::INT32, 1);
+  Convert(elems, 2);
+
+  elems[0] = NewGroup("not-repeated", FieldRepetitionType::OPTIONAL, 1, 0);
+  Convert(elems, 2);
+}
+
+TEST_F(TestSchemaConverter, NotEnoughChildren) {
+  // The root group claims two children but only the root element itself is
+  // supplied: conversion must throw ParquetException, not crash.
+  // (Removed an unused `SchemaElement elt;` local.)
+  std::vector<SchemaElement> elements;
+  elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0));
+  ASSERT_THROW(Convert(&elements[0], 1), ParquetException);
+}
+
+// ----------------------------------------------------------------------
+// Schema tree flatten / unflatten
+
+// Fixture for node-tree -> flat-Thrift flattening tests.
+class TestSchemaFlatten : public ::testing::Test {
+ public:
+  // BUG FIX: previously named setUp(), which googletest never invokes
+  // (the hook is SetUp()); name_ is now actually initialized per test.
+  void SetUp() override { name_ = "parquet_schema"; }
+
+  // Flattens `schema` into elements_ for inspection by the tests.
+  void Flatten(const GroupNode* schema) { ToParquet(schema, &elements_); }
+
+ protected:
+  std::string name_;
+  std::vector<format::SchemaElement> elements_;  // flattening output
+};
+
+TEST_F(TestSchemaFlatten, DecimalMetadata) {
+  // Checks that DecimalMetadata is only set for DecimalTypes
+  NodePtr node = PrimitiveNode::Make(
+      "decimal", Repetition::REQUIRED, Type::INT64, LogicalType::DECIMAL, -1, 8, 4);
+  NodePtr group =
+      GroupNode::Make("group", Repetition::REPEATED, {node}, LogicalType::LIST);
+  // static_cast suffices here (GroupNode derives from Node)
+  Flatten(static_cast<GroupNode*>(group.get()));
+  ASSERT_EQ("decimal", elements_[1].name);
+  ASSERT_TRUE(elements_[1].__isset.precision);
+  ASSERT_TRUE(elements_[1].__isset.scale);
+
+  elements_.clear();
+  // Not for integers with no logical type
+  group =
+      GroupNode::Make("group", Repetition::REPEATED, {Int64("int64")}, LogicalType::LIST);
+  Flatten(static_cast<GroupNode*>(group.get()));
+  ASSERT_EQ("int64", elements_[1].name);
+  // BUG FIX: these previously inspected elements_[0] (the enclosing group,
+  // which trivially lacks decimal metadata); the int64 leaf is element [1],
+  // as the name assertion above shows.
+  ASSERT_FALSE(elements_[1].__isset.precision);
+  ASSERT_FALSE(elements_[1].__isset.scale);
+}
+
+// Builds a schema tree (with a 3-level list), flattens it via ToParquet,
+// and checks the output element-for-element against the hand-built flat
+// Thrift vector.
+TEST_F(TestSchemaFlatten, NestedExample) {
+  SchemaElement elt;
+  std::vector<SchemaElement> elements;
+  elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0));
+
+  // A primitive one
+  elements.push_back(
+      NewPrimitive("a", FieldRepetitionType::REQUIRED, format::Type::INT32, 1));
+
+  // A group
+  elements.push_back(NewGroup("bag", FieldRepetitionType::OPTIONAL, 1, 2));
+
+  // 3-level list encoding, by hand
+  elt = NewGroup("b", FieldRepetitionType::REPEATED, 1, 3);
+  elt.__set_converted_type(ConvertedType::LIST);
+  elements.push_back(elt);
+  elements.push_back(
+      NewPrimitive("item", FieldRepetitionType::OPTIONAL, format::Type::INT64, 4));
+
+  // Construct the schema
+  NodeVector fields;
+  fields.push_back(Int32("a", Repetition::REQUIRED));
+
+  // 3-level list encoding
+  NodePtr item = Int64("item");
+  NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, LogicalType::LIST));
+  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));
+  fields.push_back(bag);
+
+  NodePtr schema = GroupNode::Make(name_, Repetition::REPEATED, fields);
+
+  Flatten(static_cast<GroupNode*>(schema.get()));
+  ASSERT_EQ(elements_.size(), elements.size());
+  for (size_t i = 0; i < elements_.size(); i++) {
+    ASSERT_EQ(elements_[i], elements[i]);
+  }
+}
+
+TEST(TestColumnDescriptor, TestAttrs) {
+  NodePtr leaf = PrimitiveNode::Make(
+      "name", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8);
+  // Max definition / repetition levels are supplied by the caller here.
+  ColumnDescriptor descriptor(leaf, 4, 1);
+
+  ASSERT_EQ("name", descriptor.name());
+  ASSERT_EQ(4, descriptor.max_definition_level());
+  ASSERT_EQ(1, descriptor.max_repetition_level());
+  ASSERT_EQ(Type::BYTE_ARRAY, descriptor.physical_type());
+  // Non-FLBA columns report a sentinel length of -1.
+  ASSERT_EQ(-1, descriptor.type_length());
+
+  // FIXED_LEN_BYTE_ARRAY columns surface their declared byte width.
+  leaf = PrimitiveNode::Make("name", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
+      LogicalType::DECIMAL, 12, 10, 4);
+  descriptor = ColumnDescriptor(leaf, 4, 1);
+
+  ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, descriptor.physical_type());
+  ASSERT_EQ(12, descriptor.type_length());
+}
+
+// Fixture holding the SchemaDescriptor under test; each test calls Init()
+// itself. (Removed an empty `setUp()` member: it was dead code -- the
+// googletest hook is SetUp() -- and performed no initialization anyway.)
+class TestSchemaDescriptor : public ::testing::Test {
+ protected:
+  SchemaDescriptor descr_;
+};
+
+TEST_F(TestSchemaDescriptor, InitNonGroup) {
+  // The schema root must be a group node; a bare primitive is rejected.
+  NodePtr primitive = PrimitiveNode::Make("field", Repetition::OPTIONAL, Type::INT32);
+  ASSERT_THROW(descr_.Init(primitive), ParquetException);
+}
+
+// Descriptor equality over structurally equal/unequal trees, plus
+// ColumnDescriptor equality over differing max levels.
+TEST_F(TestSchemaDescriptor, Equals) {
+  // (Removed an unused `NodePtr schema;` local.)
+  NodePtr inta = Int32("a", Repetition::REQUIRED);
+  NodePtr intb = Int64("b", Repetition::OPTIONAL);
+  NodePtr intb2 = Int64("b2", Repetition::OPTIONAL);
+  NodePtr intc = ByteArray("c", Repetition::REPEATED);
+
+  // A 3-level list group shared by the descriptors below
+  NodePtr item1 = Int64("item1", Repetition::REQUIRED);
+  NodePtr item2 = Boolean("item2", Repetition::OPTIONAL);
+  NodePtr item3 = Int32("item3", Repetition::REPEATED);
+  NodePtr list(GroupNode::Make(
+      "records", Repetition::REPEATED, {item1, item2, item3}, LogicalType::LIST));
+
+  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));
+  NodePtr bag2(GroupNode::Make("bag", Repetition::REQUIRED, {list}));
+
+  SchemaDescriptor descr1;
+  descr1.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag}));
+
+  ASSERT_TRUE(descr1.Equals(descr1));
+
+  // Differs only in the repetition of "bag"
+  SchemaDescriptor descr2;
+  descr2.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag2}));
+  ASSERT_FALSE(descr1.Equals(descr2));
+
+  // Differs only in a field name ("b" vs "b2")
+  SchemaDescriptor descr3;
+  descr3.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb2, intc, bag}));
+  ASSERT_FALSE(descr1.Equals(descr3));
+
+  // Robust to name of parent node
+  SchemaDescriptor descr4;
+  descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED, {inta, intb, intc, bag}));
+  ASSERT_TRUE(descr1.Equals(descr4));
+
+  // Differs in field count
+  SchemaDescriptor descr5;
+  descr5.Init(
+      GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag, intb2}));
+  ASSERT_FALSE(descr1.Equals(descr5));
+
+  // Different max repetition / definition levels
+  ColumnDescriptor col1(inta, 5, 1);
+  ColumnDescriptor col2(inta, 6, 1);
+  ColumnDescriptor col3(inta, 5, 2);
+
+  ASSERT_TRUE(col1.Equals(col1));
+  ASSERT_FALSE(col1.Equals(col2));
+  ASSERT_FALSE(col1.Equals(col3));
+}
+
+// Builds a nested schema and checks the derived per-leaf max levels,
+// dotted paths, and column-root lookups.
+TEST_F(TestSchemaDescriptor, BuildTree) {
+  NodeVector fields;
+  NodePtr schema;
+
+  NodePtr inta = Int32("a", Repetition::REQUIRED);
+  fields.push_back(inta);
+  fields.push_back(Int64("b", Repetition::OPTIONAL));
+  fields.push_back(ByteArray("c", Repetition::REPEATED));
+
+  // 3-level list encoding
+  NodePtr item1 = Int64("item1", Repetition::REQUIRED);
+  NodePtr item2 = Boolean("item2", Repetition::OPTIONAL);
+  NodePtr item3 = Int32("item3", Repetition::REPEATED);
+  NodePtr list(GroupNode::Make(
+      "records", Repetition::REPEATED, {item1, item2, item3}, LogicalType::LIST));
+  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));
+  fields.push_back(bag);
+
+  schema = GroupNode::Make("schema", Repetition::REPEATED, fields);
+
+  descr_.Init(schema);
+
+  int nleaves = 6;
+
+  // 6 leaves
+  ASSERT_EQ(nleaves, descr_.num_columns());
+
+  //                             mdef mrep
+  // required int32 a            0    0
+  // optional int64 b            1    0
+  // repeated byte_array c       1    1
+  // optional group bag          1    0
+  //   repeated group records    2    1
+  //     required int64 item1    2    1
+  //     optional boolean item2  3    1
+  //     repeated int32 item3    3    2
+  int16_t ex_max_def_levels[6] = {0, 1, 1, 2, 3, 3};
+  int16_t ex_max_rep_levels[6] = {0, 0, 1, 1, 1, 2};
+
+  for (int i = 0; i < nleaves; ++i) {
+    const ColumnDescriptor* col = descr_.Column(i);
+    EXPECT_EQ(ex_max_def_levels[i], col->max_definition_level()) << i;
+    EXPECT_EQ(ex_max_rep_levels[i], col->max_repetition_level()) << i;
+  }
+
+  ASSERT_EQ(descr_.Column(0)->path()->ToDotString(), "a");
+  ASSERT_EQ(descr_.Column(1)->path()->ToDotString(), "b");
+  ASSERT_EQ(descr_.Column(2)->path()->ToDotString(), "c");
+  ASSERT_EQ(descr_.Column(3)->path()->ToDotString(), "bag.records.item1");
+  ASSERT_EQ(descr_.Column(4)->path()->ToDotString(), "bag.records.item2");
+  ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3");
+
+  // GetColumnRoot returns the top-level field containing each leaf
+  ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get());
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get());
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get());
+
+  ASSERT_EQ(schema.get(), descr_.group_node());
+
+  // Init clears the leaves
+  descr_.Init(schema);
+  ASSERT_EQ(nleaves, descr_.num_columns());
+}
+
+// Renders a schema tree to a string via PrintSchema, for comparison
+// against expected textual layouts.
+static std::string Print(const NodePtr& node) {
+  std::stringstream buffer;
+  PrintSchema(node.get(), buffer);
+  return buffer.str();
+}
+
+// Verifies PrintSchema's exact textual layout, including logical-type
+// annotations (LIST) and DECIMAL(precision,scale) formatting.
+TEST(TestSchemaPrinter, Examples) {
+  // Test schema 1
+  NodeVector fields;
+  fields.push_back(Int32("a", Repetition::REQUIRED));
+
+  // 3-level list encoding
+  NodePtr item1 = Int64("item1");
+  NodePtr item2 = Boolean("item2", Repetition::REQUIRED);
+  NodePtr list(
+      GroupNode::Make("b", Repetition::REPEATED, {item1, item2}, LogicalType::LIST));
+  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));
+  fields.push_back(bag);
+
+  // DECIMAL(3,2); -1 leaves the fixed length unset for a non-FLBA type
+  fields.push_back(PrimitiveNode::Make(
+      "c", Repetition::REQUIRED, Type::INT32, LogicalType::DECIMAL, -1, 3, 2));
+
+  NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, fields);
+
+  std::string result = Print(schema);
+  std::string expected = R"(message schema {
+  required int32 a;
+  optional group bag {
+    repeated group b (LIST) {
+      optional int64 item1;
+      required boolean item2;
+    }
+  }
+  required int32 c (DECIMAL(3,2));
+}
+)";
+  ASSERT_EQ(expected, result);
+}
+
+}  // namespace schema
+}  // namespace parquet


Mime
View raw message