parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-1078: Add option to coerce Arrow timestamps to a particular unit
Date Mon, 07 Aug 2017 17:46:15 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master ea0f6e911 -> aa6a3c6e8


PARQUET-1078: Add option to coerce Arrow timestamps to a particular unit

See ARROW-622 and ARROW-1076. We have been coercing timestamp units on ingest to Arrow format,
rather than letting the data be whatever it is and converting only when writing to Parquet.
This adds an option to coerce via the `ArrowWriterProperties`, and we also will return error
Status if casting to a lower-resolution unit would lose data. We can later add an option to
allow unsafe casts should that be desired.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #380 from wesm/PARQUET-1078 and squashes the following commits:

2bb6d9f [Wes McKinney] Code reviews. Use CapWords for lambda functions
0d5ff4d [Wes McKinney] Account for offset in WriteTimestampsCoerce
17402cb [Wes McKinney] Add invalid value in null slot
94ecc65 [Wes McKinney] Only check for data loss when values are not null
7069757 [Wes McKinney] Implement timestamp unit coercion, raise when losing precision
b97b78b [Wes McKinney] Initial tests for timestamp coercion
f5ddb68 [Wes McKinney] Add schema logic for timestamp coercion
69bb26a [Wes McKinney] Refactor schema functions to enable more Arrow writer properties. Add
timestamp coercion functions


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/aa6a3c6e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/aa6a3c6e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/aa6a3c6e

Branch: refs/heads/master
Commit: aa6a3c6e87c3e61ffbb14c4b42738abea39c30f8
Parents: ea0f6e9
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Mon Aug 7 13:46:10 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Mon Aug 7 13:46:10 2017 -0400

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 147 ++++++++++++-
 src/parquet/arrow/schema.cc                   | 108 ++++++----
 src/parquet/arrow/schema.h                    |  13 +-
 src/parquet/arrow/writer.cc                   | 233 ++++++++++++++-------
 src/parquet/arrow/writer.h                    |  31 ++-
 5 files changed, 400 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aa6a3c6e/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 911a1c1..2adda67 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -59,6 +59,8 @@ using parquet::schema::NodePtr;
 using parquet::schema::PrimitiveNode;
 using parquet::arrow::FromParquetSchema;
 
+using ColumnVector = std::vector<std::shared_ptr<arrow::Column>>;
+
 namespace parquet {
 namespace arrow {
 
@@ -280,9 +282,9 @@ struct test_traits<::arrow::FixedSizeBinaryType> {
   static std::string const value;
 };
 
-const std::string test_traits<::arrow::StringType>::value("Test");
-const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");
-const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");
+const std::string test_traits<::arrow::StringType>::value("Test");              // NOLINT
+const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");  // NOLINT
+const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");    // NOLINT
 template <typename T>
 using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 
@@ -993,6 +995,145 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
   ASSERT_TRUE(table->Equals(*result));
 }
 
+TEST(TestArrowReadWrite, CoerceTimestamps) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+
+  // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
+  std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
+                                    1489272001, 1489272000, 1489273000};
+  std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
+                                    1489272000001, 1489272000000, 1489273000000};
+  std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
+                                    1489271000000000LL, 1489272000000001LL,
+                                    1489272000000000LL, 1489273000000000LL};
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
+
+  // Input table, all data as is
+  auto s1 = std::shared_ptr<::arrow::Schema>(
+      new ::arrow::Schema({field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us),
+                           field("f_ns", t_ns)}));
+  auto input = std::make_shared<::arrow::Table>(
+      s1, ColumnVector({std::make_shared<Column>("f_s", a_s),
+                        std::make_shared<Column>("f_ms", a_ms),
+                        std::make_shared<Column>("f_us", a_us),
+                        std::make_shared<Column>("f_ns", a_ns)}));
+
+  // Result when coercing to milliseconds
+  auto s2 = std::shared_ptr<::arrow::Schema>(
+      new ::arrow::Schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms),
+                           field("f_ns", t_ms)}));
+  auto ex_milli_result = std::make_shared<::arrow::Table>(
+      s2, ColumnVector({std::make_shared<Column>("f_s", a_ms),
+                        std::make_shared<Column>("f_ms", a_ms),
+                        std::make_shared<Column>("f_us", a_ms),
+                        std::make_shared<Column>("f_ns", a_ms)}));
+
+  // Result when coercing to microseconds
+  auto s3 = std::shared_ptr<::arrow::Schema>(
+      new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
+                           field("f_ns", t_us)}));
+  auto ex_micro_result = std::make_shared<::arrow::Table>(
+      s3, ColumnVector({std::make_shared<Column>("f_s", a_us),
+                        std::make_shared<Column>("f_ms", a_us),
+                        std::make_shared<Column>("f_us", a_us),
+                        std::make_shared<Column>("f_ns", a_us)}));
+
+  std::shared_ptr<Table> milli_result;
+  DoSimpleRoundtrip(
+      input, 1, input->num_rows(), {}, &milli_result,
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
+  ASSERT_TRUE(milli_result->Equals(*ex_milli_result));
+
+  std::shared_ptr<Table> micro_result;
+  DoSimpleRoundtrip(
+      input, 1, input->num_rows(), {}, &micro_result,
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
+  ASSERT_TRUE(micro_result->Equals(*ex_micro_result));
+}
+
+TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+
+  // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
+  std::vector<int64_t> ms_values = {1489269001, 1489270001, 1489271001,
+                                    1489272001, 1489272001, 1489273001};
+  std::vector<int64_t> us_values = {1489269000001, 1489270000001, 1489271000001,
+                                    1489272000001, 1489272000001, 1489273000001};
+  std::vector<int64_t> ns_values = {1489269000000001LL, 1489270000000001LL,
+                                    1489271000000001LL, 1489272000000001LL,
+                                    1489272000000001LL, 1489273000000001LL};
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
+
+  auto s1 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_s", t_s)}));
+  auto s2 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ms", t_ms)}));
+  auto s3 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_us", t_us)}));
+  auto s4 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ns", t_ns)}));
+
+  auto c1 = std::make_shared<Column>("f_s", a_s);
+  auto c2 = std::make_shared<Column>("f_ms", a_ms);
+  auto c3 = std::make_shared<Column>("f_us", a_us);
+  auto c4 = std::make_shared<Column>("f_ns", a_ns);
+
+  auto t1 = std::make_shared<::arrow::Table>(s1, ColumnVector({c1}));
+  auto t2 = std::make_shared<::arrow::Table>(s2, ColumnVector({c2}));
+  auto t3 = std::make_shared<::arrow::Table>(s3, ColumnVector({c3}));
+  auto t4 = std::make_shared<::arrow::Table>(s4, ColumnVector({c4}));
+
+  auto sink = std::make_shared<InMemoryOutputStream>();
+
+  // OK to write to millis
+  auto coerce_millis =
+      (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
+  ASSERT_OK_NO_THROW(WriteTable(*t1, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), coerce_millis));
+  ASSERT_OK_NO_THROW(WriteTable(*t2, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), coerce_millis));
+
+  // Loss of precision
+  ASSERT_RAISES(Invalid, WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
+                                    default_writer_properties(), coerce_millis));
+  ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
+                                    default_writer_properties(), coerce_millis));
+
+  // OK to write micros to micros
+  auto coerce_micros =
+      (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
+  ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), coerce_micros));
+
+  // Loss of precision
+  ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
+                                    default_writer_properties(), coerce_micros));
+}
+
 TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   using ::arrow::ArrayFromVector;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aa6a3c6e/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index b0cde36..366f2c3 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -400,13 +400,12 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
 }
 
 Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
-                  bool nullable, bool support_int96_nanoseconds,
-                  const WriterProperties& properties, NodePtr* out) {
+                  bool nullable, const WriterProperties& properties,
+                  const ArrowWriterProperties& arrow_properties, NodePtr* out) {
   Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
 
   NodePtr element;
-  RETURN_NOT_OK(
-      FieldToNode(type->value_field(), properties, &element, support_int96_nanoseconds));
+  RETURN_NOT_OK(FieldToNode(type->value_field(), properties, arrow_properties, &element));
 
   NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
   *out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST);
@@ -415,23 +414,63 @@ Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::str
 
 Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
                     const std::string& name, bool nullable,
-                    bool support_int96_nanoseconds, const WriterProperties& properties,
-                    NodePtr* out) {
+                    const WriterProperties& properties,
+                    const ArrowWriterProperties& arrow_properties, NodePtr* out) {
   Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
 
   std::vector<NodePtr> children(type->num_children());
   for (int i = 0; i < type->num_children(); i++) {
     RETURN_NOT_OK(
-        FieldToNode(type->child(i), properties, &children[i], support_int96_nanoseconds));
+        FieldToNode(type->child(i), properties, arrow_properties, &children[i]));
   }
 
   *out = GroupNode::Make(name, repetition, children);
   return Status::OK();
 }
 
+static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
+                                   const ArrowWriterProperties& properties,
+                                   ParquetType::type* physical_type,
+                                   LogicalType::type* logical_type) {
+  auto unit = type.unit();
+  *physical_type = ParquetType::INT64;
+
+  if (properties.coerce_timestamps_enabled()) {
+    auto coerce_unit = properties.coerce_timestamps_unit();
+    if (coerce_unit == ::arrow::TimeUnit::MILLI) {
+      *logical_type = LogicalType::TIMESTAMP_MILLIS;
+    } else if (coerce_unit == ::arrow::TimeUnit::MICRO) {
+      *logical_type = LogicalType::TIMESTAMP_MICROS;
+    } else {
+      return Status::NotImplemented(
+          "Can only coerce Arrow timestamps to milliseconds"
+          " or microseconds");
+    }
+    return Status::OK();
+  }
+
+  if (unit == ::arrow::TimeUnit::MILLI) {
+    *logical_type = LogicalType::TIMESTAMP_MILLIS;
+  } else if (unit == ::arrow::TimeUnit::MICRO) {
+    *logical_type = LogicalType::TIMESTAMP_MICROS;
+  } else if (unit == ::arrow::TimeUnit::NANO) {
+    if (properties.support_deprecated_int96_timestamps()) {
+      *physical_type = ParquetType::INT96;
+      // No corresponding logical type
+    } else {
+      *logical_type = LogicalType::TIMESTAMP_MICROS;
+    }
+  } else {
+    return Status::NotImplemented(
+        "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
+        "Parquet.");
+  }
+  return Status::OK();
+}
+
 Status FieldToNode(const std::shared_ptr<Field>& field,
-                   const WriterProperties& properties, NodePtr* out,
-                   bool support_int96_nanoseconds) {
+                   const WriterProperties& properties,
+                   const ArrowWriterProperties& arrow_properties, NodePtr* out) {
   LogicalType::type logical_type = LogicalType::NONE;
   ParquetType::type type;
   Repetition::type repetition =
@@ -507,29 +546,11 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       type = ParquetType::INT32;
       logical_type = LogicalType::DATE;
       break;
-    case ArrowType::TIMESTAMP: {
-      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
-      auto unit = timestamp_type->unit();
-      if (unit == ::arrow::TimeUnit::MILLI) {
-        type = ParquetType::INT64;
-        logical_type = LogicalType::TIMESTAMP_MILLIS;
-      } else if (unit == ::arrow::TimeUnit::MICRO) {
-        type = ParquetType::INT64;
-        logical_type = LogicalType::TIMESTAMP_MICROS;
-      } else if (unit == ::arrow::TimeUnit::NANO) {
-        if (support_int96_nanoseconds) {
-          type = ParquetType::INT96;
-          // No corresponding logical type
-        } else {
-          type = ParquetType::INT64;
-          logical_type = LogicalType::TIMESTAMP_MICROS;
-        }
-      } else {
-        return Status::NotImplemented(
-            "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
-            "Parquet.");
-      }
-    } break;
+    case ArrowType::TIMESTAMP:
+      RETURN_NOT_OK(
+          GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()),
+                               arrow_properties, &type, &logical_type));
+      break;
     case ArrowType::TIME32:
       type = ParquetType::INT32;
       logical_type = LogicalType::TIME_MILLIS;
@@ -544,13 +565,13 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
     } break;
     case ArrowType::STRUCT: {
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
-      return StructToNode(struct_type, field->name(), field->nullable(),
-                          support_int96_nanoseconds, properties, out);
+      return StructToNode(struct_type, field->name(), field->nullable(), properties,
+                          arrow_properties, out);
     } break;
     case ArrowType::LIST: {
       auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
-      return ListToNode(list_type, field->name(), field->nullable(),
-                        support_int96_nanoseconds, properties, out);
+      return ListToNode(list_type, field->name(), field->nullable(), properties,
+                        arrow_properties, out);
     } break;
     default:
       // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
@@ -562,12 +583,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
 
 Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                        const WriterProperties& properties,
-                       std::shared_ptr<SchemaDescriptor>* out,
-                       bool support_int96_nanoseconds) {
+                       const ArrowWriterProperties& arrow_properties,
+                       std::shared_ptr<SchemaDescriptor>* out) {
   std::vector<NodePtr> nodes(arrow_schema->num_fields());
   for (int i = 0; i < arrow_schema->num_fields(); i++) {
-    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i],
-                              support_int96_nanoseconds));
+    RETURN_NOT_OK(
+        FieldToNode(arrow_schema->field(i), properties, arrow_properties, &nodes[i]));
   }
 
   NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
@@ -577,5 +598,12 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
   return Status::OK();
 }
 
+Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+                       const WriterProperties& properties,
+                       std::shared_ptr<SchemaDescriptor>* out) {
+  return ToParquetSchema(arrow_schema, properties, *default_arrow_writer_properties(),
+                         out);
+}
+
 }  // namespace arrow
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aa6a3c6e/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 7d1f27e..73e48f2 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -25,6 +25,7 @@
 
 #include "parquet/api/schema.h"
 #include "parquet/api/writer.h"
+#include "parquet/arrow/writer.h"
 
 namespace arrow {
 
@@ -69,13 +70,17 @@ FromParquetSchema(const SchemaDescriptor* parquet_schema,
 
 ::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
                                            const WriterProperties& properties,
-                                           schema::NodePtr* out,
-                                           bool support_int96_nanoseconds = false);
+                                           const ArrowWriterProperties& arrow_properties,
+                                           schema::NodePtr* out);
+
+::arrow::Status PARQUET_EXPORT
+ToParquetSchema(const ::arrow::Schema* arrow_schema, const WriterProperties& properties,
+                const ArrowWriterProperties& arrow_properties,
+                std::shared_ptr<SchemaDescriptor>* out);
 
 ::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
                                                const WriterProperties& properties,
-                                               std::shared_ptr<SchemaDescriptor>* out,
-                                               bool support_int96_nanoseconds = false);
+                                               std::shared_ptr<SchemaDescriptor>* out);
 
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aa6a3c6e/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index b096e90..b3ed7dc 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -260,26 +260,30 @@ class FileWriter::Impl {
        const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
 
   Status NewRowGroup(int64_t chunk_size);
+
   template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data,
+  Status TypedWriteBatch(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
+                         int64_t num_levels, const int16_t* def_levels,
+                         const int16_t* rep_levels);
+
+  Status WriteTimestamps(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
+                         int64_t num_levels, const int16_t* def_levels,
+                         const int16_t* rep_levels);
 
-  Status TypedWriteBatchConvertedNanos(ColumnWriter* writer,
-                                       const std::shared_ptr<Array>& data,
-                                       int64_t num_levels, const int16_t* def_levels,
-                                       const int16_t* rep_levels);
+  Status WriteTimestampsCoerce(ColumnWriter* column_writer,
+                               const std::shared_ptr<Array>& data, int64_t num_levels,
+                               const int16_t* def_levels, const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
                                const ArrowType& type, int64_t num_values,
                                int64_t num_levels, const int16_t* def_levels,
                                const int16_t* rep_levels,
                                const typename ArrowType::c_type* data_ptr);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, const ArrowType& type,
-                            int64_t num_values, int64_t num_levels,
+  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
+                            const ArrowType& type, int64_t num_values, int64_t num_levels,
                             const int16_t* def_levels, const int16_t* rep_levels,
                             const uint8_t* valid_bits, int64_t valid_bits_offset,
                             const typename ArrowType::c_type* data_ptr);
@@ -317,6 +321,9 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
   return Status::OK();
 }
 
+// ----------------------------------------------------------------------
+// Column type specialization
+
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
                                          const std::shared_ptr<Array>& array,
@@ -325,7 +332,7 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
   using ArrowCType = typename ArrowType::c_type;
 
   auto data = static_cast<const PrimitiveArray*>(array.get());
-  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->values()->data());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->raw_values());
   auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
@@ -391,25 +398,6 @@ Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
   return Status::OK();
 }
 
-template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
-    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const int64_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
-  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
-  if (type.unit() == TimeUnit::NANO) {
-    for (int i = 0; i < num_values; i++) {
-      internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
-    }
-  } else {
-    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
-  }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  return Status::OK();
-}
-
 #define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)         \
   template <>                                                              \
   Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(  \
@@ -424,7 +412,6 @@ Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType
 NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
 NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
@@ -507,6 +494,31 @@ Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
   return Status::OK();
 }
 
+#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
+  template <>                                                                          \
+  Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
+      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,                  \
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,               \
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
+      const CType* data_ptr) {                                                         \
+    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                     \
+        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
+    return Status::OK();                                                               \
+  }
+
+NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
+NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+
+// ----------------------------------------------------------------------
+// Write timestamps
+
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+
 template <>
 Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
     TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
@@ -533,65 +545,133 @@ Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
   return Status::OK();
 }
 
-#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
-  template <>                                                                          \
-  Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
-      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,                  \
-      int64_t num_values, int64_t num_levels, const int16_t* def_levels,               \
-      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
-      const CType* data_ptr) {                                                         \
-    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                     \
-        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
-    return Status::OK();                                                               \
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
+    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int64_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
+  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+  if (type.unit() == TimeUnit::NANO) {
+    for (int i = 0; i < num_values; i++) {
+      internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+    }
+  } else {
+    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
   }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
 
-NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
-NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+Status FileWriter::Impl::WriteTimestamps(ColumnWriter* column_writer,
+                                         const std::shared_ptr<Array>& values,
+                                         int64_t num_levels, const int16_t* def_levels,
+                                         const int16_t* rep_levels) {
+  const auto& type = static_cast<::arrow::TimestampType&>(*values->type());
+
+  const bool is_nanosecond = type.unit() == TimeUnit::NANO;
+
+  if (is_nanosecond && arrow_properties_->support_deprecated_int96_timestamps()) {
+    return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
+        column_writer, values, num_levels, def_levels, rep_levels);
+  } else if (is_nanosecond ||
+             (arrow_properties_->coerce_timestamps_enabled() &&
+              (type.unit() != arrow_properties_->coerce_timestamps_unit()))) {
+    // Casting is required. This covers several cases
+    // * Nanoseconds -> cast to microseconds
+    // * coerce_timestamps_enabled_, cast all timestamps to requested unit
+    return WriteTimestampsCoerce(column_writer, values, num_levels, def_levels,
+                                 rep_levels);
+  } else {
+    // No casting of timestamps is required, take the fast path
+    return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
+        column_writer, values, num_levels, def_levels, rep_levels);
+  }
+}
 
-Status FileWriter::Impl::TypedWriteBatchConvertedNanos(
-    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels) {
+Status FileWriter::Impl::WriteTimestampsCoerce(ColumnWriter* column_writer,
+                                               const std::shared_ptr<Array>& array,
+                                               int64_t num_levels,
+                                               const int16_t* def_levels,
+                                               const int16_t* rep_levels) {
   // Note that we can only use data_buffer_ here as we write timestamps with the fast
   // path.
   RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
   int64_t* data_buffer_ptr = reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
 
-  auto data = static_cast<const NumericArray<::arrow::TimestampType>*>(array.get());
-  auto data_ptr = reinterpret_cast<const int64_t*>(data->values()->data());
+  const auto& data = static_cast<const ::arrow::TimestampArray&>(*array);
+
+  // TimestampArray::raw_values accounts for offset
+  auto data_ptr = data.raw_values();
   auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
 
-  // Convert nanoseconds to microseconds
-  for (int64_t i = 0; i < array->length(); i++) {
-    data_buffer_ptr[i] = data_ptr[i] / 1000;
+  const auto& type = static_cast<const ::arrow::TimestampType&>(*array->type());
+
+  TimeUnit::type target_unit = arrow_properties_->coerce_timestamps_enabled()
+                                   ? arrow_properties_->coerce_timestamps_unit()
+                                   : TimeUnit::MICRO;
+  auto target_type = ::arrow::timestamp(target_unit);
+
+  auto DivideBy = [&](const int64_t factor) {
+    for (int64_t i = 0; i < array->length(); i++) {
+      if (!data.IsNull(i) && (data_ptr[i] % factor != 0)) {
+        std::stringstream ss;
+        ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
+           << " would lose data: " << data_ptr[i];
+        return Status::Invalid(ss.str());
+      }
+      data_buffer_ptr[i] = data_ptr[i] / factor;
+    }
+    return Status::OK();
+  };
+
+  auto MultiplyBy = [&](const int64_t factor) {
+    for (int64_t i = 0; i < array->length(); i++) {
+      data_buffer_ptr[i] = data_ptr[i] * factor;
+    }
+    return Status::OK();
+  };
+
+  if (type.unit() == TimeUnit::NANO) {
+    if (target_unit == TimeUnit::MICRO) {
+      RETURN_NOT_OK(DivideBy(1000));
+    } else {
+      DCHECK_EQ(TimeUnit::MILLI, target_unit);
+      RETURN_NOT_OK(DivideBy(1000000));
+    }
+  } else if (type.unit() == TimeUnit::SECOND) {
+    RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
+  } else if (type.unit() == TimeUnit::MILLI) {
+    DCHECK_EQ(TimeUnit::MICRO, target_unit);
+    RETURN_NOT_OK(MultiplyBy(1000));
+  } else {
+    DCHECK_EQ(TimeUnit::MILLI, target_unit);
+    RETURN_NOT_OK(DivideBy(1000));
   }
 
-  std::shared_ptr<::arrow::TimestampType> type =
-      std::static_pointer_cast<::arrow::TimestampType>(
-          ::arrow::timestamp(::arrow::TimeUnit::MICRO));
-  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
     // no nulls, just dump the data
     RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
-        writer, *type, array->length(), num_levels, def_levels, rep_levels,
-        data_buffer_ptr)));
+        writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
+        num_levels, def_levels, rep_levels, data_buffer_ptr)));
   } else {
-    const uint8_t* valid_bits = data->null_bitmap_data();
+    const uint8_t* valid_bits = data.null_bitmap_data();
     RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
-        writer, *type, array->length(), num_levels, def_levels, rep_levels, valid_bits,
-        data->offset(), data_buffer_ptr)));
+        writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
+        num_levels, def_levels, rep_levels, valid_bits, data.offset(), data_buffer_ptr)));
   }
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
 
+// ----------------------------------------------------------------------
+
 // This specialization seems quite similar but it significantly differs in two points:
 // * offset is added at the most latest time to the pointer as we have sub-byte access
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from
 //   ArrowType::c_type to ParquetType::c_type
+
 template <>
 Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
     ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
@@ -642,6 +722,8 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
     DCHECK(data_ptr != nullptr);
   }
   auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
+
+  // Slice offset is accounted for in raw_value_offsets
   const int32_t* value_offset = data->raw_value_offsets();
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
@@ -699,6 +781,9 @@ Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>
   return Status::OK();
 }
 
+// End of column type specializations
+// ----------------------------------------------------------------------
+
 Status FileWriter::Impl::Close() {
   if (row_group_writer_ != nullptr) {
     PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
@@ -760,21 +845,9 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       }
     }
       WRITE_BATCH_CASE(NA, NullType, Int32Type)
-    case ::arrow::Type::TIMESTAMP: {
-      auto timestamp_type =
-          static_cast<::arrow::TimestampType*>(values_array->type().get());
-      if (timestamp_type->unit() == ::arrow::TimeUnit::NANO &&
-          arrow_properties_->support_deprecated_int96_timestamps()) {
-        return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
-            column_writer, values_array, num_levels, def_levels, rep_levels);
-      } else if (timestamp_type->unit() == ::arrow::TimeUnit::NANO) {
-        return TypedWriteBatchConvertedNanos(column_writer, values_array, num_levels,
-                                             def_levels, rep_levels);
-      } else {
-        return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
-            column_writer, values_array, num_levels, def_levels, rep_levels);
-      }
-    }
+    case ::arrow::Type::TIMESTAMP:
+      return WriteTimestamps(column_writer, values_array, num_levels, def_levels,
+                             rep_levels);
       WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
       WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
       WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
@@ -830,8 +903,8 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
                         const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                         std::unique_ptr<FileWriter>* writer) {
   std::shared_ptr<SchemaDescriptor> parquet_schema;
-  RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema,
-                                arrow_properties->support_deprecated_int96_timestamps()));
+  RETURN_NOT_OK(
+      ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema));
 
   auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/aa6a3c6e/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index a74f263..24ba72d 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -24,6 +24,7 @@
 #include "parquet/api/writer.h"
 
 #include "arrow/io/interfaces.h"
+#include "arrow/type.h"
 
 namespace arrow {
 
@@ -44,7 +45,7 @@ class PARQUET_EXPORT ArrowWriterProperties {
  public:
   class Builder {
    public:
-    Builder() : write_nanos_as_int96_(false) {}
+    Builder() : write_nanos_as_int96_(false), coerce_timestamps_enabled_(false) {}
     virtual ~Builder() {}
 
     Builder* disable_deprecated_int96_timestamps() {
@@ -57,22 +58,42 @@ class PARQUET_EXPORT ArrowWriterProperties {
       return this;
     }
 
+    Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
+      coerce_timestamps_enabled_ = true;
+      coerce_timestamps_unit_ = unit;
+      return this;
+    }
+
     std::shared_ptr<ArrowWriterProperties> build() {
-      return std::shared_ptr<ArrowWriterProperties>(
-          new ArrowWriterProperties(write_nanos_as_int96_));
+      return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
+          write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_));
     }
 
    private:
     bool write_nanos_as_int96_;
+
+    bool coerce_timestamps_enabled_;
+    ::arrow::TimeUnit::type coerce_timestamps_unit_;
   };
 
   bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
 
+  bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
+  ::arrow::TimeUnit::type coerce_timestamps_unit() const {
+    return coerce_timestamps_unit_;
+  }
+
  private:
-  explicit ArrowWriterProperties(bool write_nanos_as_int96)
-      : write_nanos_as_int96_(write_nanos_as_int96) {}
+  explicit ArrowWriterProperties(bool write_nanos_as_int96,
+                                 bool coerce_timestamps_enabled,
+                                 ::arrow::TimeUnit::type coerce_timestamps_unit)
+      : write_nanos_as_int96_(write_nanos_as_int96),
+        coerce_timestamps_enabled_(coerce_timestamps_enabled),
+        coerce_timestamps_unit_(coerce_timestamps_unit) {}
 
   const bool write_nanos_as_int96_;
+  const bool coerce_timestamps_enabled_;
+  const ::arrow::TimeUnit::type coerce_timestamps_unit_;
 };
 
 std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();


Mime
View raw message