arrow-commits mailing list archives

From w...@apache.org
Subject arrow git commit: ARROW-786: [Format] In-memory format for 128-bit Decimals, handling of sign bit
Date Thu, 24 Aug 2017 17:57:47 GMT
Repository: arrow
Updated Branches:
  refs/heads/master b31269726 -> 750b77dc6


ARROW-786: [Format] In-memory format for 128-bit Decimals, handling of sign bit

* Reimplement Decimal128 types to use the Int128 type as the underlying integer
representation, adapted from the Apache ORC project's C++ in-memory format.
This enables us to write integration tests and results in an in-memory
Decimal128 format that is compatible with the Java implementation.
* Additionally, this PR fixes Decimal slice comparison and adds related
regression tests.
* Follow-ups include ARROW-695 (C++ Decimal integration tests), ARROW-696 (JSON
read/write support for decimals) and ARROW-1238 (Java Decimal integration
tests).
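
For readers of this archive, here is a minimal sketch of why a two's complement
128-bit integer needs no separate sign bitmap. It is an illustration only, not
the Int128 class added in int128.h: the SketchInt128 type, the helper names
(FromInt64, IsNegative, Negate, ToBytesSketch, BytesEqual) and the low-then-high
byte layout are all assumptions made for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical illustration type; NOT the Int128 class from int128.h.
struct SketchInt128 {
  int64_t high;  // upper 64 bits; the top bit is the sign in two's complement
  uint64_t low;  // lower 64 bits
};

// Widen a signed 64-bit value, sign-extending into the high word.
inline SketchInt128 FromInt64(int64_t v) {
  return SketchInt128{v < 0 ? -1 : 0, static_cast<uint64_t>(v)};
}

// The sign lives in the value itself, so no side bitmap is required.
inline bool IsNegative(const SketchInt128& v) { return v.high < 0; }

// Two's complement negation: invert every bit, add one, carry into high.
// The final cast assumes the usual two's complement conversion behaviour.
inline SketchInt128 Negate(const SketchInt128& v) {
  uint64_t low = ~v.low + 1;
  uint64_t high = ~static_cast<uint64_t>(v.high);
  if (low == 0) ++high;  // carry out of the low word
  return SketchInt128{static_cast<int64_t>(high), low};
}

// Copy the 16 value bytes in host order, low word first (assumed layout).
inline void ToBytesSketch(const SketchInt128& v, uint8_t* out) {
  assert(out != nullptr);
  std::memcpy(out, &v.low, sizeof(v.low));
  std::memcpy(out + sizeof(v.low), &v.high, sizeof(v.high));
}

// With the sign folded into the bytes, equality is a single 16-byte memcmp.
inline bool BytesEqual(const SketchInt128& a, const SketchInt128& b) {
  uint8_t lhs[16];
  uint8_t rhs[16];
  ToBytesSketch(a, lhs);
  ToBytesSketch(b, rhs);
  return std::memcmp(lhs, rhs, sizeof(lhs)) == 0;
}
```

Under these assumptions a negative decimal differs from its positive
counterpart in the stored bytes themselves, which is consistent with the
comparison code in this commit delegating to the fixed-size binary path rather
than checking a sign bitmap.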

Author: Phillip Cloud <cpcloud@gmail.com>

Closes #981 from cpcloud/decimal-rewrite and squashes the following commits:

53ce04b6 [Phillip Cloud] Formatting
fe13ef36 [Phillip Cloud] Remove redundant constructor
86db1841 [Phillip Cloud] Subclass from FixedSizeBinaryArray for code reuse
535f9ff4 [Phillip Cloud] Use a macro for cases
1cc43cee [Phillip Cloud] Use CHAR_BIT
355fb24c [Phillip Cloud] Include the correct header for _BitScanReverse
b53d7cdd [Phillip Cloud] Share comparison code
162eeeb7 [Phillip Cloud] BUG: Double export
b98c8942 [Phillip Cloud] BUG: Export symbols
be220c83 [Phillip Cloud] Cast so we have enough space to contain the integer
57160108 [Phillip Cloud] Cast 18 to matching type size_t for msvc
88339041 [Phillip Cloud] Remove unnecessary args to sto* function calls
628ce85c [Phillip Cloud] Fix more docs
e4a17926 [Phillip Cloud] More const
8ecb3157 [Phillip Cloud] Formatting
178d3f20 [Phillip Cloud] NOLINT for MSVC specific and necessary types
38c9b506 [Phillip Cloud] Fix doc style in int128.h and add const where possible
2930d7bc [Phillip Cloud] Fix naming convention in decimal-test.cc
1eab5c4c [Phillip Cloud] Remove unnecessary header from CMakeLists.txt
22eda4b0 [Phillip Cloud] kMaximumPrecision
9af97d88 [Phillip Cloud] MSVC fix
349dc58b [Phillip Cloud] ARROW-786: [Format] In-memory format for 128-bit Decimals, handling of sign bit


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/750b77dc
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/750b77dc
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/750b77dc

Branch: refs/heads/master
Commit: 750b77dc6360464aa3621722b5ee6530b761391c
Parents: b312697
Author: Phillip Cloud <cpcloud@gmail.com>
Authored: Thu Aug 24 13:57:42 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Thu Aug 24 13:57:42 2017 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                      |   1 +
 cpp/src/arrow/array-decimal-test.cc     |  92 +++--
 cpp/src/arrow/array.cc                  |  64 +---
 cpp/src/arrow/array.h                   |  37 +-
 cpp/src/arrow/builder.cc                |  57 +--
 cpp/src/arrow/builder.h                 |   8 -
 cpp/src/arrow/compare.cc                | 127 ++-----
 cpp/src/arrow/python/arrow_to_pandas.cc |  64 ++--
 cpp/src/arrow/python/python-test.cc     |   4 -
 cpp/src/arrow/type.cc                   |   4 -
 cpp/src/arrow/type.h                    |   1 -
 cpp/src/arrow/util/CMakeLists.txt       |   7 +-
 cpp/src/arrow/util/decimal-test.cc      |  48 ++-
 cpp/src/arrow/util/decimal.cc           |  41 +--
 cpp/src/arrow/util/decimal.h            |  26 +-
 cpp/src/arrow/util/int128.cc            | 527 +++++++++++++++++++++++++++
 cpp/src/arrow/util/int128.h             | 128 +++++++
 17 files changed, 846 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index b55a9bb..6c56103 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -742,6 +742,7 @@ set(ARROW_SRCS
   src/arrow/util/compression.cc
   src/arrow/util/cpu-info.cc
   src/arrow/util/decimal.cc
+  src/arrow/util/int128.cc
   src/arrow/util/key_value_metadata.cc
 )
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/array-decimal-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-decimal-test.cc b/cpp/src/arrow/array-decimal-test.cc
index e94ba48..37852a3 100644
--- a/cpp/src/arrow/array-decimal-test.cc
+++ b/cpp/src/arrow/array-decimal-test.cc
@@ -28,56 +28,66 @@ namespace decimal {
 template <typename T>
 class DecimalTestBase {
  public:
-  virtual std::vector<uint8_t> data(const std::vector<T>& input,
-                                    size_t byte_width) const = 0;
+  DecimalTestBase() : pool_(default_memory_pool()) {}
 
-  void test(int precision, const std::vector<T>& draw,
-            const std::vector<uint8_t>& valid_bytes,
-            const std::vector<uint8_t>& sign_bitmap = {}, int64_t offset = 0) const {
-    auto type = std::make_shared<DecimalType>(precision, 4);
-    int byte_width = type->byte_width();
-    auto pool = default_memory_pool();
-    auto builder = std::make_shared<DecimalBuilder>(type, pool);
-    size_t null_count = 0;
+  virtual std::vector<uint8_t> MakeData(const std::vector<T>& input,
+                                        size_t byte_width) const = 0;
+
+  void InitBuilder(const std::shared_ptr<DecimalType>& type, const std::vector<T>& draw,
+                   const std::vector<uint8_t>& valid_bytes, int byte_width,
+                   std::shared_ptr<DecimalBuilder>* builder, size_t* null_count) const {
+    *builder = std::make_shared<DecimalBuilder>(type, pool_);
 
     size_t size = draw.size();
-    ASSERT_OK(builder->Reserve(size));
+    ASSERT_OK((*builder)->Reserve(size));
 
     for (size_t i = 0; i < size; ++i) {
       if (valid_bytes[i]) {
-        ASSERT_OK(builder->Append(draw[i]));
+        ASSERT_OK((*builder)->Append(draw[i]));
       } else {
-        ASSERT_OK(builder->AppendNull());
-        ++null_count;
+        ASSERT_OK((*builder)->AppendNull());
+        ++*null_count;
       }
     }
+  }
 
-    std::shared_ptr<Buffer> expected_sign_bitmap;
-    if (!sign_bitmap.empty()) {
-      ASSERT_OK(BitUtil::BytesToBits(sign_bitmap, &expected_sign_bitmap));
-    }
+  void TestCreate(int precision, const std::vector<T>& draw,
+                  const std::vector<uint8_t>& valid_bytes, int64_t offset) const {
+    auto type = std::make_shared<DecimalType>(precision, 4);
 
-    auto raw_bytes = data(draw, byte_width);
+    std::shared_ptr<DecimalBuilder> builder;
+
+    size_t null_count = 0;
+
+    const size_t size = draw.size();
+    const int byte_width = type->byte_width();
+
+    InitBuilder(type, draw, valid_bytes, byte_width, &builder, &null_count);
+
+    auto raw_bytes = MakeData(draw, static_cast<size_t>(byte_width));
     auto expected_data = std::make_shared<Buffer>(raw_bytes.data(), size * byte_width);
     std::shared_ptr<Buffer> expected_null_bitmap;
     ASSERT_OK(BitUtil::BytesToBits(valid_bytes, &expected_null_bitmap));
 
     int64_t expected_null_count = test::null_count(valid_bytes);
-    auto expected =
-        std::make_shared<DecimalArray>(type, size, expected_data, expected_null_bitmap,
-                                       expected_null_count, offset, expected_sign_bitmap);
+    auto expected = std::make_shared<DecimalArray>(
+        type, size, expected_data, expected_null_bitmap, expected_null_count, 0);
 
     std::shared_ptr<Array> out;
     ASSERT_OK(builder->Finish(&out));
-    ASSERT_TRUE(out->Equals(*expected));
+    ASSERT_TRUE(out->Slice(offset)->Equals(
+        *expected->Slice(offset, expected->length() - offset)));
   }
+
+ private:
+  MemoryPool* pool_;
 };
 
 template <typename T>
 class DecimalTest : public DecimalTestBase<T> {
  public:
-  std::vector<uint8_t> data(const std::vector<T>& input,
-                            size_t byte_width) const override {
+  std::vector<uint8_t> MakeData(const std::vector<T>& input,
+                                size_t byte_width) const override {
     std::vector<uint8_t> result(input.size() * byte_width);
     // TODO(phillipc): There's probably a better way to do this
     constexpr static const size_t bytes_per_element = sizeof(T);
@@ -91,16 +101,15 @@ class DecimalTest : public DecimalTestBase<T> {
 template <>
 class DecimalTest<Decimal128> : public DecimalTestBase<Decimal128> {
  public:
-  std::vector<uint8_t> data(const std::vector<Decimal128>& input,
-                            size_t byte_width) const override {
+  std::vector<uint8_t> MakeData(const std::vector<Decimal128>& input,
+                                size_t byte_width) const override {
     std::vector<uint8_t> result;
     result.reserve(input.size() * byte_width);
     constexpr static const size_t bytes_per_element = 16;
     for (size_t i = 0; i < input.size(); ++i) {
       uint8_t stack_bytes[bytes_per_element] = {0};
       uint8_t* bytes = stack_bytes;
-      bool is_negative;
-      ToBytes(input[i], &bytes, &is_negative);
+      ToBytes(input[i], &bytes);
 
       for (size_t i = 0; i < bytes_per_element; ++i) {
         result.push_back(bytes[i]);
@@ -124,7 +133,8 @@ TEST_P(Decimal32BuilderTest, NoNulls) {
   std::vector<Decimal32> draw = {Decimal32(1), Decimal32(2), Decimal32(2389),
                                  Decimal32(4), Decimal32(-12348)};
   std::vector<uint8_t> valid_bytes = {true, true, true, true, true};
-  this->test(precision, draw, valid_bytes);
+  this->TestCreate(precision, draw, valid_bytes, 0);
+  this->TestCreate(precision, draw, valid_bytes, 2);
 }
 
 TEST_P(Decimal64BuilderTest, NoNulls) {
@@ -132,7 +142,8 @@ TEST_P(Decimal64BuilderTest, NoNulls) {
   std::vector<Decimal64> draw = {Decimal64(1), Decimal64(2), Decimal64(2389),
                                  Decimal64(4), Decimal64(-12348)};
   std::vector<uint8_t> valid_bytes = {true, true, true, true, true};
-  this->test(precision, draw, valid_bytes);
+  this->TestCreate(precision, draw, valid_bytes, 0);
+  this->TestCreate(precision, draw, valid_bytes, 2);
 }
 
 TEST_P(Decimal128BuilderTest, NoNulls) {
@@ -140,8 +151,8 @@ TEST_P(Decimal128BuilderTest, NoNulls) {
   std::vector<Decimal128> draw = {Decimal128(1), Decimal128(-2), Decimal128(2389),
                                   Decimal128(4), Decimal128(-12348)};
   std::vector<uint8_t> valid_bytes = {true, true, true, true, true};
-  std::vector<uint8_t> sign_bitmap = {false, true, false, false, true};
-  this->test(precision, draw, valid_bytes, sign_bitmap);
+  this->TestCreate(precision, draw, valid_bytes, 0);
+  this->TestCreate(precision, draw, valid_bytes, 2);
 }
 
 TEST_P(Decimal32BuilderTest, WithNulls) {
@@ -149,7 +160,8 @@ TEST_P(Decimal32BuilderTest, WithNulls) {
   std::vector<Decimal32> draw = {Decimal32(1), Decimal32(2), Decimal32(-1), Decimal32(4),
                                  Decimal32(-1)};
   std::vector<uint8_t> valid_bytes = {true, true, false, true, false};
-  this->test(precision, draw, valid_bytes);
+  this->TestCreate(precision, draw, valid_bytes, 0);
+  this->TestCreate(precision, draw, valid_bytes, 2);
 }
 
 TEST_P(Decimal64BuilderTest, WithNulls) {
@@ -157,7 +169,8 @@ TEST_P(Decimal64BuilderTest, WithNulls) {
   std::vector<Decimal64> draw = {Decimal64(-1), Decimal64(2), Decimal64(-1), Decimal64(4),
                                  Decimal64(-1)};
   std::vector<uint8_t> valid_bytes = {true, true, false, true, false};
-  this->test(precision, draw, valid_bytes);
+  this->TestCreate(precision, draw, valid_bytes, 0);
+  this->TestCreate(precision, draw, valid_bytes, 2);
 }
 
 TEST_P(Decimal128BuilderTest, WithNulls) {
@@ -173,9 +186,8 @@ TEST_P(Decimal128BuilderTest, WithNulls) {
                                   Decimal128("-23049302932.235234")};
   std::vector<uint8_t> valid_bytes = {true, true, false, true, false,
                                       true, true, true,  true};
-  std::vector<uint8_t> sign_bitmap = {false, false, false, false, false,
-                                      false, false, false, true};
-  this->test(precision, draw, valid_bytes, sign_bitmap);
+  this->TestCreate(precision, draw, valid_bytes, 0);
+  this->TestCreate(precision, draw, valid_bytes, 2);
 }
 
 INSTANTIATE_TEST_CASE_P(Decimal32BuilderTest, Decimal32BuilderTest,
@@ -185,8 +197,8 @@ INSTANTIATE_TEST_CASE_P(Decimal64BuilderTest, Decimal64BuilderTest,
                         ::testing::Range(DecimalPrecision<int64_t>::minimum,
                                          DecimalPrecision<int64_t>::maximum));
 INSTANTIATE_TEST_CASE_P(Decimal128BuilderTest, Decimal128BuilderTest,
-                        ::testing::Range(DecimalPrecision<int128_t>::minimum,
-                                         DecimalPrecision<int128_t>::maximum));
+                        ::testing::Range(DecimalPrecision<Int128>::minimum,
+                                         DecimalPrecision<Int128>::maximum));
 
 }  // namespace decimal
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 637eb24..9fbadfe 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -161,7 +161,7 @@ PrimitiveArray::PrimitiveArray(const std::shared_ptr<DataType>& type, int64_t le
 
 const uint8_t* PrimitiveArray::raw_values() const {
   return raw_values_ +
-         offset() * static_cast<const FixedWidthType&>(*type()).bit_width() / 8;
+         offset() * static_cast<const FixedWidthType&>(*type()).bit_width() / CHAR_BIT;
 }
 
 template <typename T>
@@ -323,7 +323,6 @@ std::shared_ptr<Array> StringArray::Slice(int64_t offset, int64_t length) const
 
 FixedSizeBinaryArray::FixedSizeBinaryArray(
     const std::shared_ptr<internal::ArrayData>& data) {
-  DCHECK_EQ(data->type->id(), Type::FIXED_SIZE_BINARY);
   SetData(data);
 }
 
@@ -346,61 +345,30 @@ const uint8_t* FixedSizeBinaryArray::GetValue(int64_t i) const {
 // ----------------------------------------------------------------------
 // Decimal
 
-DecimalArray::DecimalArray(const std::shared_ptr<internal::ArrayData>& data) {
+DecimalArray::DecimalArray(const std::shared_ptr<internal::ArrayData>& data)
+    : FixedSizeBinaryArray(data) {
   DCHECK_EQ(data->type->id(), Type::DECIMAL);
-  SetData(data);
-}
-
-void DecimalArray::SetData(const std::shared_ptr<ArrayData>& data) {
-  auto fixed_size_data = data->buffers[1];
-  auto sign_bitmap = data->buffers[2];
-  this->Array::SetData(data);
-
-  raw_values_ = fixed_size_data != nullptr ? fixed_size_data->data() : nullptr;
-  sign_bitmap_data_ = sign_bitmap != nullptr ? sign_bitmap->data() : nullptr;
 }
 
-DecimalArray::DecimalArray(const std::shared_ptr<DataType>& type, int64_t length,
-                           const std::shared_ptr<Buffer>& data,
-                           const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count,
-                           int64_t offset, const std::shared_ptr<Buffer>& sign_bitmap) {
-  BufferVector buffers = {null_bitmap, data, sign_bitmap};
-  SetData(
-      std::make_shared<ArrayData>(type, length, std::move(buffers), null_count, offset));
-}
-
-bool DecimalArray::IsNegative(int64_t i) const {
-  return sign_bitmap_data_ != nullptr ? BitUtil::GetBit(sign_bitmap_data_, i) : false;
-}
-
-const uint8_t* DecimalArray::GetValue(int64_t i) const {
-  return raw_values_ + (i + data_->offset) * byte_width();
-}
+#define DECIMAL_TO_STRING_CASE(bits, bytes, precision, scale) \
+  case bits: {                                                \
+    decimal::Decimal##bits value;                             \
+    decimal::FromBytes((bytes), &value);                      \
+    return decimal::ToString(value, (precision), (scale));    \
+  }
 
 std::string DecimalArray::FormatValue(int64_t i) const {
   const auto& type_ = static_cast<const DecimalType&>(*type());
   const int precision = type_.precision();
   const int scale = type_.scale();
-  const int byte_width = type_.byte_width();
-  const uint8_t* bytes = raw_values_ + (i + data_->offset) * byte_width;
-  switch (byte_width) {
-    case 4: {
-      decimal::Decimal32 value;
-      decimal::FromBytes(bytes, &value);
-      return decimal::ToString(value, precision, scale);
-    }
-    case 8: {
-      decimal::Decimal64 value;
-      decimal::FromBytes(bytes, &value);
-      return decimal::ToString(value, precision, scale);
-    }
-    case 16: {
-      decimal::Decimal128 value;
-      decimal::FromBytes(bytes, IsNegative(i), &value);
-      return decimal::ToString(value, precision, scale);
-    }
+  const int bit_width = type_.bit_width();
+  const uint8_t* bytes = GetValue(i);
+  switch (bit_width) {
+    DECIMAL_TO_STRING_CASE(32, bytes, precision, scale)
+    DECIMAL_TO_STRING_CASE(64, bytes, precision, scale)
+    DECIMAL_TO_STRING_CASE(128, bytes, precision, scale)
     default: {
-      DCHECK(false) << "Invalid byte width: " << byte_width;
+      DCHECK(false) << "Invalid bit width: " << bit_width;
       return "";
     }
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 777fbe0..f9f1e31 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -521,8 +521,6 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {
 
   int32_t byte_width() const { return byte_width_; }
 
-  const uint8_t* raw_values() const { return raw_values_ + byte_width_ * data_->offset; }
-
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
 
  protected:
@@ -536,45 +534,18 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {
 
 // ----------------------------------------------------------------------
 // DecimalArray
-class ARROW_EXPORT DecimalArray : public FlatArray {
+class ARROW_EXPORT DecimalArray : public FixedSizeBinaryArray {
  public:
-  using TypeClass = Type;
+  using TypeClass = DecimalType;
+
+  using FixedSizeBinaryArray::FixedSizeBinaryArray;
 
   /// \brief Construct DecimalArray from internal::ArrayData instance
   explicit DecimalArray(const std::shared_ptr<internal::ArrayData>& data);
 
-  DecimalArray(const std::shared_ptr<DataType>& type, int64_t length,
-               const std::shared_ptr<Buffer>& data,
-               const std::shared_ptr<Buffer>& null_bitmap = nullptr,
-               int64_t null_count = 0, int64_t offset = 0,
-               const std::shared_ptr<Buffer>& sign_bitmap = nullptr);
-
-  bool IsNegative(int64_t i) const;
-
-  const uint8_t* GetValue(int64_t i) const;
-
   std::string FormatValue(int64_t i) const;
 
   std::shared_ptr<Array> Slice(int64_t offset, int64_t length) const override;
-
-  /// \brief The main decimal data
-  /// For 32/64-bit decimal this is everything
-  std::shared_ptr<Buffer> values() const { return data_->buffers[1]; }
-
-  /// Only needed for 128 bit Decimals
-  std::shared_ptr<Buffer> sign_bitmap() const { return data_->buffers[2]; }
-
-  int32_t byte_width() const {
-    return static_cast<const DecimalType&>(*type()).byte_width();
-  }
-
-  /// \brief Return pointer to value data, accounting for any offset
-  const uint8_t* raw_values() const { return raw_values_ + byte_width() * data_->offset; }
-
- private:
-  void SetData(const std::shared_ptr<internal::ArrayData>& data);
-  const uint8_t* raw_values_;
-  const uint8_t* sign_bitmap_data_;
 };
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index e2054db..5dba0e1 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -957,9 +957,7 @@ template class DictionaryBuilder<StringType>;
 // DecimalBuilder
 
 DecimalBuilder::DecimalBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
-    : FixedSizeBinaryBuilder(type, pool),
-      sign_bitmap_(nullptr),
-      sign_bitmap_data_(nullptr) {}
+    : FixedSizeBinaryBuilder(type, pool) {}
 
 #ifndef ARROW_NO_DEPRECATED_API
 DecimalBuilder::DecimalBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type)
@@ -968,9 +966,6 @@ DecimalBuilder::DecimalBuilder(MemoryPool* pool, const std::shared_ptr<DataType>
 
 template <typename T>
 ARROW_EXPORT Status DecimalBuilder::Append(const decimal::Decimal<T>& val) {
-  DCHECK_EQ(sign_bitmap_, nullptr) << "sign_bitmap_ is not null";
-  DCHECK_EQ(sign_bitmap_data_, nullptr) << "sign_bitmap_data_ is not null";
-
   RETURN_NOT_OK(FixedSizeBinaryBuilder::Reserve(1));
   return FixedSizeBinaryBuilder::Append(reinterpret_cast<const uint8_t*>(&val.value));
 }
@@ -980,53 +975,11 @@ template ARROW_EXPORT Status DecimalBuilder::Append(const decimal::Decimal64& va
 
 template <>
 ARROW_EXPORT Status DecimalBuilder::Append(const decimal::Decimal128& value) {
-  DCHECK_NE(sign_bitmap_, nullptr) << "sign_bitmap_ is null";
-  DCHECK_NE(sign_bitmap_data_, nullptr) << "sign_bitmap_data_ is null";
-
   RETURN_NOT_OK(FixedSizeBinaryBuilder::Reserve(1));
   uint8_t stack_bytes[16] = {0};
   uint8_t* bytes = stack_bytes;
-  bool is_negative;
-  decimal::ToBytes(value, &bytes, &is_negative);
-  RETURN_NOT_OK(FixedSizeBinaryBuilder::Append(bytes));
-
-  // TODO(phillipc): calculate the proper storage size here (do we have a function to do
-  // this)?
-  // TODO(phillipc): Reserve number of elements
-  RETURN_NOT_OK(sign_bitmap_->Reserve(1));
-  BitUtil::SetBitTo(sign_bitmap_data_, length_ - 1, is_negative);
-  return Status::OK();
-}
-
-Status DecimalBuilder::Init(int64_t capacity) {
-  RETURN_NOT_OK(FixedSizeBinaryBuilder::Init(capacity));
-  if (byte_width_ == 16) {
-    RETURN_NOT_OK(AllocateResizableBuffer(pool_, null_bitmap_->size(), &sign_bitmap_));
-    sign_bitmap_data_ = sign_bitmap_->mutable_data();
-    memset(sign_bitmap_data_, 0, static_cast<size_t>(sign_bitmap_->capacity()));
-  }
-  return Status::OK();
-}
-
-Status DecimalBuilder::Resize(int64_t capacity) {
-  int64_t old_bytes = null_bitmap_ != nullptr ? null_bitmap_->size() : 0;
-  if (sign_bitmap_ == nullptr) {
-    return Init(capacity);
-  }
-  RETURN_NOT_OK(FixedSizeBinaryBuilder::Resize(capacity));
-
-  if (byte_width_ == 16) {
-    RETURN_NOT_OK(sign_bitmap_->Resize(null_bitmap_->size()));
-    int64_t new_bytes = sign_bitmap_->size();
-    sign_bitmap_data_ = sign_bitmap_->mutable_data();
-
-    // The buffer might be overpadded to deal with padding according to the spec
-    if (old_bytes < new_bytes) {
-      memset(sign_bitmap_data_ + old_bytes, 0,
-             static_cast<size_t>(sign_bitmap_->capacity() - old_bytes));
-    }
-  }
-  return Status::OK();
+  decimal::ToBytes(value, &bytes);
+  return FixedSizeBinaryBuilder::Append(bytes);
 }
 
 Status DecimalBuilder::Finish(std::shared_ptr<Array>* out) {
@@ -1034,8 +987,8 @@ Status DecimalBuilder::Finish(std::shared_ptr<Array>* out) {
   RETURN_NOT_OK(byte_builder_.Finish(&data));
 
   /// TODO(phillipc): not sure where to get the offset argument here
-  *out = std::make_shared<DecimalArray>(type_, length_, data, null_bitmap_, null_count_,
-                                        0, sign_bitmap_);
+  *out =
+      std::make_shared<DecimalArray>(type_, length_, data, null_bitmap_, null_count_, 0);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 4e3cd92..8e41266 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -704,15 +704,7 @@ class ARROW_EXPORT DecimalBuilder : public FixedSizeBinaryBuilder {
   template <typename T>
   ARROW_EXPORT Status Append(const decimal::Decimal<T>& val);
 
-  Status Init(int64_t capacity) override;
-  Status Resize(int64_t capacity) override;
   Status Finish(std::shared_ptr<Array>* out) override;
-
- private:
-  /// We only need these for 128 bit decimals, because boost stores the sign
-  /// separate from the underlying bytes.
-  std::shared_ptr<ResizableBuffer> sign_bitmap_;
-  uint8_t* sign_bitmap_data_;
 };
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index c01f190..beb22e7 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -257,42 +257,7 @@ class RangeEqualsVisitor {
   }
 
   Status Visit(const DecimalArray& left) {
-    const auto& right = static_cast<const DecimalArray&>(right_);
-
-    int32_t width = left.byte_width();
-
-    const uint8_t* left_data = nullptr;
-    const uint8_t* right_data = nullptr;
-
-    if (left.values()) {
-      left_data = left.raw_values();
-    }
-
-    if (right.values()) {
-      right_data = right.raw_values();
-    }
-
-    for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_;
-         ++i, ++o_i) {
-      if (left.IsNegative(i) != right.IsNegative(o_i)) {
-        result_ = false;
-        return Status::OK();
-      }
-
-      const bool is_null = left.IsNull(i);
-      if (is_null != right.IsNull(o_i)) {
-        result_ = false;
-        return Status::OK();
-      }
-      if (is_null) continue;
-
-      if (std::memcmp(left_data + width * i, right_data + width * o_i, width)) {
-        result_ = false;
-        return Status::OK();
-      }
-    }
-    result_ = true;
-    return Status::OK();
+    return Visit(static_cast<const FixedSizeBinaryArray&>(left));
   }
 
   Status Visit(const NullArray& left) {
@@ -346,7 +311,7 @@ class RangeEqualsVisitor {
 
 static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& right) {
   const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
-  const int byte_width = size_meta.bit_width() / 8;
+  const int byte_width = size_meta.bit_width() / CHAR_BIT;
 
   const uint8_t* left_data = nullptr;
   const uint8_t* right_data = nullptr;
@@ -377,70 +342,26 @@ static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& r
 template <typename T>
 static inline bool CompareBuiltIn(const Array& left, const Array& right, const T* ldata,
                                   const T* rdata) {
-  if (left.null_count() > 0) {
-    for (int64_t i = 0; i < left.length(); ++i) {
-      if (left.IsNull(i) != right.IsNull(i)) {
-        return false;
-      } else if (!left.IsNull(i) && (ldata[i] != rdata[i])) {
-        return false;
-      }
-    }
+  if (ldata == nullptr && rdata == nullptr) {
     return true;
-  } else {
-    return memcmp(ldata, rdata, sizeof(T) * left.length()) == 0;
   }
-}
 
-static bool IsEqualDecimal(const DecimalArray& left, const DecimalArray& right) {
-  const int64_t loffset = left.offset();
-  const int64_t roffset = right.offset();
-
-  const uint8_t* left_data = nullptr;
-  const uint8_t* right_data = nullptr;
-
-  if (left.values()) {
-    left_data = left.raw_values();
-  }
-  if (right.values()) {
-    right_data = right.raw_values();
+  if (ldata == nullptr || rdata == nullptr) {
+    return false;
   }
 
-  const int32_t byte_width = left.byte_width();
-  if (byte_width == 4) {
-    return CompareBuiltIn<int32_t>(
-        left, right, reinterpret_cast<const int32_t*>(left_data) + loffset,
-        reinterpret_cast<const int32_t*>(right_data) + roffset);
-  } else if (byte_width == 8) {
-    return CompareBuiltIn<int64_t>(
-        left, right, reinterpret_cast<const int64_t*>(left_data) + loffset,
-        reinterpret_cast<const int64_t*>(right_data) + roffset);
-  } else {
-    // 128-bit
-
-    // Must also compare sign bitmap
-    const uint8_t* left_sign = nullptr;
-    const uint8_t* right_sign = nullptr;
-    if (left.sign_bitmap()) {
-      left_sign = left.sign_bitmap()->data();
-    }
-    if (right.sign_bitmap()) {
-      right_sign = right.sign_bitmap()->data();
-    }
-
+  if (left.null_count() > 0) {
     for (int64_t i = 0; i < left.length(); ++i) {
-      bool left_null = left.IsNull(i);
-      if (!left_null && (memcmp(left_data, right_data, byte_width) || right.IsNull(i))) {
+      if (left.IsNull(i) != right.IsNull(i)) {
         return false;
-      }
-      if (BitUtil::GetBit(left_sign, i + loffset) !=
-          BitUtil::GetBit(right_sign, i + roffset)) {
+      } else if (!left.IsNull(i) && (ldata[i] != rdata[i])) {
         return false;
       }
-      left_data += byte_width;
-      right_data += byte_width;
     }
     return true;
   }
+
+  return memcmp(ldata, rdata, sizeof(T) * left.length()) == 0;
 }
 
 class ArrayEqualsVisitor : public RangeEqualsVisitor {
@@ -485,11 +406,6 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
     return Status::OK();
   }
 
-  Status Visit(const DecimalArray& left) {
-    result_ = IsEqualDecimal(left, static_cast<const DecimalArray&>(right_));
-    return Status::OK();
-  }
-
   template <typename ArrayType>
   bool ValueOffsetsEqual(const ArrayType& left) {
     const auto& right = static_cast<const ArrayType&>(right_);
@@ -591,6 +507,27 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
     return Status::OK();
   }
 
+  Status Visit(const DecimalArray& left) {
+    const int byte_width = left.byte_width();
+    if (byte_width == 4) {
+      result_ = CompareBuiltIn<int32_t>(
+          left, right_, reinterpret_cast<const int32_t*>(left.raw_values()),
+          reinterpret_cast<const int32_t*>(
+              static_cast<const DecimalArray&>(right_).raw_values()));
+      return Status::OK();
+    }
+
+    if (byte_width == 8) {
+      result_ = CompareBuiltIn<int64_t>(
+          left, right_, reinterpret_cast<const int64_t*>(left.raw_values()),
+          reinterpret_cast<const int64_t*>(
+              static_cast<const DecimalArray&>(right_).raw_values()));
+      return Status::OK();
+    }
+
+    return RangeEqualsVisitor::Visit(left);
+  }
+
   template <typename T>
   typename std::enable_if<std::is_base_of<NestedType, typename T::TypeClass>::value,
                           Status>::type
@@ -823,7 +760,7 @@ Status TensorEquals(const Tensor& left, const Tensor& right, bool* are_equal) {
     }
 
     const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
-    const int byte_width = size_meta.bit_width() / 8;
+    const int byte_width = size_meta.bit_width() / CHAR_BIT;
     DCHECK_GT(byte_width, 0);
 
     const uint8_t* left_data = left.data()->data();

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/python/arrow_to_pandas.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index d1fca70..1f62ef8 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -604,11 +604,12 @@ static Status ConvertTimes(PandasOptions options, const ChunkedArray& data,
 
 template <typename T>
 Status ValidateDecimalPrecision(int precision) {
-  constexpr static const int maximum_precision = decimal::DecimalPrecision<T>::maximum;
-  if (!(precision > 0 && precision <= maximum_precision)) {
+  constexpr static const int kMaximumPrecision =
+      decimal::DecimalPrecision<typename T::value_type>::maximum;
+  if (!(precision > 0 && precision <= kMaximumPrecision)) {
     std::stringstream ss;
     ss << "Invalid precision: " << precision << ". Minimum is 1, maximum is "
-       << maximum_precision;
+       << kMaximumPrecision;
     return Status::Invalid(ss.str());
   }
   return Status::OK();
@@ -620,27 +621,24 @@ Status RawDecimalToString(const uint8_t* bytes, int precision, int scale,
   DCHECK_NE(bytes, nullptr);
   DCHECK_NE(result, nullptr);
   RETURN_NOT_OK(ValidateDecimalPrecision<T>(precision));
-  decimal::Decimal<T> decimal;
-  FromBytes(bytes, &decimal);
-  *result = ToString(decimal, precision, scale);
+  T decimal;
+  decimal::FromBytes(bytes, &decimal);
+  *result = decimal::ToString(decimal, precision, scale);
   return Status::OK();
 }
 
-template Status RawDecimalToString<int32_t>(const uint8_t*, int, int,
-                                            std::string* result);
-template Status RawDecimalToString<int64_t>(const uint8_t*, int, int,
-                                            std::string* result);
-
-Status RawDecimalToString(const uint8_t* bytes, int precision, int scale,
-                          bool is_negative, std::string* result) {
-  DCHECK_NE(bytes, nullptr);
-  DCHECK_NE(result, nullptr);
-  RETURN_NOT_OK(ValidateDecimalPrecision<boost::multiprecision::int128_t>(precision));
-  decimal::Decimal128 decimal;
-  FromBytes(bytes, is_negative, &decimal);
-  *result = ToString(decimal, precision, scale);
-  return Status::OK();
-}
+template Status RawDecimalToString<decimal::Decimal32>(const uint8_t*, int, int,
+                                                       std::string*);
+template Status RawDecimalToString<decimal::Decimal64>(const uint8_t*, int, int,
+                                                       std::string*);
+template Status RawDecimalToString<decimal::Decimal128>(const uint8_t*, int, int,
+                                                        std::string*);
+
+#define RAW_DECIMAL_TO_STRING_CASE(bits, value, precision, scale, output)          \
+  case bits:                                                                       \
+    RETURN_NOT_OK(RawDecimalToString<decimal::Decimal##bits>((value), (precision), \
+                                                             (scale), (output)));  \
+    break;
 
 static Status ConvertDecimals(PandasOptions options, const ChunkedArray& data,
                               PyObject** out_values) {
@@ -664,22 +662,18 @@ static Status ConvertDecimals(PandasOptions options, const ChunkedArray& data,
         *out_values++ = Py_None;
       } else {
         const uint8_t* raw_value = arr->GetValue(i);
-        std::string s;
+        std::string decimal_string;
         switch (bit_width) {
-          case 32:
-            RETURN_NOT_OK(RawDecimalToString<int32_t>(raw_value, precision, scale, &s));
-            break;
-          case 64:
-            RETURN_NOT_OK(RawDecimalToString<int64_t>(raw_value, precision, scale, &s));
-            break;
-          case 128:
-            RETURN_NOT_OK(
-                RawDecimalToString(raw_value, precision, scale, arr->IsNegative(i), &s));
-            break;
-          default:
-            break;
+          RAW_DECIMAL_TO_STRING_CASE(32, raw_value, precision, scale, &decimal_string)
+          RAW_DECIMAL_TO_STRING_CASE(64, raw_value, precision, scale, &decimal_string)
+          RAW_DECIMAL_TO_STRING_CASE(128, raw_value, precision, scale, &decimal_string)
+          default: {
+            std::stringstream buf;
+            buf << "Invalid bit_width " << bit_width << " for decimal value";
+            return Status::Invalid(buf.str());
+          }
         }
-        RETURN_NOT_OK(DecimalFromString(Decimal, s, out_values++));
+        RETURN_NOT_OK(DecimalFromString(Decimal, decimal_string, out_values++));
       }
     }
   }
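
The RAW_DECIMAL_TO_STRING_CASE macro in the hunk above stamps out one switch case per supported bit width and leaves the default branch to report an error. A minimal standalone sketch of that pattern follows. The names DescribeWidth, WIDTH_CASE, and Dispatch are hypothetical, invented for illustration, and are not part of Arrow.

```cpp
#include <string>

// Hypothetical stand-in for a per-width routine such as RawDecimalToString<T>.
template <int Bits>
std::string DescribeWidth() {
  return "decimal" + std::to_string(Bits);
}

// Expands to one complete `case` label, including the trailing `break`.
#define WIDTH_CASE(bits, output)       \
  case bits:                           \
    *(output) = DescribeWidth<bits>(); \
    break;

// Returns false for unsupported widths instead of silently doing nothing.
bool Dispatch(int bit_width, std::string* out) {
  switch (bit_width) {
    WIDTH_CASE(32, out)
    WIDTH_CASE(64, out)
    WIDTH_CASE(128, out)
    default:
      return false;
  }
  return true;
}
```

Under these assumptions, Dispatch(64, &s) stores "decimal64" in s, while Dispatch(16, &s) returns false, mirroring the Status::Invalid path that replaces the old silent default.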

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/python/python-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 0d83012..0241ff0 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -23,12 +23,10 @@
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
-#include "arrow/table.h"
 #include "arrow/test-util.h"
 
 #include "arrow/python/arrow_to_pandas.h"
 #include "arrow/python/builtin_convert.h"
-#include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 
 #include "arrow/util/decimal.h"
@@ -61,13 +59,11 @@ TEST(DecimalTest, TestPythonDecimalToString) {
   ASSERT_NE(pydecimal.obj(), nullptr);
   ASSERT_EQ(PyErr_Occurred(), nullptr);
 
-  boost::multiprecision::int128_t boost_decimal(decimal_string);
   PyObject* python_object = pydecimal.obj();
   ASSERT_NE(python_object, nullptr);
 
   std::string string_result;
   ASSERT_OK(PythonDecimalToString(python_object, &string_result));
-  ASSERT_EQ(boost_decimal.str(), string_result);
 }
 
 TEST(PandasConversionTest, TestObjectBlockWriteFails) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 4443e8d..82cd137 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -480,10 +480,6 @@ std::vector<BufferDescr> FixedSizeBinaryType::GetBufferLayout() const {
   return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())};
 }
 
-std::vector<BufferDescr> DecimalType::GetBufferLayout() const {
-  return {kValidityBuffer, kBooleanBuffer, BufferDescr(BufferType::DATA, bit_width())};
-}
-
 std::vector<BufferDescr> ListType::GetBufferLayout() const {
   return {kValidityBuffer, kOffsetBuffer};
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 4917ebb..30cd71e 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -477,7 +477,6 @@ class ARROW_EXPORT DecimalType : public FixedSizeBinaryType {
         precision_(precision),
         scale_(scale) {}
 
-  std::vector<BufferDescr> GetBufferLayout() const override;
   Status Accept(TypeVisitor* visitor) const override;
   std::string ToString() const override;
   static std::string name() { return "decimal"; }

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 8c1a23d..0705820 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -21,19 +21,20 @@
 
 # Headers: top level
 install(FILES
-  bit-util.h
   bit-stream-utils.h
+  bit-util.h
   bpacking.h
   compiler-util.h
-  compression.h
   compression_brotli.h
+  compression.h
   compression_lz4.h
   compression_snappy.h
   compression_zlib.h
   compression_zstd.h
   cpu-info.h
-  key_value_metadata.h
   hash-util.h
+  int128.h
+  key_value_metadata.h
   logging.h
   macros.h
   random.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/util/decimal-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/decimal-test.cc b/cpp/src/arrow/util/decimal-test.cc
index e3fd480..fb8833c 100644
--- a/cpp/src/arrow/util/decimal-test.cc
+++ b/cpp/src/arrow/util/decimal-test.cc
@@ -28,27 +28,27 @@ namespace decimal {
 template <typename T>
 class DecimalTest : public ::testing::Test {
  public:
-  DecimalTest() : string_value("234.23445") { integer_value.value = 23423445; }
-  Decimal<T> integer_value;
-  std::string string_value;
+  DecimalTest() : decimal_value_(23423445), string_value_("234.23445") {}
+  Decimal<T> decimal_value_;
+  std::string string_value_;
 };
 
-typedef ::testing::Types<int32_t, int64_t, int128_t> DecimalTypes;
+typedef ::testing::Types<int32_t, int64_t, Int128> DecimalTypes;
 TYPED_TEST_CASE(DecimalTest, DecimalTypes);
 
 TYPED_TEST(DecimalTest, TestToString) {
-  Decimal<TypeParam> decimal(this->integer_value);
+  Decimal<TypeParam> decimal(this->decimal_value_);
   int precision = 8;
   int scale = 5;
   std::string result = ToString(decimal, precision, scale);
-  ASSERT_EQ(result, this->string_value);
+  ASSERT_EQ(result, this->string_value_);
 }
 
 TYPED_TEST(DecimalTest, TestFromString) {
-  Decimal<TypeParam> expected(this->integer_value);
+  Decimal<TypeParam> expected(this->decimal_value_);
   Decimal<TypeParam> result;
   int precision, scale;
-  ASSERT_OK(FromString(this->string_value, &result, &precision, &scale));
+  ASSERT_OK(FromString(this->string_value_, &result, &precision, &scale));
   ASSERT_EQ(result.value, expected.value);
   ASSERT_EQ(precision, 8);
   ASSERT_EQ(scale, 5);
@@ -67,7 +67,7 @@ TEST(DecimalTest, TestStringStartingWithPlus) {
 
 TEST(DecimalTest, TestStringStartingWithPlus128) {
   std::string plus_value("+2342394230592.232349023094");
-  decimal::int128_t expected_value("2342394230592232349023094");
+  Int128 expected_value("2342394230592232349023094");
   Decimal128 out;
   int scale;
   int precision;
@@ -90,29 +90,30 @@ TEST(DecimalTest, TestStringToInt64) {
 }
 
 TEST(DecimalTest, TestStringToInt128) {
-  int128_t value = 0;
+  Int128 value = 0;
   StringToInteger("123456789", "456789123", 1, &value);
-  ASSERT_EQ(value, 123456789456789123);
+  ASSERT_EQ(value.high_bits(), 0);
+  ASSERT_EQ(value.low_bits(), 123456789456789123);
 }
 
 TEST(DecimalTest, TestFromString128) {
   static const std::string string_value("-23049223942343532412");
-  Decimal<int128_t> result(string_value);
-  int128_t expected = -230492239423435324;
+  Decimal128 result(string_value);
+  Int128 expected(static_cast<int64_t>(-230492239423435324));
   ASSERT_EQ(result.value, expected * 100 - 12);
 
   // Sanity check that our number is actually using more than 64 bits
-  ASSERT_NE(result.value, static_cast<int64_t>(result.value));
+  ASSERT_NE(result.value.high_bits(), 0);
 }
 
 TEST(DecimalTest, TestFromDecimalString128) {
   static const std::string string_value("-23049223942343.532412");
-  Decimal<int128_t> result(string_value);
-  int128_t expected = -230492239423435324;
+  Decimal128 result(string_value);
+  Int128 expected(static_cast<int64_t>(-230492239423435324));
   ASSERT_EQ(result.value, expected * 100 - 12);
 
   // Sanity check that our number is actually using more than 64 bits
-  ASSERT_NE(result.value, static_cast<int64_t>(result.value));
+  ASSERT_NE(result.value.high_bits(), 0);
 }
 
 TEST(DecimalTest, TestDecimal32Precision) {
@@ -130,8 +131,8 @@ TEST(DecimalTest, TestDecimal64Precision) {
 }
 
 TEST(DecimalTest, TestDecimal128Precision) {
-  auto min_precision = DecimalPrecision<int128_t>::minimum;
-  auto max_precision = DecimalPrecision<int128_t>::maximum;
+  auto min_precision = DecimalPrecision<Int128>::minimum;
+  auto max_precision = DecimalPrecision<Int128>::maximum;
   ASSERT_EQ(min_precision, 19);
   ASSERT_EQ(max_precision, 38);
 }
@@ -166,19 +167,16 @@ TEST(DecimalTest, TestDecimal128StringAndBytesRoundTrip) {
   Decimal128 expected(string_value);
 
   std::string expected_string_value("-340282366920938463463374607431711455");
-  int128_t expected_underlying_value(expected_string_value);
+  Int128 expected_underlying_value(expected_string_value);
 
   ASSERT_EQ(expected.value, expected_underlying_value);
 
   uint8_t stack_bytes[16] = {0};
   uint8_t* bytes = stack_bytes;
-  bool is_negative;
-  ToBytes(expected, &bytes, &is_negative);
-
-  ASSERT_TRUE(is_negative);
+  ToBytes(expected, &bytes);
 
   Decimal128 result;
-  FromBytes(bytes, is_negative, &result);
+  FromBytes(bytes, &result);
 
   ASSERT_EQ(expected.value, result.value);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/util/decimal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/decimal.cc b/cpp/src/arrow/util/decimal.cc
index 1a12e20..c195cf3 100644
--- a/cpp/src/arrow/util/decimal.cc
+++ b/cpp/src/arrow/util/decimal.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <sstream>
+
 #include "arrow/util/decimal.h"
 
 namespace arrow {
@@ -156,8 +158,9 @@ void StringToInteger(const std::string& whole, const std::string& fractional, in
   DCHECK(sign == -1 || sign == 1);
   DCHECK_NE(out, nullptr);
   DCHECK(!whole.empty() || !fractional.empty());
+
   if (!whole.empty()) {
-    *out = std::stoi(whole, nullptr, 10) *
+    *out = std::stoi(whole) *
            static_cast<int32_t>(pow(10.0, static_cast<double>(fractional.size())));
   }
   if (!fractional.empty()) {
@@ -172,7 +175,7 @@ void StringToInteger(const std::string& whole, const std::string& fractional, in
   DCHECK_NE(out, nullptr);
   DCHECK(!whole.empty() || !fractional.empty());
   if (!whole.empty()) {
-    *out = static_cast<int64_t>(std::stoll(whole, nullptr, 10)) *
+    *out = static_cast<int64_t>(std::stoll(whole)) *
            static_cast<int64_t>(pow(10.0, static_cast<double>(fractional.size())));
   }
   if (!fractional.empty()) {
@@ -182,11 +185,11 @@ void StringToInteger(const std::string& whole, const std::string& fractional, in
 }
 
 void StringToInteger(const std::string& whole, const std::string& fractional, int8_t sign,
-                     int128_t* out) {
+                     Int128* out) {
   DCHECK(sign == -1 || sign == 1);
   DCHECK_NE(out, nullptr);
   DCHECK(!whole.empty() || !fractional.empty());
-  *out = int128_t(whole + fractional) * sign;
+  *out = Int128(whole + fractional) * sign;
 }
 
 void FromBytes(const uint8_t* bytes, Decimal32* decimal) {
@@ -201,22 +204,8 @@ void FromBytes(const uint8_t* bytes, Decimal64* decimal) {
   decimal->value = *reinterpret_cast<const int64_t*>(bytes);
 }
 
-constexpr static const size_t BYTES_IN_128_BITS = 128 / CHAR_BIT;
-constexpr static const size_t LIMB_SIZE =
-    sizeof(std::remove_pointer<int128_t::backend_type::limb_pointer>::type);
-constexpr static const size_t LIMBS_IN_INT128 = BYTES_IN_128_BITS / LIMB_SIZE;
-
-void FromBytes(const uint8_t* bytes, bool is_negative, Decimal128* decimal) {
-  DCHECK_NE(bytes, nullptr);
-  DCHECK_NE(decimal, nullptr);
-
-  auto& decimal_value(decimal->value);
-  int128_t::backend_type& backend(decimal_value.backend());
-  backend.resize(LIMBS_IN_INT128, LIMBS_IN_INT128);
-  std::memcpy(backend.limbs(), bytes, BYTES_IN_128_BITS);
-  if (is_negative) {
-    decimal->value = -decimal->value;
-  }
+void FromBytes(const uint8_t* bytes, Decimal128* decimal) {
+  decimal->value = Int128(bytes);
 }
 
 void ToBytes(const Decimal32& value, uint8_t** bytes) {
@@ -229,16 +218,10 @@ void ToBytes(const Decimal64& value, uint8_t** bytes) {
   *reinterpret_cast<int64_t*>(*bytes) = value.value;
 }
 
-void ToBytes(const Decimal128& decimal, uint8_t** bytes, bool* is_negative) {
+void ToBytes(const Decimal128& decimal, uint8_t** bytes) {
+  DCHECK_NE(bytes, nullptr);
   DCHECK_NE(*bytes, nullptr);
-  DCHECK_NE(is_negative, nullptr);
-
-  /// TODO(phillipc): boost multiprecision is unreliable here, int128_t can't be
-  /// roundtripped
-  const auto& backend(decimal.value.backend());
-  const size_t bytes_in_use = LIMB_SIZE * backend.size();
-  std::memcpy(*bytes, backend.limbs(), bytes_in_use);
-  *is_negative = backend.isneg();
+  decimal.value.ToBytes(bytes);
 }
 
 }  // namespace decimal
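
With this change the Decimal128 FromBytes/ToBytes pair no longer carries a separate is_negative flag, because the sign lives in the two's complement representation itself. The sketch below illustrates the idea with a hypothetical Words128 struct (not Arrow's Int128): a signed high word, an unsigned low word, negation by invert-and-add-one, and a 16-byte round trip that writes the high word first, matching the word order this commit's Int128::ToBytes uses.

```cpp
#include <cstdint>
#include <cstring>

// Hypothetical minimal 128-bit value: signed high word, unsigned low word.
struct Words128 {
  int64_t high;
  uint64_t low;
};

// Two's complement negation: invert every bit, add one, propagate the carry.
Words128 Negate(Words128 v) {
  v.low = ~v.low + 1;
  uint64_t high = ~static_cast<uint64_t>(v.high);
  if (v.low == 0) {
    ++high;  // the +1 carried out of the low word
  }
  v.high = static_cast<int64_t>(high);
  return v;
}

// Serialize as two 64-bit words, high word first. No sign flag is needed.
void ToBytes(const Words128& v, uint8_t* out) {
  const uint64_t raw[2] = {static_cast<uint64_t>(v.high), v.low};
  std::memcpy(out, raw, sizeof(raw));
}

// memcpy avoids the alignment assumptions of a reinterpret_cast on bytes.
Words128 FromBytes(const uint8_t* bytes) {
  uint64_t raw[2];
  std::memcpy(raw, bytes, sizeof(raw));
  return {static_cast<int64_t>(raw[0]), raw[1]};
}
```

Negating {0, 1} yields a high word of -1 and an all-ones low word, and that value survives ToBytes followed by FromBytes unchanged.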

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/util/decimal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/decimal.h b/cpp/src/arrow/util/decimal.h
index 20142fa..88ac2f5 100644
--- a/cpp/src/arrow/util/decimal.h
+++ b/cpp/src/arrow/util/decimal.h
@@ -25,15 +25,12 @@
 
 #include "arrow/status.h"
 #include "arrow/util/bit-util.h"
+#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 
-#include <boost/multiprecision/cpp_int.hpp>
-
 namespace arrow {
 namespace decimal {
 
-using boost::multiprecision::int128_t;
-
 template <typename T>
 struct ARROW_EXPORT Decimal;
 
@@ -42,7 +39,7 @@ ARROW_EXPORT void StringToInteger(const std::string& whole, const std::string& f
 ARROW_EXPORT void StringToInteger(const std::string& whole, const std::string& fractional,
                                   int8_t sign, int64_t* out);
 ARROW_EXPORT void StringToInteger(const std::string& whole, const std::string& fractional,
-                                  int8_t sign, int128_t* out);
+                                  int8_t sign, Int128* out);
 
 template <typename T>
 ARROW_EXPORT Status FromString(const std::string& s, Decimal<T>* out,
@@ -61,7 +58,7 @@ struct ARROW_EXPORT Decimal {
 
 using Decimal32 = Decimal<int32_t>;
 using Decimal64 = Decimal<int64_t>;
-using Decimal128 = Decimal<int128_t>;
+using Decimal128 = Decimal<Int128>;
 
 template <typename T>
 struct ARROW_EXPORT DecimalPrecision {};
@@ -79,7 +76,7 @@ struct ARROW_EXPORT DecimalPrecision<int64_t> {
 };
 
 template <>
-struct ARROW_EXPORT DecimalPrecision<int128_t> {
+struct ARROW_EXPORT DecimalPrecision<Int128> {
   constexpr static const int minimum = 19;
   constexpr static const int maximum = 38;
 };
@@ -108,8 +105,8 @@ ARROW_EXPORT std::string ToString(const Decimal<T>& decimal_value, int precision
   if (scale > 0) {
     int remaining_scale = scale;
     do {
-      str[--last_char_idx] = static_cast<char>((remaining_value % 10) +
-                                               static_cast<T>('0'));  // Ascii offset
+      str[--last_char_idx] =
+          static_cast<char>(remaining_value % 10 + static_cast<T>('0'));  // Ascii offset
       remaining_value /= 10;
     } while (--remaining_scale > 0);
     str[--last_char_idx] = '.';
@@ -117,11 +114,14 @@ ARROW_EXPORT std::string ToString(const Decimal<T>& decimal_value, int precision
   }
   do {
     str[--last_char_idx] =
-        static_cast<char>((remaining_value % 10) + static_cast<T>('0'));  // Ascii offset
+        static_cast<char>(remaining_value % 10 + static_cast<T>('0'));  // Ascii offset
     remaining_value /= 10;
     if (remaining_value == 0) {
       // Trim any extra leading 0's.
-      if (last_char_idx > first_digit_idx) str.erase(0, last_char_idx - first_digit_idx);
+      if (last_char_idx > first_digit_idx) {
+        str.erase(0, last_char_idx - first_digit_idx);
+      }
+
       break;
     }
     // For safety, enforce string length independent of remaining_value.
@@ -133,12 +133,12 @@ ARROW_EXPORT std::string ToString(const Decimal<T>& decimal_value, int precision
 /// Conversion from raw bytes to a Decimal value
 ARROW_EXPORT void FromBytes(const uint8_t* bytes, Decimal32* value);
 ARROW_EXPORT void FromBytes(const uint8_t* bytes, Decimal64* value);
-ARROW_EXPORT void FromBytes(const uint8_t* bytes, bool is_negative, Decimal128* decimal);
+ARROW_EXPORT void FromBytes(const uint8_t* bytes, Decimal128* decimal);
 
 /// Conversion from a Decimal value to raw bytes
 ARROW_EXPORT void ToBytes(const Decimal32& value, uint8_t** bytes);
 ARROW_EXPORT void ToBytes(const Decimal64& value, uint8_t** bytes);
-ARROW_EXPORT void ToBytes(const Decimal128& decimal, uint8_t** bytes, bool* is_negative);
+ARROW_EXPORT void ToBytes(const Decimal128& decimal, uint8_t** bytes);
 
 }  // namespace decimal
 }  // namespace arrow
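
The ToString template in this header turns an unscaled integer plus a scale into decimal text, for example 23423445 with scale 5 into "234.23445". The following sketch shows the same mapping for a 64-bit value only, assuming scale >= 0. FormatDecimal is a hypothetical helper, and it relies on std::to_string rather than the digit-by-digit loop used in the diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: render `unscaled * 10^-scale` as a decimal string.
std::string FormatDecimal(int64_t unscaled, int scale) {
  const bool negative = unscaled < 0;
  // Take the magnitude in unsigned arithmetic so INT64_MIN does not overflow.
  const uint64_t magnitude = negative ? 0 - static_cast<uint64_t>(unscaled)
                                      : static_cast<uint64_t>(unscaled);
  std::string digits = std::to_string(magnitude);
  if (scale > 0) {
    const auto fraction_digits = static_cast<std::size_t>(scale);
    // Left-pad with zeros so at least one digit precedes the decimal point.
    if (digits.size() <= fraction_digits) {
      digits = std::string(fraction_digits - digits.size() + 1, '0') + digits;
    }
    digits.insert(digits.size() - fraction_digits, ".");
  }
  return negative ? "-" + digits : digits;
}
```

Under these assumptions, FormatDecimal(23423445, 5) returns "234.23445" and FormatDecimal(5, 3) returns "0.005".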

http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/util/int128.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/int128.cc b/cpp/src/arrow/util/int128.cc
new file mode 100644
index 0000000..9d1d062
--- /dev/null
+++ b/cpp/src/arrow/util/int128.cc
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <cmath>
+#include <iomanip>
+#include <limits>
+#include <sstream>
+
+#ifdef _MSC_VER
+#include <intrin.h>
+#pragma intrinsic(_BitScanReverse)
+#endif
+
+#include "arrow/util/int128.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace decimal {
+
+static constexpr uint64_t kIntMask = 0xFFFFFFFF;
+static constexpr auto kCarryBit = static_cast<uint64_t>(1) << static_cast<uint64_t>(32);
+
+Int128::Int128(const std::string& str) : Int128() {
+  const size_t length = str.length();
+
+  if (length > 0) {
+    bool is_negative = str[0] == '-';
+    auto posn = static_cast<size_t>(is_negative);
+
+    while (posn < length) {
+      const size_t group = std::min(static_cast<size_t>(18), length - posn);
+      const auto chunk = static_cast<int64_t>(std::stoll(str.substr(posn, group)));
+      const auto multiple =
+          static_cast<int64_t>(std::pow(10.0, static_cast<double>(group)));
+
+      *this *= multiple;
+      *this += chunk;
+
+      posn += group;
+    }
+
+    if (is_negative) {
+      Negate();
+    }
+  }
+}
+
+Int128::Int128(const uint8_t* bytes)
+    : Int128(reinterpret_cast<const int64_t*>(bytes)[0],
+             reinterpret_cast<const uint64_t*>(bytes)[1]) {}
+
+void Int128::ToBytes(uint8_t** out) const {
+  DCHECK_NE(out, nullptr) << "Cannot fill nullptr of bytes from Int128";
+  DCHECK_NE(*out, nullptr) << "Cannot fill nullptr of bytes from Int128";
+  const uint64_t raw[] = {static_cast<uint64_t>(high_bits_), low_bits_};
+  std::memcpy(*out, raw, 16);
+}
+
+Int128& Int128::Negate() {
+  low_bits_ = ~low_bits_ + 1;
+  high_bits_ = ~high_bits_;
+  if (low_bits_ == 0) {
+    ++high_bits_;
+  }
+  return *this;
+}
+
+Int128& Int128::operator+=(const Int128& right) {
+  const uint64_t sum = low_bits_ + right.low_bits_;
+  high_bits_ += right.high_bits_;
+  if (sum < low_bits_) {
+    ++high_bits_;
+  }
+  low_bits_ = sum;
+  return *this;
+}
+
+Int128& Int128::operator-=(const Int128& right) {
+  const uint64_t diff = low_bits_ - right.low_bits_;
+  high_bits_ -= right.high_bits_;
+  if (diff > low_bits_) {
+    --high_bits_;
+  }
+  low_bits_ = diff;
+  return *this;
+}
+
+Int128& Int128::operator/=(const Int128& right) {
+  Int128 remainder;
+  DCHECK(Divide(right, this, &remainder).ok());
+  return *this;
+}
+
+Int128::operator char() const {
+  DCHECK(high_bits_ == 0 || high_bits_ == -1)
+      << "Trying to cast an Int128 greater than the value range of a "
+         "char. high_bits_ must be equal to 0 or -1, got: "
+      << high_bits_;
+  DCHECK_LE(low_bits_, std::numeric_limits<char>::max())
+      << "low_bits_ too large for C type char, got: " << low_bits_;
+  return static_cast<char>(low_bits_);
+}
+
+Int128& Int128::operator|=(const Int128& right) {
+  low_bits_ |= right.low_bits_;
+  high_bits_ |= right.high_bits_;
+  return *this;
+}
+
+Int128& Int128::operator&=(const Int128& right) {
+  low_bits_ &= right.low_bits_;
+  high_bits_ &= right.high_bits_;
+  return *this;
+}
+
+Int128& Int128::operator<<=(uint32_t bits) {
+  if (bits != 0) {
+    if (bits < 64) {
+      high_bits_ <<= bits;
+      high_bits_ |= (low_bits_ >> (64 - bits));
+      low_bits_ <<= bits;
+    } else if (bits < 128) {
+      high_bits_ = static_cast<int64_t>(low_bits_) << (bits - 64);
+      low_bits_ = 0;
+    } else {
+      high_bits_ = 0;
+      low_bits_ = 0;
+    }
+  }
+  return *this;
+}
+
+Int128& Int128::operator>>=(uint32_t bits) {
+  if (bits != 0) {
+    if (bits < 64) {
+      low_bits_ >>= bits;
+      low_bits_ |= static_cast<uint64_t>(high_bits_ << (64 - bits));
+      high_bits_ = static_cast<int64_t>(static_cast<uint64_t>(high_bits_) >> bits);
+    } else if (bits < 128) {
+      low_bits_ = static_cast<uint64_t>(high_bits_ >> (bits - 64));
+      high_bits_ = static_cast<int64_t>(high_bits_ >= 0L ? 0L : -1L);
+    } else {
+      high_bits_ = static_cast<int64_t>(high_bits_ >= 0L ? 0L : -1L);
+      low_bits_ = static_cast<uint64_t>(high_bits_);
+    }
+  }
+  return *this;
+}
+
+Int128& Int128::operator*=(const Int128& right) {
+  // Break the left and right numbers into 32 bit chunks
+  // so that we can multiply them without overflow.
+  const uint64_t L0 = static_cast<uint64_t>(high_bits_) >> 32;
+  const uint64_t L1 = static_cast<uint64_t>(high_bits_) & kIntMask;
+  const uint64_t L2 = low_bits_ >> 32;
+  const uint64_t L3 = low_bits_ & kIntMask;
+
+  const uint64_t R0 = static_cast<uint64_t>(right.high_bits_) >> 32;
+  const uint64_t R1 = static_cast<uint64_t>(right.high_bits_) & kIntMask;
+  const uint64_t R2 = right.low_bits_ >> 32;
+  const uint64_t R3 = right.low_bits_ & kIntMask;
+
+  uint64_t product = L3 * R3;
+  low_bits_ = product & kIntMask;
+
+  uint64_t sum = product >> 32;
+
+  product = L2 * R3;
+  sum += product;
+  high_bits_ = static_cast<int64_t>(sum < product ? kCarryBit : 0);
+
+  // Check for a carry after each addition so that it is counted only once.
+  product = L3 * R2;
+  sum += product;
+  if (sum < product) {
+    high_bits_ += kCarryBit;
+  }
+
+  low_bits_ += sum << 32;
+
+  high_bits_ += static_cast<int64_t>(sum >> 32);
+  high_bits_ += L1 * R3 + L2 * R2 + L3 * R1;
+  high_bits_ += (L0 * R3 + L1 * R2 + L2 * R1 + L3 * R0) << 32;
+  return *this;
+}
+
+/// Expands the given value into an array of 32-bit ints so that we can work
+/// on it. The array holds the absolute value, and the was_negative flag is
+/// set accordingly. Leading zero words are omitted from the array.
+/// \param value the Int128 to expand
+/// \param array an array of length 4 to set with the value
+/// \param was_negative a flag for whether the value was originally negative
+/// \return the output length of the array
+static int64_t FillInArray(const Int128& value, uint32_t* array, bool& was_negative) {
+  uint64_t high;
+  uint64_t low;
+  const int64_t highbits = value.high_bits();
+  const uint64_t lowbits = value.low_bits();
+
+  if (highbits < 0) {
+    low = ~lowbits + 1;
+    high = static_cast<uint64_t>(~highbits);
+    if (low == 0) {
+      ++high;
+    }
+    was_negative = true;
+  } else {
+    low = lowbits;
+    high = static_cast<uint64_t>(highbits);
+    was_negative = false;
+  }
+
+  if (high != 0) {
+    if (high > std::numeric_limits<uint32_t>::max()) {
+      array[0] = static_cast<uint32_t>(high >> 32);
+      array[1] = static_cast<uint32_t>(high);
+      array[2] = static_cast<uint32_t>(low >> 32);
+      array[3] = static_cast<uint32_t>(low);
+      return 4;
+    }
+
+    array[0] = static_cast<uint32_t>(high);
+    array[1] = static_cast<uint32_t>(low >> 32);
+    array[2] = static_cast<uint32_t>(low);
+    return 3;
+  }
+
+  if (low > std::numeric_limits<uint32_t>::max()) {
+    array[0] = static_cast<uint32_t>(low >> 32);
+    array[1] = static_cast<uint32_t>(low);
+    return 2;
+  }
+
+  if (low == 0) {
+    return 0;
+  }
+
+  array[0] = static_cast<uint32_t>(low);
+  return 1;
+}
+
+/// \brief Find last set bit in a 32 bit integer. Bit 1 is the LSB and bit 32 is the MSB.
+static int64_t FindLastSetBit(uint32_t value) {
+#if defined(__clang__) || defined(__GNUC__)
+  // 32 minus the number of leading zeros; value must be nonzero
+  return 32 - __builtin_clz(value);
+#elif defined(_MSC_VER)
+  unsigned long index;                                         // NOLINT
+  _BitScanReverse(&index, static_cast<unsigned long>(value));  // NOLINT
+  return static_cast<int64_t>(index + 1UL);
+#endif
+}
+
+/// Shift the number in the array left by bits positions.
+/// \param array the number to shift, must have length elements
+/// \param length the number of entries in the array
+/// \param bits the number of bits to shift (0 <= bits < 32)
+static void ShiftArrayLeft(uint32_t* array, int64_t length, int64_t bits) {
+  if (length > 0 && bits != 0) {
+    for (int64_t i = 0; i < length - 1; ++i) {
+      array[i] = (array[i] << bits) | (array[i + 1] >> (32 - bits));
+    }
+    array[length - 1] <<= bits;
+  }
+}
+
+/// Shift the number in the array right by bits positions.
+/// \param array the number to shift, must have length elements
+/// \param length the number of entries in the array
+/// \param bits the number of bits to shift (0 <= bits < 32)
+static void ShiftArrayRight(uint32_t* array, int64_t length, int64_t bits) {
+  if (length > 0 && bits != 0) {
+    for (int64_t i = length - 1; i > 0; --i) {
+      array[i] = (array[i] >> bits) | (array[i - 1] << (32 - bits));
+    }
+    array[0] >>= bits;
+  }
+}
+
+/// \brief Fix the signs of the result and remainder at the end of the division based on
+/// the signs of the dividend and divisor.
+static void FixDivisionSigns(Int128* result, Int128* remainder,
+                             bool dividend_was_negative, bool divisor_was_negative) {
+  if (dividend_was_negative != divisor_was_negative) {
+    result->Negate();
+  }
+
+  if (dividend_was_negative) {
+    remainder->Negate();
+  }
+}
+
+/// \brief Build an Int128 from an array of 32-bit ints.
+static Status BuildFromArray(Int128* value, uint32_t* array, int64_t length) {
+  switch (length) {
+    case 0:
+      *value = {static_cast<int64_t>(0)};
+      break;
+    case 1:
+      *value = {static_cast<int64_t>(array[0])};
+      break;
+    case 2:
+      *value = {static_cast<int64_t>(0),
+                (static_cast<uint64_t>(array[0]) << 32) + array[1]};
+      break;
+    case 3:
+      *value = {static_cast<int64_t>(array[0]),
+                (static_cast<uint64_t>(array[1]) << 32) + array[2]};
+      break;
+    case 4:
+      *value = {(static_cast<int64_t>(array[0]) << 32) + array[1],
+                (static_cast<uint64_t>(array[2]) << 32) + array[3]};
+      break;
+    case 5:
+      if (array[0] != 0) {
+        return Status::Invalid("Can't build Int128 with 5 ints.");
+      }
+      *value = {(static_cast<int64_t>(array[1]) << 32) + array[2],
+                (static_cast<uint64_t>(array[3]) << 32) + array[4]};
+      break;
+    default:
+      return Status::Invalid("Unsupported length for building Int128");
+  }
+
+  return Status::OK();
+}
+
+/// \brief Do a division where the divisor fits into a single 32 bit value.
+static Status SingleDivide(const uint32_t* dividend, int64_t dividend_length,
+                           uint32_t divisor, Int128* remainder,
+                           bool dividend_was_negative, bool divisor_was_negative,
+                           Int128* result) {
+  uint64_t r = 0;
+  uint32_t result_array[5];
+  for (int64_t j = 0; j < dividend_length; j++) {
+    r <<= 32;
+    r += dividend[j];
+    result_array[j] = static_cast<uint32_t>(r / divisor);
+    r %= divisor;
+  }
+  RETURN_NOT_OK(BuildFromArray(result, result_array, dividend_length));
+  *remainder = static_cast<int64_t>(r);
+  FixDivisionSigns(result, remainder, dividend_was_negative, divisor_was_negative);
+  return Status::OK();
+}
+
+Status Int128::Divide(const Int128& divisor, Int128* result, Int128* remainder) const {
+  // Split the dividend and divisor into integer pieces so that we can
+  // work on them.
+  uint32_t dividend_array[5];
+  uint32_t divisor_array[4];
+  bool dividend_was_negative;
+  bool divisor_was_negative;
+  // leave an extra zero before the dividend
+  dividend_array[0] = 0;
+  int64_t dividend_length =
+      FillInArray(*this, dividend_array + 1, dividend_was_negative) + 1;
+  int64_t divisor_length = FillInArray(divisor, divisor_array, divisor_was_negative);
+
+  // Handle some of the easy cases.
+  if (dividend_length <= divisor_length) {
+    *remainder = *this;
+    *result = 0;
+    return Status::OK();
+  }
+
+  if (divisor_length == 0) {
+    return Status::Invalid("Division by 0 in Int128");
+  }
+
+  if (divisor_length == 1) {
+    return SingleDivide(dividend_array, dividend_length, divisor_array[0], remainder,
+                        dividend_was_negative, divisor_was_negative, result);
+  }
+
+  int64_t result_length = dividend_length - divisor_length;
+  uint32_t result_array[4];
+
+  // Normalize by shifting both left by the same number of bits (i.e. scaling
+  // both by the same power of 2) so that the digit guessing is better. The
+  // requirement is that divisor_array[0] is at least 2**31 (top bit set).
+  int64_t normalize_bits = 32 - FindLastSetBit(divisor_array[0]);
+  ShiftArrayLeft(divisor_array, divisor_length, normalize_bits);
+  ShiftArrayLeft(dividend_array, dividend_length, normalize_bits);
+
+  // compute each digit in the result
+  for (int64_t j = 0; j < result_length; ++j) {
+    // Guess the next digit. At worst it is two too large
+    uint32_t guess = std::numeric_limits<uint32_t>::max();
+    auto high_dividend =
+        static_cast<uint64_t>(dividend_array[j]) << 32 | dividend_array[j + 1];
+    if (dividend_array[j] != divisor_array[0]) {
+      guess = static_cast<uint32_t>(high_dividend / divisor_array[0]);
+    }
+
+    // catch all of the cases where guess is two too large and most of the
+    // cases where it is one too large
+    auto rhat = static_cast<uint32_t>(high_dividend -
+                                      guess * static_cast<uint64_t>(divisor_array[0]));
+    while (static_cast<uint64_t>(divisor_array[1]) * guess >
+           (static_cast<uint64_t>(rhat) << 32) + dividend_array[j + 2]) {
+      --guess;
+      rhat += divisor_array[0];
+      if (static_cast<uint64_t>(rhat) < divisor_array[0]) {
+        break;
+      }
+    }
+
+    // subtract off the guess * divisor from the dividend
+    uint64_t mult = 0;
+    for (int64_t i = divisor_length - 1; i >= 0; --i) {
+      mult += static_cast<uint64_t>(guess) * divisor_array[i];
+      uint32_t prev = dividend_array[j + i + 1];
+      dividend_array[j + i + 1] -= static_cast<uint32_t>(mult);
+      mult >>= 32;
+      if (dividend_array[j + i + 1] > prev) {
+        ++mult;
+      }
+    }
+    uint32_t prev = dividend_array[j];
+    dividend_array[j] -= static_cast<uint32_t>(mult);
+
+    // if guess was too big, we add back divisor
+    if (dividend_array[j] > prev) {
+      --guess;
+
+      uint32_t carry = 0;
+      for (int64_t i = divisor_length - 1; i >= 0; --i) {
+        uint64_t sum =
+            static_cast<uint64_t>(divisor_array[i]) + dividend_array[j + i + 1] + carry;
+        dividend_array[j + i + 1] = static_cast<uint32_t>(sum);
+        carry = static_cast<uint32_t>(sum >> 32);
+      }
+      dividend_array[j] += carry;
+    }
+
+    result_array[j] = guess;
+  }
+
+  // denormalize the remainder
+  ShiftArrayRight(dividend_array, dividend_length, normalize_bits);
+
+  // return result and remainder
+  RETURN_NOT_OK(BuildFromArray(result, result_array, result_length));
+  RETURN_NOT_OK(BuildFromArray(remainder, dividend_array, dividend_length));
+  FixDivisionSigns(result, remainder, dividend_was_negative, divisor_was_negative);
+  return Status::OK();
+}
+
+bool operator==(const Int128& left, const Int128& right) {
+  return left.high_bits() == right.high_bits() && left.low_bits() == right.low_bits();
+}
+
+bool operator!=(const Int128& left, const Int128& right) {
+  return !operator==(left, right);
+}
+
+bool operator<(const Int128& left, const Int128& right) {
+  return left.high_bits() < right.high_bits() ||
+         (left.high_bits() == right.high_bits() && left.low_bits() < right.low_bits());
+}
+
+bool operator<=(const Int128& left, const Int128& right) {
+  return !operator>(left, right);
+}
+
+bool operator>(const Int128& left, const Int128& right) { return operator<(right, left); }
+
+bool operator>=(const Int128& left, const Int128& right) {
+  return !operator<(left, right);
+}
+
+Int128 operator-(const Int128& operand) {
+  Int128 result(operand.high_bits(), operand.low_bits());
+  return result.Negate();
+}
+
+Int128 operator+(const Int128& left, const Int128& right) {
+  Int128 result(left.high_bits(), left.low_bits());
+  result += right;
+  return result;
+}
+
+Int128 operator-(const Int128& left, const Int128& right) {
+  Int128 result(left.high_bits(), left.low_bits());
+  result -= right;
+  return result;
+}
+
+Int128 operator*(const Int128& left, const Int128& right) {
+  Int128 result(left.high_bits(), left.low_bits());
+  result *= right;
+  return result;
+}
+
+Int128 operator/(const Int128& left, const Int128& right) {
+  Int128 remainder;
+  Int128 result;
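+  // Call Divide outside of DCHECK so the division does not depend on how
+  // DCHECK expands in release builds.
+  Status s = left.Divide(right, &result, &remainder);
+  DCHECK(s.ok());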
+  return result;
+}
+
+Int128 operator%(const Int128& left, const Int128& right) {
+  Int128 remainder;
+  Int128 result;
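+  // Call Divide outside of DCHECK so the division does not depend on how
+  // DCHECK expands in release builds.
+  Status s = left.Divide(right, &result, &remainder);
+  DCHECK(s.ok());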
+  return remainder;
+}
+
+}  // namespace decimal
+}  // namespace arrow
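
Note on SingleDivide above: it is schoolbook short division in base 2**32,
carrying a running remainder from the most significant 32-bit digit down to
the least significant one. A minimal stand-alone sketch of the same idea
follows. ShortDivide and its std::vector interface are hypothetical and not
part of this patch; sign handling is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-alone helper, not part of this patch.
// Divides a number stored as base-2**32 digits (most significant digit first)
// by a single 32-bit divisor. Writes one quotient digit per dividend digit
// and returns the remainder.
uint32_t ShortDivide(const std::vector<uint32_t>& dividend, uint32_t divisor,
                     std::vector<uint32_t>* quotient) {
  assert(divisor != 0);
  quotient->assign(dividend.size(), 0);
  uint64_t r = 0;  // running remainder; always < divisor after each step
  for (std::size_t j = 0; j < dividend.size(); ++j) {
    // Bring down the next digit: r < 2**32, so the shift cannot overflow.
    r = (r << 32) + dividend[j];
    (*quotient)[j] = static_cast<uint32_t>(r / divisor);
    r %= divisor;
  }
  return static_cast<uint32_t>(r);
}
```

For example, the digits {1, 0} represent 2**32 = 4294967296; dividing by 10
gives quotient digits {0, 429496729} and remainder 6.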

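Note on the comparison operators above: with a signed high word and an
unsigned low word, ordering is decided by the high words first and by the low
words only on a tie. The sketch below mirrors that layout with a hypothetical
TwoWords struct (not the Int128 class from this patch) and the same sign
extension the int64_t constructor in int128.h performs.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical miniature of the two-word layout, not part of this patch.
struct TwoWords {
  int64_t high;   // signed: carries the sign of the whole value
  uint64_t low;   // unsigned: raw low 64 bits of the two's complement value
};

// Sign-extend a 64-bit value: the high word is all zeros or all ones.
TwoWords FromInt64(int64_t value) {
  return {value >= 0 ? 0 : -1, static_cast<uint64_t>(value)};
}

// Compare high words as signed; break ties with the unsigned low words.
bool Less(const TwoWords& left, const TwoWords& right) {
  return left.high < right.high ||
         (left.high == right.high && left.low < right.low);
}
```

Under this layout -1 is stored as high = -1, low = 2**64 - 1, so it compares
below 0 on the high word alone, while -2 and -1 tie on the high word and are
ordered by their low words.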
http://git-wip-us.apache.org/repos/asf/arrow/blob/750b77dc/cpp/src/arrow/util/int128.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/int128.h b/cpp/src/arrow/util/int128.h
new file mode 100644
index 0000000..1d4bd40
--- /dev/null
+++ b/cpp/src/arrow/util/int128.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ARROW_INT128_H
+#define ARROW_INT128_H
+
+#include <cstdint>
+#include <string>
+
+#include "arrow/status.h"
+
+namespace arrow {
+namespace decimal {
+
+/// Represents a signed 128-bit integer in two's complement.
+/// Calculations wrap around and overflow is ignored.
+///
+/// For a discussion of the algorithms, see Knuth's The Art of Computer
+/// Programming, Volume 2: Seminumerical Algorithms, section 4.3.1.
+///
+/// Adapted from the Apache ORC C++ implementation
+class ARROW_EXPORT Int128 {
+ public:
+  constexpr Int128() : Int128(0, 0) {}
+
+  /// \brief Convert a signed 64 bit value into an Int128.
+  constexpr Int128(int64_t value)
+      : Int128(value >= 0 ? 0 : -1, static_cast<uint64_t>(value)) {}
+
+  /// \brief Convert a signed 32 bit value into an Int128.
+  constexpr Int128(int32_t value) : Int128(static_cast<int64_t>(value)) {}
+
+  /// \brief Create an Int128 from the two's complement representation.
+  constexpr Int128(int64_t high, uint64_t low) : high_bits_(high), low_bits_(low) {}
+
+  /// \brief Parse the number from a base 10 string representation.
+  explicit Int128(const std::string& value);
+
+  /// \brief Create an Int128 from an array of bytes
+  explicit Int128(const uint8_t* bytes);
+
+  /// \brief Negate the current value
+  Int128& Negate();
+
+  /// \brief Add a number to this one. The result is truncated to 128 bits.
+  Int128& operator+=(const Int128& right);
+
+  /// \brief Subtract a number from this one. The result is truncated to 128 bits.
+  Int128& operator-=(const Int128& right);
+
+  /// \brief Multiply this number by another number. The result is truncated to 128 bits.
+  Int128& operator*=(const Int128& right);
+
+  /// \brief Divide this number by divisor without modifying this number.
+  /// The quotient rounds toward zero. Signs work like:
+  ///   21 /  5 ->  4,  1
+  ///  -21 /  5 -> -4, -1
+  ///   21 / -5 -> -4,  1
+  ///  -21 / -5 ->  4, -1
+  /// \param divisor the number to divide by
+  /// \param result the quotient of the division
+  /// \param remainder the remainder after the division
+  Status Divide(const Int128& divisor, Int128* result, Int128* remainder) const;
+
+  /// \brief In-place division.
+  Int128& operator/=(const Int128& right);
+
+  /// \brief Cast the value to char. This is used when converting the value to a string.
+  explicit operator char() const;
+
+  /// \brief Bitwise or between two Int128.
+  Int128& operator|=(const Int128& right);
+
+  /// \brief Bitwise and between two Int128.
+  Int128& operator&=(const Int128& right);
+
+  /// \brief Shift left by the given number of bits.
+  Int128& operator<<=(uint32_t bits);
+
+  /// \brief Shift right by the given number of bits. Negative values are sign extended.
+  Int128& operator>>=(uint32_t bits);
+
+  /// \brief Get the high bits of the two's complement representation of the number.
+  int64_t high_bits() const { return high_bits_; }
+
+  /// \brief Get the low bits of the two's complement representation of the number.
+  uint64_t low_bits() const { return low_bits_; }
+
+  /// \brief Put the raw bytes of the value into a pointer to uint8_t.
+  void ToBytes(uint8_t** out) const;
+
+ private:
+  int64_t high_bits_;
+  uint64_t low_bits_;
+};
+
+ARROW_EXPORT bool operator==(const Int128& left, const Int128& right);
+ARROW_EXPORT bool operator!=(const Int128& left, const Int128& right);
+ARROW_EXPORT bool operator<(const Int128& left, const Int128& right);
+ARROW_EXPORT bool operator<=(const Int128& left, const Int128& right);
+ARROW_EXPORT bool operator>(const Int128& left, const Int128& right);
+ARROW_EXPORT bool operator>=(const Int128& left, const Int128& right);
+
+ARROW_EXPORT Int128 operator-(const Int128& operand);
+ARROW_EXPORT Int128 operator+(const Int128& left, const Int128& right);
+ARROW_EXPORT Int128 operator-(const Int128& left, const Int128& right);
+ARROW_EXPORT Int128 operator*(const Int128& left, const Int128& right);
+ARROW_EXPORT Int128 operator/(const Int128& left, const Int128& right);
+ARROW_EXPORT Int128 operator%(const Int128& left, const Int128& right);
+
+}  // namespace decimal
+}  // namespace arrow
+
+#endif  //  ARROW_INT128_H
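
Note on the sign table in the Divide documentation above: the quotient
truncates toward zero and the remainder takes the sign of the dividend, which
is the same convention as built-in integer division in C++11 and later. One
way to get that behavior is to divide the magnitudes and fix the signs
afterwards. The sketch below does this on int64_t; TruncatedDivide and
DivResult are hypothetical names, not part of this patch, and the final casts
assume the usual two's complement wraparound.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical illustration on 64-bit values, not part of this patch.
struct DivResult {
  int64_t quotient;
  int64_t remainder;
};

DivResult TruncatedDivide(int64_t dividend, int64_t divisor) {
  assert(divisor != 0);
  const bool dividend_negative = dividend < 0;
  const bool divisor_negative = divisor < 0;
  // Take magnitudes in unsigned arithmetic so negating INT64_MIN is defined.
  const uint64_t a = dividend_negative ? 0 - static_cast<uint64_t>(dividend)
                                       : static_cast<uint64_t>(dividend);
  const uint64_t b = divisor_negative ? 0 - static_cast<uint64_t>(divisor)
                                      : static_cast<uint64_t>(divisor);
  uint64_t q = a / b;
  uint64_t r = a % b;
  // The quotient is negative when exactly one operand was negative.
  if (dividend_negative != divisor_negative) q = 0 - q;
  // The remainder follows the sign of the dividend.
  if (dividend_negative) r = 0 - r;
  return {static_cast<int64_t>(q), static_cast<int64_t>(r)};
}
```

With these rules, quotient * divisor + remainder always reproduces the
dividend, for instance (-4) * 5 + (-1) = -21.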

