arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-620: [C++] Implement JSON integration test support for date, time, timestamp, fixed width binary
Date Tue, 28 Mar 2017 14:07:50 GMT
Repository: arrow
Updated Branches:
  refs/heads/master e717d4786 -> 3b71d87c5


ARROW-620: [C++] Implement JSON integration test support for date, time, timestamp, fixed width binary

This also contains some code scrubbing, and uses inline visitors in the JSON reader/writer path

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #446 from wesm/ARROW-620 and squashes the following commits:

46978aa [Wes McKinney] Fix compiler warning
cd714f8 [Wes McKinney] No underscores. Fix bug slicing null buffer
d26ad14 [Wes McKinney] Implement FixedWidthBinary support in JSON reader/writer. Make FWBinaryArray subclass of PrimitiveArray
bd5652e [Wes McKinney] Get date/time/timestamp JSON tests passing. Cleanup, fix large metadata issues
fcbf64a [Wes McKinney] Refactoring, implement record batch round trip fixture for JSON, failing unit tests


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3b71d87c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3b71d87c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3b71d87c

Branch: refs/heads/master
Commit: 3b71d87c5e2a79cc5955e6cb73fa4a5cc906458f
Parents: e717d47
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Mar 28 10:07:44 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Mar 28 10:07:44 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/array.cc                   |   5 +-
 cpp/src/arrow/array.h                    |   7 +-
 cpp/src/arrow/compare.cc                 |  14 +-
 cpp/src/arrow/ipc/ipc-json-test.cc       |  43 +-
 cpp/src/arrow/ipc/ipc-read-write-test.cc |  63 +--
 cpp/src/arrow/ipc/json-internal.cc       | 693 ++++++++++++++------------
 cpp/src/arrow/ipc/metadata.cc            |   6 +-
 cpp/src/arrow/ipc/metadata.h             |   4 +-
 cpp/src/arrow/ipc/reader.cc              |   2 +
 cpp/src/arrow/ipc/test-common.h          |  40 +-
 cpp/src/arrow/ipc/writer.cc              |   8 +-
 cpp/src/arrow/tensor.cc                  |   3 +-
 cpp/src/arrow/type.cc                    |  31 +-
 cpp/src/arrow/type.h                     |  71 +--
 14 files changed, 545 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index cff0126..3ea0333 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -283,12 +283,9 @@ std::shared_ptr<Array> StringArray::Slice(int64_t offset, int64_t length) const
 FixedWidthBinaryArray::FixedWidthBinaryArray(const std::shared_ptr<DataType>& type,
     int64_t length, const std::shared_ptr<Buffer>& data,
     const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset)
-    : Array(type, length, null_bitmap, null_count, offset),
-      data_(data),
-      raw_data_(nullptr) {
+    : PrimitiveArray(type, length, data, null_bitmap, null_count, offset) {
   DCHECK(type->type == Type::FIXED_WIDTH_BINARY);
   byte_width_ = static_cast<const FixedWidthBinaryType&>(*type).byte_width();
-  if (data) { raw_data_ = data->data(); }
 }
 
 std::shared_ptr<Array> FixedWidthBinaryArray::Slice(

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index cc0cf98..c0ec571 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -347,7 +347,7 @@ class ARROW_EXPORT StringArray : public BinaryArray {
 // ----------------------------------------------------------------------
 // Fixed width binary
 
-class ARROW_EXPORT FixedWidthBinaryArray : public Array {
+class ARROW_EXPORT FixedWidthBinaryArray : public PrimitiveArray {
  public:
   using TypeClass = FixedWidthBinaryType;
 
@@ -360,9 +360,6 @@ class ARROW_EXPORT FixedWidthBinaryArray : public Array {
     return raw_data_ + (i + offset_) * byte_width_;
   }
 
-  /// Note that this buffer does not account for any slice offset
-  std::shared_ptr<Buffer> data() const { return data_; }
-
   int32_t byte_width() const { return byte_width_; }
 
   const uint8_t* raw_data() const { return raw_data_; }
@@ -371,8 +368,6 @@ class ARROW_EXPORT FixedWidthBinaryArray : public Array {
 
  protected:
   int32_t byte_width_;
-  std::shared_ptr<Buffer> data_;
-  const uint8_t* raw_data_;
 };
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index 8274e0f..3e282f8 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -666,14 +666,12 @@ class TypeEqualsVisitor {
     return Status::OK();
   }
 
-  Status Visit(const Time32Type& left) {
-    const auto& right = static_cast<const Time32Type&>(right_);
-    result_ = left.unit == right.unit;
-    return Status::OK();
-  }
-
-  Status Visit(const Time64Type& left) {
-    const auto& right = static_cast<const Time64Type&>(right_);
+  template <typename T>
+  typename std::enable_if<std::is_base_of<TimeType, T>::value ||
+                              std::is_base_of<DateType, T>::value,
+      Status>::type
+  Visit(const T& left) {
+    const auto& right = static_cast<const T&>(right_);
     result_ = left.unit == right.unit;
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index e943ef1..68261ab 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -75,7 +75,8 @@ void TestArrayRoundTrip(const Array& array) {
   std::shared_ptr<Array> out;
   ASSERT_OK(ReadJsonArray(default_memory_pool(), d, array.type(), &out));
 
-  ASSERT_TRUE(array.Equals(out)) << array_as_json;
+  // std::cout << array_as_json << std::endl;
+  CompareArraysDetailed(0, *out, array);
 }
 
 template <typename T, typename ValueType>
@@ -351,5 +352,45 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
   ASSERT_TRUE(batch->column(1)->Equals(bar));
 }
 
+#define BATCH_CASES()                                                                   \
+  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
+      &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch,   \
+      &MakeStruct, &MakeUnion, &MakeDates, &MakeTimestamps, &MakeTimes, &MakeFWBinary);
+
+class TestJsonRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() {}
+  void TearDown() {}
+};
+
+void CheckRoundtrip(const RecordBatch& batch) {
+  std::unique_ptr<JsonWriter> writer;
+  ASSERT_OK(JsonWriter::Open(batch.schema(), &writer));
+  ASSERT_OK(writer->WriteRecordBatch(batch));
+
+  std::string result;
+  ASSERT_OK(writer->Finish(&result));
+
+  auto buffer = std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(result.c_str()),
+      static_cast<int64_t>(result.size()));
+
+  std::unique_ptr<JsonReader> reader;
+  ASSERT_OK(JsonReader::Open(buffer, &reader));
+
+  std::shared_ptr<RecordBatch> result_batch;
+  ASSERT_OK(reader->GetRecordBatch(0, &result_batch));
+
+  CompareBatch(batch, *result_batch);
+}
+
+TEST_P(TestJsonRoundTrip, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  CheckRoundtrip(*batch);
+}
+
+INSTANTIATE_TEST_CASE_P(TestJsonRoundTrip, TestJsonRoundTrip, BATCH_CASES());
+
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 086cc68..cd3f190 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -41,20 +41,6 @@
 namespace arrow {
 namespace ipc {
 
-void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
-  if (!left.schema()->Equals(right.schema())) {
-    FAIL() << "Left schema: " << left.schema()->ToString()
-           << "\nRight schema: " << right.schema()->ToString();
-  }
-  ASSERT_EQ(left.num_columns(), right.num_columns())
-      << left.schema()->ToString() << " result: " << right.schema()->ToString();
-  EXPECT_EQ(left.num_rows(), right.num_rows());
-  for (int i = 0; i < left.num_columns(); ++i) {
-    EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
-        << "Idx: " << i << " Name: " << left.column_name(i);
-  }
-}
-
 using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
 
 class TestSchemaMetadata : public ::testing::Test {
@@ -85,17 +71,17 @@ class TestSchemaMetadata : public ::testing::Test {
 const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
 
 TEST_F(TestSchemaMetadata, PrimitiveFields) {
-  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>(), false);
-  auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
-  auto f3 = std::make_shared<Field>("f3", std::make_shared<Int64Type>());
-  auto f4 = std::make_shared<Field>("f4", std::make_shared<UInt8Type>());
-  auto f5 = std::make_shared<Field>("f5", std::make_shared<UInt16Type>());
-  auto f6 = std::make_shared<Field>("f6", std::make_shared<UInt32Type>());
-  auto f7 = std::make_shared<Field>("f7", std::make_shared<UInt64Type>());
-  auto f8 = std::make_shared<Field>("f8", std::make_shared<FloatType>());
-  auto f9 = std::make_shared<Field>("f9", std::make_shared<DoubleType>(), false);
-  auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
+  auto f0 = field("f0", std::make_shared<Int8Type>());
+  auto f1 = field("f1", std::make_shared<Int16Type>(), false);
+  auto f2 = field("f2", std::make_shared<Int32Type>());
+  auto f3 = field("f3", std::make_shared<Int64Type>());
+  auto f4 = field("f4", std::make_shared<UInt8Type>());
+  auto f5 = field("f5", std::make_shared<UInt16Type>());
+  auto f6 = field("f6", std::make_shared<UInt32Type>());
+  auto f7 = field("f7", std::make_shared<UInt64Type>());
+  auto f8 = field("f8", std::make_shared<FloatType>());
+  auto f9 = field("f9", std::make_shared<DoubleType>(), false);
+  auto f10 = field("f10", std::make_shared<BooleanType>());
 
   Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
   DictionaryMemo memo;
@@ -105,11 +91,11 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) {
 
 TEST_F(TestSchemaMetadata, NestedFields) {
   auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
-  auto f0 = std::make_shared<Field>("f0", type);
+  auto f0 = field("f0", type);
 
-  std::shared_ptr<StructType> type2(new StructType({std::make_shared<Field>("k1", INT32),
-      std::make_shared<Field>("k2", INT32), std::make_shared<Field>("k3", INT32)}));
-  auto f1 = std::make_shared<Field>("f1", type2);
+  std::shared_ptr<StructType> type2(
+      new StructType({field("k1", INT32), field("k2", INT32), field("k3", INT32)}));
+  auto f1 = field("f1", type2);
 
   Schema schema({f0, f1});
   DictionaryMemo memo;
@@ -172,20 +158,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
     ASSERT_EQ(expected.num_columns(), result.num_columns())
         << expected.schema()->ToString() << " result: " << result.schema()->ToString();
 
-    for (int i = 0; i < expected.num_columns(); ++i) {
-      const auto& left = *expected.column(i);
-      const auto& right = *result.column(i);
-      if (!left.Equals(right)) {
-        std::stringstream pp_result;
-        std::stringstream pp_expected;
-
-        ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
-        ASSERT_OK(PrettyPrint(right, 0, &pp_result));
-
-        FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
-               << "\nGot: " << pp_result.str();
-      }
-    }
+    CompareBatchColumnsDetailed(result, expected);
   }
 
   void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
@@ -549,7 +522,7 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   std::vector<std::shared_ptr<Field>> fields = {f0};
   auto schema = std::make_shared<Schema>(fields);
 
-  RecordBatch batch(schema, 0, {array});
+  RecordBatch batch(schema, length, {array});
 
   std::string path = "test-write-large-record_batch";
 
@@ -562,6 +535,8 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
   ASSERT_OK(DoLargeRoundTrip(batch, false, &result));
   CheckReadResult(*result, batch);
 
+  ASSERT_EQ(length, result->num_rows());
+
   // Fails if we try to write this with the normal code path
   ASSERT_RAISES(Invalid, DoStandardRoundTrip(batch, false, &result));
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index 3484680..95ab011 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -40,6 +40,7 @@
 #include "arrow/util/bit-util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/string.h"
+#include "arrow/visitor_inline.h"
 
 namespace arrow {
 namespace ipc {
@@ -63,13 +64,13 @@ static std::string GetBufferTypeName(BufferType type) {
   return "UNKNOWN";
 }
 
-static std::string GetFloatingPrecisionName(FloatingPointMeta::Precision precision) {
+static std::string GetFloatingPrecisionName(FloatingPoint::Precision precision) {
   switch (precision) {
-    case FloatingPointMeta::HALF:
+    case FloatingPoint::HALF:
       return "HALF";
-    case FloatingPointMeta::SINGLE:
+    case FloatingPoint::SINGLE:
       return "SINGLE";
-    case FloatingPointMeta::DOUBLE:
+    case FloatingPoint::DOUBLE:
       return "DOUBLE";
     default:
       break;
@@ -93,7 +94,7 @@ static std::string GetTimeUnitName(TimeUnit unit) {
   return "UNKNOWN";
 }
 
-class JsonSchemaWriter : public TypeVisitor {
+class JsonSchemaWriter {
  public:
   explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer)
       : schema_(schema), writer_(writer) {}
@@ -120,7 +121,7 @@ class JsonSchemaWriter : public TypeVisitor {
     writer_->Bool(field.nullable);
 
     // Visit the type
-    RETURN_NOT_OK(field.type->Accept(this));
+    RETURN_NOT_OK(VisitTypeInline(*field.type, this));
     writer_->EndObject();
 
     return Status::OK();
@@ -139,25 +140,19 @@ class JsonSchemaWriter : public TypeVisitor {
       void>::type
   WriteTypeMetadata(const T& type) {}
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<IntegerMeta, T>::value, void>::type
-  WriteTypeMetadata(const T& type) {
+  void WriteTypeMetadata(const Integer& type) {
     writer_->Key("bitWidth");
     writer_->Int(type.bit_width());
     writer_->Key("isSigned");
     writer_->Bool(type.is_signed());
   }
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<FloatingPointMeta, T>::value, void>::type
-  WriteTypeMetadata(const T& type) {
+  void WriteTypeMetadata(const FloatingPoint& type) {
     writer_->Key("precision");
     writer_->String(GetFloatingPrecisionName(type.precision()));
   }
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<IntervalType, T>::value, void>::type
-  WriteTypeMetadata(const T& type) {
+  void WriteTypeMetadata(const IntervalType& type) {
     writer_->Key("unit");
     switch (type.unit) {
       case IntervalType::Unit::YEAR_MONTH:
@@ -169,28 +164,45 @@ class JsonSchemaWriter : public TypeVisitor {
     }
   }
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<Time32Type, T>::value ||
-                              std::is_base_of<Time64Type, T>::value ||
-                              std::is_base_of<TimestampType, T>::value,
-      void>::type
-  WriteTypeMetadata(const T& type) {
+  void WriteTypeMetadata(const TimestampType& type) {
+    writer_->Key("unit");
+    writer_->String(GetTimeUnitName(type.unit));
+    if (type.timezone.size() > 0) {
+      writer_->Key("timezone");
+      writer_->String(type.timezone);
+    }
+  }
+
+  void WriteTypeMetadata(const TimeType& type) {
     writer_->Key("unit");
     writer_->String(GetTimeUnitName(type.unit));
   }
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<DecimalType, T>::value, void>::type
-  WriteTypeMetadata(const T& type) {
+  void WriteTypeMetadata(const DateType& type) {
+    writer_->Key("unit");
+    switch (type.unit) {
+      case DateUnit::DAY:
+        writer_->String("DAY");
+        break;
+      case DateUnit::MILLI:
+        writer_->String("MILLISECOND");
+        break;
+    }
+  }
+
+  void WriteTypeMetadata(const FixedWidthBinaryType& type) {
+    writer_->Key("byteWidth");
+    writer_->Int(type.byte_width());
+  }
+
+  void WriteTypeMetadata(const DecimalType& type) {
     writer_->Key("precision");
     writer_->Int(type.precision);
     writer_->Key("scale");
     writer_->Int(type.scale);
   }
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<UnionType, T>::value, void>::type
-  WriteTypeMetadata(const T& type) {
+  void WriteTypeMetadata(const UnionType& type) {
     writer_->Key("mode");
     switch (type.mode) {
       case UnionMode::SPARSE:
@@ -268,86 +280,65 @@ class JsonSchemaWriter : public TypeVisitor {
     return Status::OK();
   }
 
-  Status Visit(const NullType& type) override { return WritePrimitive("null", type); }
-
-  Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); }
-
-  Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); }
-
-  Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); }
+  Status Visit(const NullType& type) { return WritePrimitive("null", type); }
 
-  Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); }
+  Status Visit(const BooleanType& type) { return WritePrimitive("bool", type); }
 
-  Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); }
+  Status Visit(const Integer& type) { return WritePrimitive("int", type); }
 
-  Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); }
-
-  Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); }
-
-  Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); }
-
-  Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); }
-
-  Status Visit(const HalfFloatType& type) override {
-    return WritePrimitive("floatingpoint", type);
-  }
-
-  Status Visit(const FloatType& type) override {
-    return WritePrimitive("floatingpoint", type);
-  }
-
-  Status Visit(const DoubleType& type) override {
+  Status Visit(const FloatingPoint& type) {
     return WritePrimitive("floatingpoint", type);
   }
 
-  Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); }
-
-  Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); }
-
-  // TODO
-  Status Visit(const Date32Type& type) override { return WritePrimitive("date", type); }
+  Status Visit(const DateType& type) { return WritePrimitive("date", type); }
 
-  Status Visit(const Date64Type& type) override { return WritePrimitive("date", type); }
+  Status Visit(const TimeType& type) { return WritePrimitive("time", type); }
 
-  Status Visit(const Time32Type& type) override { return WritePrimitive("time", type); }
+  Status Visit(const StringType& type) { return WriteVarBytes("utf8", type); }
 
-  Status Visit(const Time64Type& type) override { return WritePrimitive("time", type); }
+  Status Visit(const BinaryType& type) { return WriteVarBytes("binary", type); }
 
-  Status Visit(const TimestampType& type) override {
-    return WritePrimitive("timestamp", type);
+  Status Visit(const FixedWidthBinaryType& type) {
+    return WritePrimitive("fixedwidthbinary", type);
   }
 
-  Status Visit(const IntervalType& type) override {
-    return WritePrimitive("interval", type);
-  }
+  Status Visit(const TimestampType& type) { return WritePrimitive("timestamp", type); }
+
+  Status Visit(const IntervalType& type) { return WritePrimitive("interval", type); }
 
-  Status Visit(const ListType& type) override {
+  Status Visit(const ListType& type) {
     WriteName("list", type);
     RETURN_NOT_OK(WriteChildren(type.children()));
     WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
-  Status Visit(const StructType& type) override {
+  Status Visit(const StructType& type) {
     WriteName("struct", type);
     WriteChildren(type.children());
     WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
-  Status Visit(const UnionType& type) override {
+  Status Visit(const UnionType& type) {
     WriteName("union", type);
     WriteChildren(type.children());
     WriteBufferLayout(type.GetBufferLayout());
     return Status::OK();
   }
 
+  Status Visit(const DecimalType& type) { return Status::NotImplemented("decimal"); }
+
+  Status Visit(const DictionaryType& type) {
+    return Status::NotImplemented("dictionary");
+  }
+
  private:
   const Schema& schema_;
   RjWriter* writer_;
 };
 
-class JsonArrayWriter : public ArrayVisitor {
+class JsonArrayWriter {
  public:
   JsonArrayWriter(const std::string& name, const Array& array, RjWriter* writer)
       : name_(name), array_(array), writer_(writer) {}
@@ -362,7 +353,7 @@ class JsonArrayWriter : public ArrayVisitor {
     writer_->Key("count");
     writer_->Int(static_cast<int32_t>(arr.length()));
 
-    RETURN_NOT_OK(arr.Accept(this));
+    RETURN_NOT_OK(VisitArrayInline(arr, this));
 
     writer_->EndObject();
     return Status::OK();
@@ -411,9 +402,15 @@ class JsonArrayWriter : public ArrayVisitor {
     }
   }
 
-  template <typename T>
-  typename std::enable_if<std::is_base_of<BooleanArray, T>::value, void>::type
-  WriteDataValues(const T& arr) {
+  void WriteDataValues(const FixedWidthBinaryArray& arr) {
+    int32_t width = arr.byte_width();
+    for (int64_t i = 0; i < arr.length(); ++i) {
+      const char* buf = reinterpret_cast<const char*>(arr.GetValue(i));
+      writer_->String(HexEncode(buf, width));
+    }
+  }
+
+  void WriteDataValues(const BooleanArray& arr) {
     for (int i = 0; i < arr.length(); ++i) {
       writer_->Bool(arr.Value(i));
     }
@@ -458,23 +455,6 @@ class JsonArrayWriter : public ArrayVisitor {
     writer_->EndArray();
   }
 
-  template <typename T>
-  Status WritePrimitive(const T& array) {
-    WriteValidityField(array);
-    WriteDataField(array);
-    SetNoChildren();
-    return Status::OK();
-  }
-
-  template <typename T>
-  Status WriteVarBytes(const T& array) {
-    WriteValidityField(array);
-    WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
-    WriteDataField(array);
-    SetNoChildren();
-    return Status::OK();
-  }
-
   Status WriteChildren(const std::vector<std::shared_ptr<Field>>& fields,
       const std::vector<std::shared_ptr<Array>>& arrays) {
     writer_->Key("children");
@@ -486,53 +466,48 @@ class JsonArrayWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  Status Visit(const NullArray& array) override {
+  Status Visit(const NullArray& array) {
     SetNoChildren();
     return Status::OK();
   }
 
-  Status Visit(const BooleanArray& array) override { return WritePrimitive(array); }
-
-  Status Visit(const Int8Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const Int16Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const Int32Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const Int64Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const UInt8Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const UInt16Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const UInt32Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const UInt64Array& array) override { return WritePrimitive(array); }
-
-  Status Visit(const HalfFloatArray& array) override { return WritePrimitive(array); }
-
-  Status Visit(const FloatArray& array) override { return WritePrimitive(array); }
+  template <typename T>
+  typename std::enable_if<std::is_base_of<PrimitiveArray, T>::value, Status>::type Visit(
+      const T& array) {
+    WriteValidityField(array);
+    WriteDataField(array);
+    SetNoChildren();
+    return Status::OK();
+  }
 
-  Status Visit(const DoubleArray& array) override { return WritePrimitive(array); }
+  template <typename T>
+  typename std::enable_if<std::is_base_of<BinaryArray, T>::value, Status>::type Visit(
+      const T& array) {
+    WriteValidityField(array);
+    WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
+    WriteDataField(array);
+    SetNoChildren();
+    return Status::OK();
+  }
 
-  Status Visit(const StringArray& array) override { return WriteVarBytes(array); }
+  Status Visit(const DecimalArray& array) { return Status::NotImplemented("decimal"); }
 
-  Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); }
+  Status Visit(const DictionaryArray& array) { return Status::NotImplemented("decimal"); }
 
-  Status Visit(const ListArray& array) override {
+  Status Visit(const ListArray& array) {
     WriteValidityField(array);
     WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
     auto type = static_cast<const ListType*>(array.type().get());
     return WriteChildren(type->children(), {array.values()});
   }
 
-  Status Visit(const StructArray& array) override {
+  Status Visit(const StructArray& array) {
     WriteValidityField(array);
     auto type = static_cast<const StructType*>(array.type().get());
     return WriteChildren(type->children(), array.fields());
   }
 
-  Status Visit(const UnionArray& array) override {
+  Status Visit(const UnionArray& array) {
     WriteValidityField(array);
     auto type = static_cast<const UnionType*>(array.type().get());
 
@@ -549,240 +524,256 @@ class JsonArrayWriter : public ArrayVisitor {
   RjWriter* writer_;
 };
 
-class JsonSchemaReader {
- public:
-  explicit JsonSchemaReader(const rj::Value& json_schema) : json_schema_(json_schema) {}
-
-  Status GetSchema(std::shared_ptr<Schema>* schema) {
-    const auto& obj_schema = json_schema_.GetObject();
+static Status GetInteger(
+    const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
+  const auto& json_bit_width = json_type.FindMember("bitWidth");
+  RETURN_NOT_INT("bitWidth", json_bit_width, json_type);
 
-    const auto& json_fields = obj_schema.FindMember("fields");
-    RETURN_NOT_ARRAY("fields", json_fields, obj_schema);
+  const auto& json_is_signed = json_type.FindMember("isSigned");
+  RETURN_NOT_BOOL("isSigned", json_is_signed, json_type);
 
-    std::vector<std::shared_ptr<Field>> fields;
-    RETURN_NOT_OK(GetFieldsFromArray(json_fields->value, &fields));
+  bool is_signed = json_is_signed->value.GetBool();
+  int bit_width = json_bit_width->value.GetInt();
 
-    *schema = std::make_shared<Schema>(fields);
-    return Status::OK();
+  switch (bit_width) {
+    case 8:
+      *type = is_signed ? int8() : uint8();
+      break;
+    case 16:
+      *type = is_signed ? int16() : uint16();
+      break;
+    case 32:
+      *type = is_signed ? int32() : uint32();
+      break;
+    case 64:
+      *type = is_signed ? int64() : uint64();
+      break;
+    default:
+      std::stringstream ss;
+      ss << "Invalid bit width: " << bit_width;
+      return Status::Invalid(ss.str());
   }
+  return Status::OK();
+}
 
-  Status GetFieldsFromArray(
-      const rj::Value& obj, std::vector<std::shared_ptr<Field>>* fields) {
-    const auto& values = obj.GetArray();
+static Status GetFloatingPoint(
+    const RjObject& json_type, std::shared_ptr<DataType>* type) {
+  const auto& json_precision = json_type.FindMember("precision");
+  RETURN_NOT_STRING("precision", json_precision, json_type);
 
-    fields->resize(values.Size());
-    for (rj::SizeType i = 0; i < fields->size(); ++i) {
-      RETURN_NOT_OK(GetField(values[i], &(*fields)[i]));
-    }
-    return Status::OK();
+  std::string precision = json_precision->value.GetString();
+
+  if (precision == "DOUBLE") {
+    *type = float64();
+  } else if (precision == "SINGLE") {
+    *type = float32();
+  } else if (precision == "HALF") {
+    *type = float16();
+  } else {
+    std::stringstream ss;
+    ss << "Invalid precision: " << precision;
+    return Status::Invalid(ss.str());
   }
+  return Status::OK();
+}
 
-  Status GetField(const rj::Value& obj, std::shared_ptr<Field>* field) {
-    if (!obj.IsObject()) { return Status::Invalid("Field was not a JSON object"); }
-    const auto& json_field = obj.GetObject();
+static Status GetFixedWidthBinary(
+    const RjObject& json_type, std::shared_ptr<DataType>* type) {
+  const auto& json_byte_width = json_type.FindMember("byteWidth");
+  RETURN_NOT_INT("byteWidth", json_byte_width, json_type);
 
-    const auto& json_name = json_field.FindMember("name");
-    RETURN_NOT_STRING("name", json_name, json_field);
+  int32_t byte_width = json_byte_width->value.GetInt();
+  *type = fixed_width_binary(byte_width);
+  return Status::OK();
+}
 
-    const auto& json_nullable = json_field.FindMember("nullable");
-    RETURN_NOT_BOOL("nullable", json_nullable, json_field);
+static Status GetDate(const RjObject& json_type, std::shared_ptr<DataType>* type) {
+  const auto& json_unit = json_type.FindMember("unit");
+  RETURN_NOT_STRING("unit", json_unit, json_type);
 
-    const auto& json_type = json_field.FindMember("type");
-    RETURN_NOT_OBJECT("type", json_type, json_field);
+  std::string unit_str = json_unit->value.GetString();
 
-    const auto& json_children = json_field.FindMember("children");
-    RETURN_NOT_ARRAY("children", json_children, json_field);
+  if (unit_str == "DAY") {
+    *type = date32();
+  } else if (unit_str == "MILLISECOND") {
+    *type = date64();
+  } else {
+    std::stringstream ss;
+    ss << "Invalid date unit: " << unit_str;
+    return Status::Invalid(ss.str());
+  }
+  return Status::OK();
+}
 
-    std::vector<std::shared_ptr<Field>> children;
-    RETURN_NOT_OK(GetFieldsFromArray(json_children->value, &children));
+static Status GetTime(const RjObject& json_type, std::shared_ptr<DataType>* type) {
+  const auto& json_unit = json_type.FindMember("unit");
+  RETURN_NOT_STRING("unit", json_unit, json_type);
+
+  std::string unit_str = json_unit->value.GetString();
+
+  if (unit_str == "SECOND") {
+    *type = time32(TimeUnit::SECOND);
+  } else if (unit_str == "MILLISECOND") {
+    *type = time32(TimeUnit::MILLI);
+  } else if (unit_str == "MICROSECOND") {
+    *type = time64(TimeUnit::MICRO);
+  } else if (unit_str == "NANOSECOND") {
+    *type = time64(TimeUnit::NANO);
+  } else {
+    std::stringstream ss;
+    ss << "Invalid time unit: " << unit_str;
+    return Status::Invalid(ss.str());
+  }
+  return Status::OK();
+}
 
-    std::shared_ptr<DataType> type;
-    RETURN_NOT_OK(GetType(json_type->value.GetObject(), children, &type));
+static Status GetTimestamp(const RjObject& json_type, std::shared_ptr<DataType>* type) {
+  const auto& json_unit = json_type.FindMember("unit");
+  RETURN_NOT_STRING("unit", json_unit, json_type);
+
+  std::string unit_str = json_unit->value.GetString();
+
+  TimeUnit unit;
+  if (unit_str == "SECOND") {
+    unit = TimeUnit::SECOND;
+  } else if (unit_str == "MILLISECOND") {
+    unit = TimeUnit::MILLI;
+  } else if (unit_str == "MICROSECOND") {
+    unit = TimeUnit::MICRO;
+  } else if (unit_str == "NANOSECOND") {
+    unit = TimeUnit::NANO;
+  } else {
+    std::stringstream ss;
+    ss << "Invalid time unit: " << unit_str;
+    return Status::Invalid(ss.str());
+  }
 
-    *field = std::make_shared<Field>(
-        json_name->value.GetString(), type, json_nullable->value.GetBool());
-    return Status::OK();
+  const auto& json_tz = json_type.FindMember("timezone");
+  if (json_tz == json_type.MemberEnd()) {
+    *type = timestamp(unit);
+  } else {
+    *type = timestamp(unit, json_tz->value.GetString());
   }
 
-  Status GetInteger(
-      const rj::Value::ConstObject& json_type, std::shared_ptr<DataType>* type) {
-    const auto& json_bit_width = json_type.FindMember("bitWidth");
-    RETURN_NOT_INT("bitWidth", json_bit_width, json_type);
+  return Status::OK();
+}
 
-    const auto& json_is_signed = json_type.FindMember("isSigned");
-    RETURN_NOT_BOOL("isSigned", json_is_signed, json_type);
+static Status GetUnion(const RjObject& json_type,
+    const std::vector<std::shared_ptr<Field>>& children,
+    std::shared_ptr<DataType>* type) {
+  const auto& json_mode = json_type.FindMember("mode");
+  RETURN_NOT_STRING("mode", json_mode, json_type);
 
-    bool is_signed = json_is_signed->value.GetBool();
-    int bit_width = json_bit_width->value.GetInt();
+  std::string mode_str = json_mode->value.GetString();
+  UnionMode mode;
 
-    switch (bit_width) {
-      case 8:
-        *type = is_signed ? int8() : uint8();
-        break;
-      case 16:
-        *type = is_signed ? int16() : uint16();
-        break;
-      case 32:
-        *type = is_signed ? int32() : uint32();
-        break;
-      case 64:
-        *type = is_signed ? int64() : uint64();
-        break;
-      default:
-        std::stringstream ss;
-        ss << "Invalid bit width: " << bit_width;
-        return Status::Invalid(ss.str());
-    }
-    return Status::OK();
+  if (mode_str == "SPARSE") {
+    mode = UnionMode::SPARSE;
+  } else if (mode_str == "DENSE") {
+    mode = UnionMode::DENSE;
+  } else {
+    std::stringstream ss;
+    ss << "Invalid union mode: " << mode_str;
+    return Status::Invalid(ss.str());
   }
 
-  Status GetFloatingPoint(const RjObject& json_type, std::shared_ptr<DataType>* type) {
-    const auto& json_precision = json_type.FindMember("precision");
-    RETURN_NOT_STRING("precision", json_precision, json_type);
-
-    std::string precision = json_precision->value.GetString();
+  const auto& json_type_codes = json_type.FindMember("typeIds");
+  RETURN_NOT_ARRAY("typeIds", json_type_codes, json_type);
 
-    if (precision == "DOUBLE") {
-      *type = float64();
-    } else if (precision == "SINGLE") {
-      *type = float32();
-    } else if (precision == "HALF") {
-      *type = float16();
-    } else {
-      std::stringstream ss;
-      ss << "Invalid precision: " << precision;
-      return Status::Invalid(ss.str());
-    }
-    return Status::OK();
+  std::vector<uint8_t> type_codes;
+  const auto& id_array = json_type_codes->value.GetArray();
+  for (const rj::Value& val : id_array) {
+    DCHECK(val.IsUint());
+    type_codes.push_back(static_cast<uint8_t>(val.GetUint()));
   }
 
-  Status GetTime(const RjObject& json_type, std::shared_ptr<DataType>* type) {
-    const auto& json_unit = json_type.FindMember("unit");
-    RETURN_NOT_STRING("unit", json_unit, json_type);
-
-    std::string unit_str = json_unit->value.GetString();
-
-    if (unit_str == "SECOND") {
-      *type = time32(TimeUnit::SECOND);
-    } else if (unit_str == "MILLISECOND") {
-      *type = time32(TimeUnit::MILLI);
-    } else if (unit_str == "MICROSECOND") {
-      *type = time64(TimeUnit::MICRO);
-    } else if (unit_str == "NANOSECOND") {
-      *type = time64(TimeUnit::NANO);
-    } else {
-      std::stringstream ss;
-      ss << "Invalid time unit: " << unit_str;
-      return Status::Invalid(ss.str());
-    }
-    return Status::OK();
-  }
+  *type = union_(children, type_codes, mode);
 
-  Status GetTimestamp(const RjObject& json_type, std::shared_ptr<DataType>* type) {
-    const auto& json_unit = json_type.FindMember("unit");
-    RETURN_NOT_STRING("unit", json_unit, json_type);
+  return Status::OK();
+}
 
-    std::string unit_str = json_unit->value.GetString();
+static Status GetType(const RjObject& json_type,
+    const std::vector<std::shared_ptr<Field>>& children,
+    std::shared_ptr<DataType>* type) {
+  const auto& json_type_name = json_type.FindMember("name");
+  RETURN_NOT_STRING("name", json_type_name, json_type);
+
+  std::string type_name = json_type_name->value.GetString();
+
+  if (type_name == "int") {
+    return GetInteger(json_type, type);
+  } else if (type_name == "floatingpoint") {
+    return GetFloatingPoint(json_type, type);
+  } else if (type_name == "bool") {
+    *type = boolean();
+  } else if (type_name == "utf8") {
+    *type = utf8();
+  } else if (type_name == "binary") {
+    *type = binary();
+  } else if (type_name == "fixedwidthbinary") {
+    return GetFixedWidthBinary(json_type, type);
+  } else if (type_name == "null") {
+    *type = null();
+  } else if (type_name == "date") {
+    return GetDate(json_type, type);
+  } else if (type_name == "time") {
+    return GetTime(json_type, type);
+  } else if (type_name == "timestamp") {
+    return GetTimestamp(json_type, type);
+  } else if (type_name == "list") {
+    *type = list(children[0]);
+  } else if (type_name == "struct") {
+    *type = struct_(children);
+  } else {
+    return GetUnion(json_type, children, type);
+  }
+  return Status::OK();
+}
 
-    TimeUnit unit;
-    if (unit_str == "SECOND") {
-      unit = TimeUnit::SECOND;
-    } else if (unit_str == "MILLISECOND") {
-      unit = TimeUnit::MILLI;
-    } else if (unit_str == "MICROSECOND") {
-      unit = TimeUnit::MICRO;
-    } else if (unit_str == "NANOSECOND") {
-      unit = TimeUnit::NANO;
-    } else {
-      std::stringstream ss;
-      ss << "Invalid time unit: " << unit_str;
-      return Status::Invalid(ss.str());
-    }
+static Status GetField(const rj::Value& obj, std::shared_ptr<Field>* field);
 
-    *type = timestamp(unit);
+static Status GetFieldsFromArray(
+    const rj::Value& obj, std::vector<std::shared_ptr<Field>>* fields) {
+  const auto& values = obj.GetArray();
 
-    return Status::OK();
+  fields->resize(values.Size());
+  for (rj::SizeType i = 0; i < fields->size(); ++i) {
+    RETURN_NOT_OK(GetField(values[i], &(*fields)[i]));
   }
+  return Status::OK();
+}
 
-  Status GetUnion(const RjObject& json_type,
-      const std::vector<std::shared_ptr<Field>>& children,
-      std::shared_ptr<DataType>* type) {
-    const auto& json_mode = json_type.FindMember("mode");
-    RETURN_NOT_STRING("mode", json_mode, json_type);
-
-    std::string mode_str = json_mode->value.GetString();
-    UnionMode mode;
+static Status GetField(const rj::Value& obj, std::shared_ptr<Field>* field) {
+  if (!obj.IsObject()) { return Status::Invalid("Field was not a JSON object"); }
+  const auto& json_field = obj.GetObject();
 
-    if (mode_str == "SPARSE") {
-      mode = UnionMode::SPARSE;
-    } else if (mode_str == "DENSE") {
-      mode = UnionMode::DENSE;
-    } else {
-      std::stringstream ss;
-      ss << "Invalid union mode: " << mode_str;
-      return Status::Invalid(ss.str());
-    }
+  const auto& json_name = json_field.FindMember("name");
+  RETURN_NOT_STRING("name", json_name, json_field);
 
-    const auto& json_type_codes = json_type.FindMember("typeIds");
-    RETURN_NOT_ARRAY("typeIds", json_type_codes, json_type);
+  const auto& json_nullable = json_field.FindMember("nullable");
+  RETURN_NOT_BOOL("nullable", json_nullable, json_field);
 
-    std::vector<uint8_t> type_codes;
-    const auto& id_array = json_type_codes->value.GetArray();
-    for (const rj::Value& val : id_array) {
-      DCHECK(val.IsUint());
-      type_codes.push_back(static_cast<uint8_t>(val.GetUint()));
-    }
+  const auto& json_type = json_field.FindMember("type");
+  RETURN_NOT_OBJECT("type", json_type, json_field);
 
-    *type = union_(children, type_codes, mode);
+  const auto& json_children = json_field.FindMember("children");
+  RETURN_NOT_ARRAY("children", json_children, json_field);
 
-    return Status::OK();
-  }
+  std::vector<std::shared_ptr<Field>> children;
+  RETURN_NOT_OK(GetFieldsFromArray(json_children->value, &children));
 
-  Status GetType(const RjObject& json_type,
-      const std::vector<std::shared_ptr<Field>>& children,
-      std::shared_ptr<DataType>* type) {
-    const auto& json_type_name = json_type.FindMember("name");
-    RETURN_NOT_STRING("name", json_type_name, json_type);
-
-    std::string type_name = json_type_name->value.GetString();
-
-    if (type_name == "int") {
-      return GetInteger(json_type, type);
-    } else if (type_name == "floatingpoint") {
-      return GetFloatingPoint(json_type, type);
-    } else if (type_name == "bool") {
-      *type = boolean();
-    } else if (type_name == "utf8") {
-      *type = utf8();
-    } else if (type_name == "binary") {
-      *type = binary();
-    } else if (type_name == "null") {
-      *type = null();
-    } else if (type_name == "date") {
-      // TODO
-      *type = date64();
-    } else if (type_name == "time") {
-      return GetTime(json_type, type);
-    } else if (type_name == "timestamp") {
-      return GetTimestamp(json_type, type);
-    } else if (type_name == "list") {
-      *type = list(children[0]);
-    } else if (type_name == "struct") {
-      *type = struct_(children);
-    } else {
-      return GetUnion(json_type, children, type);
-    }
-    return Status::OK();
-  }
+  std::shared_ptr<DataType> type;
+  RETURN_NOT_OK(GetType(json_type->value.GetObject(), children, &type));
 
- private:
-  const rj::Value& json_schema_;
-};
+  *field = std::make_shared<Field>(
+      json_name->value.GetString(), type, json_nullable->value.GetBool());
+  return Status::OK();
+}
 
 template <typename T>
 inline typename std::enable_if<IsSignedInt<T>::value, typename T::c_type>::type
 UnboxValue(const rj::Value& val) {
-  DCHECK(val.IsInt());
+  DCHECK(val.IsInt64());
   return static_cast<typename T::c_type>(val.GetInt64());
 }
 
@@ -833,8 +824,10 @@ class JsonArrayReader {
   }
 
   template <typename T>
-  typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
-                              std::is_base_of<BooleanType, T>::value,
+  typename std::enable_if<
+      std::is_base_of<PrimitiveCType, T>::value || std::is_base_of<DateType, T>::value ||
+          std::is_base_of<TimestampType, T>::value ||
+          std::is_base_of<TimeType, T>::value || std::is_base_of<BooleanType, T>::value,
       Status>::type
   ReadArray(const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
       const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
@@ -904,6 +897,47 @@ class JsonArrayReader {
   }
 
   template <typename T>
+  typename std::enable_if<std::is_base_of<FixedWidthBinaryType, T>::value, Status>::type
+  ReadArray(const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
+      const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
+    FixedWidthBinaryBuilder builder(pool_, type);
+
+    const auto& json_data = json_array.FindMember("DATA");
+    RETURN_NOT_ARRAY("DATA", json_data, json_array);
+
+    const auto& json_data_arr = json_data->value.GetArray();
+
+    DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length);
+
+    int32_t byte_width = static_cast<const FixedWidthBinaryType&>(*type).byte_width();
+
+    // Allocate space for parsed values
+    std::shared_ptr<MutableBuffer> byte_buffer;
+    RETURN_NOT_OK(AllocateBuffer(pool_, byte_width, &byte_buffer));
+    uint8_t* byte_buffer_data = byte_buffer->mutable_data();
+
+    for (int i = 0; i < length; ++i) {
+      if (!is_valid[i]) {
+        builder.AppendNull();
+        continue;
+      }
+
+      const rj::Value& val = json_data_arr[i];
+      DCHECK(val.IsString());
+      std::string hex_string = val.GetString();
+      DCHECK_EQ(static_cast<int32_t>(hex_string.size()), byte_width * 2)
+          << "Expected size: " << byte_width * 2 << " got: " << hex_string.size();
+      const char* hex_data = hex_string.c_str();
+
+      for (int32_t j = 0; j < byte_width; ++j) {
+        RETURN_NOT_OK(ParseHexValue(hex_data + j * 2, &byte_buffer_data[j]));
+      }
+      RETURN_NOT_OK(builder.Append(byte_buffer_data));
+    }
+    return builder.Finish(array);
+  }
+
+  template <typename T>
   Status GetIntArray(
       const RjArray& json_array, const int32_t length, std::shared_ptr<Buffer>* out) {
     std::shared_ptr<MutableBuffer> buffer;
@@ -1063,13 +1097,6 @@ class JsonArrayReader {
   case TYPE::type_id:   \
     return ReadArray<TYPE>(json_array, length, is_valid, type, array);
 
-#define NOT_IMPLEMENTED_CASE(TYPE_ENUM)      \
-  case Type::TYPE_ENUM: {                    \
-    std::stringstream ss;                    \
-    ss << type->ToString();                  \
-    return Status::NotImplemented(ss.str()); \
-  }
-
     switch (type->type) {
       TYPE_CASE(NullType);
       TYPE_CASE(BooleanType);
@@ -1086,16 +1113,15 @@ class JsonArrayReader {
       TYPE_CASE(DoubleType);
       TYPE_CASE(StringType);
       TYPE_CASE(BinaryType);
-      NOT_IMPLEMENTED_CASE(DATE32);
-      NOT_IMPLEMENTED_CASE(DATE64);
-      NOT_IMPLEMENTED_CASE(TIMESTAMP);
-      NOT_IMPLEMENTED_CASE(TIME32);
-      NOT_IMPLEMENTED_CASE(TIME64);
-      NOT_IMPLEMENTED_CASE(INTERVAL);
+      TYPE_CASE(FixedWidthBinaryType);
+      TYPE_CASE(Date32Type);
+      TYPE_CASE(Date64Type);
+      TYPE_CASE(TimestampType);
+      TYPE_CASE(Time32Type);
+      TYPE_CASE(Time64Type);
       TYPE_CASE(ListType);
       TYPE_CASE(StructType);
       TYPE_CASE(UnionType);
-      NOT_IMPLEMENTED_CASE(DICTIONARY);
       default:
         std::stringstream ss;
         ss << type->ToString();
@@ -1103,7 +1129,6 @@ class JsonArrayReader {
     }
 
 #undef TYPE_CASE
-#undef NOT_IMPLEMENTED_CASE
 
     return Status::OK();
   }
@@ -1118,8 +1143,16 @@ Status WriteJsonSchema(const Schema& schema, RjWriter* json_writer) {
 }
 
 Status ReadJsonSchema(const rj::Value& json_schema, std::shared_ptr<Schema>* schema) {
-  JsonSchemaReader converter(json_schema);
-  return converter.GetSchema(schema);
+  const auto& obj_schema = json_schema.GetObject();
+
+  const auto& json_fields = obj_schema.FindMember("fields");
+  RETURN_NOT_ARRAY("fields", json_fields, obj_schema);
+
+  std::vector<std::shared_ptr<Field>> fields;
+  RETURN_NOT_OK(GetFieldsFromArray(json_fields->value, &fields));
+
+  *schema = std::make_shared<Schema>(fields);
+  return Status::OK();
 }
 
 Status WriteJsonArray(

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 17af563..85dc8b3 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -602,7 +602,7 @@ static Status WriteBuffers(
   return Status::OK();
 }
 
-static Status MakeRecordBatch(FBB& fbb, int32_t length, int64_t body_length,
+static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
     RecordBatchOffset* offset) {
   FieldNodeVector fb_nodes;
@@ -615,7 +615,7 @@ static Status MakeRecordBatch(FBB& fbb, int32_t length, int64_t body_length,
   return Status::OK();
 }
 
-Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+Status WriteRecordBatchMessage(int64_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out) {
   FBB fbb;
@@ -625,7 +625,7 @@ Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
       fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), body_length, out);
 }
 
-Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out) {
   FBB fbb;

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 6e903c0..f60fb77 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -188,11 +188,11 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
 Status WriteSchemaMessage(
     const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
 
-Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+Status WriteRecordBatchMessage(int64_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out);
 
-Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
     std::shared_ptr<Buffer>* out);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 83e03aa..03c678a 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -97,6 +97,8 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
 
   for (int i = 0; i < schema->num_fields(); ++i) {
     RETURN_NOT_OK(LoadArray(schema->field(i)->type, &context, &arrays[i]));
+    DCHECK_EQ(num_rows, arrays[i]->length())
+        << "Array length did not match record batch length";
   }
 
   *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays));

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 7ee57d2..994e128 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -29,6 +29,7 @@
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
 #include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/test-util.h"
@@ -47,6 +48,41 @@ static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
   }
 }
 
+static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+  if (!left.schema()->Equals(right.schema())) {
+    FAIL() << "Left schema: " << left.schema()->ToString()
+           << "\nRight schema: " << right.schema()->ToString();
+  }
+  ASSERT_EQ(left.num_columns(), right.num_columns())
+      << left.schema()->ToString() << " result: " << right.schema()->ToString();
+  EXPECT_EQ(left.num_rows(), right.num_rows());
+  for (int i = 0; i < left.num_columns(); ++i) {
+    EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
+        << "Idx: " << i << " Name: " << left.column_name(i);
+  }
+}
+
+static inline void CompareArraysDetailed(
+    int index, const Array& result, const Array& expected) {
+  if (!expected.Equals(result)) {
+    std::stringstream pp_result;
+    std::stringstream pp_expected;
+
+    ASSERT_OK(PrettyPrint(expected, 0, &pp_expected));
+    ASSERT_OK(PrettyPrint(result, 0, &pp_result));
+
+    FAIL() << "Index: " << index << " Expected: " << pp_expected.str()
+           << "\nGot: " << pp_result.str();
+  }
+}
+
+static inline void CompareBatchColumnsDetailed(
+    const RecordBatch& result, const RecordBatch& expected) {
+  for (int i = 0; i < expected.num_columns(); ++i) {
+    CompareArraysDetailed(i, *result.column(i), *expected.column(i));
+  }
+}
+
 const auto kListInt32 = list(int32());
 const auto kListListInt32 = list(kListInt32);
 
@@ -474,7 +510,7 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) {
   ArrayFromVector<Date32Type, int32_t>(is_valid, date32_values, &date32_array);
 
   std::vector<int64_t> date64_values = {1489269000000, 1489270000000, 1489271000000,
-      1489272000000, 1489272000000, 1489273000000};
+      1489272000000, 1489272000000, 1489273000000, 1489274000000};
   std::shared_ptr<Array> date64_array;
   ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array);
 
@@ -548,7 +584,7 @@ Status MakeFWBinary(std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<Array> a1, a2;
 
   FixedWidthBinaryBuilder b1(default_memory_pool(), f0->type);
-  FixedWidthBinaryBuilder b2(default_memory_pool(), f0->type);
+  FixedWidthBinaryBuilder b2(default_memory_pool(), f1->type);
 
   std::vector<std::string> values1 = {"foo1", "foo2", "foo3", "foo4"};
   AppendValues(is_valid, values1, &b1);

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index e795ef9..da360f3 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -141,7 +141,7 @@ class RecordBatchWriter : public ArrayVisitor {
   virtual Status WriteMetadataMessage(
       int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
     return WriteRecordBatchMessage(
-        static_cast<int32_t>(num_rows), body_length, field_nodes_, buffer_meta_, out);
+        num_rows, body_length, field_nodes_, buffer_meta_, out);
   }
 
   Status WriteMetadata(int64_t num_rows, int64_t body_length, io::OutputStream* dst,
@@ -306,7 +306,7 @@ class RecordBatchWriter : public ArrayVisitor {
     auto data = array.data();
     int32_t width = array.byte_width();
 
-    if (array.offset() != 0) {
+    if (data && array.offset() != 0) {
       data = SliceBuffer(data, array.offset() * width, width * array.length());
     }
     buffers_.push_back(data);
@@ -476,8 +476,8 @@ class DictionaryWriter : public RecordBatchWriter {
 
   Status WriteMetadataMessage(
       int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
-    return WriteDictionaryMessage(dictionary_id_, static_cast<int32_t>(num_rows),
-        body_length, field_nodes_, buffer_meta_, out);
+    return WriteDictionaryMessage(
+        dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out);
   }
 
   Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/tensor.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/tensor.cc b/cpp/src/arrow/tensor.cc
index c0d128f..6489cd0 100644
--- a/cpp/src/arrow/tensor.cc
+++ b/cpp/src/arrow/tensor.cc
@@ -71,8 +71,7 @@ const std::string& Tensor::dim_name(int i) const {
 }
 
 int64_t Tensor::size() const {
-  return std::accumulate(
-      shape_.begin(), shape_.end(), 1, std::multiplies<int64_t>());
+  return std::accumulate(shape_.begin(), shape_.end(), 1, std::multiplies<int64_t>());
 }
 
 template <typename T>

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 3885022..c790f6e 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -45,8 +45,6 @@ std::string Field::ToString() const {
   return ss.str();
 }
 
-DataType::~DataType() {}
-
 bool DataType::Equals(const DataType& other) const {
   bool are_equal = false;
   Status error = TypeEquals(*this, other, &are_equal);
@@ -63,16 +61,16 @@ std::string BooleanType::ToString() const {
   return name();
 }
 
-FloatingPointMeta::Precision HalfFloatType::precision() const {
-  return FloatingPointMeta::HALF;
+FloatingPoint::Precision HalfFloatType::precision() const {
+  return FloatingPoint::HALF;
 }
 
-FloatingPointMeta::Precision FloatType::precision() const {
-  return FloatingPointMeta::SINGLE;
+FloatingPoint::Precision FloatType::precision() const {
+  return FloatingPoint::SINGLE;
 }
 
-FloatingPointMeta::Precision DoubleType::precision() const {
-  return FloatingPointMeta::DOUBLE;
+FloatingPoint::Precision DoubleType::precision() const {
+  return FloatingPoint::DOUBLE;
 }
 
 std::string StringType::ToString() const {
@@ -111,6 +109,16 @@ std::string StructType::ToString() const {
   return s.str();
 }
 
+// ----------------------------------------------------------------------
+// Date types
+
+DateType::DateType(Type::type type_id, DateUnit unit)
+    : FixedWidthType(type_id), unit(unit) {}
+
+Date32Type::Date32Type() : DateType(Type::DATE32, DateUnit::DAY) {}
+
+Date64Type::Date64Type() : DateType(Type::DATE64, DateUnit::MILLI) {}
+
 std::string Date64Type::ToString() const {
   return std::string("date64[ms]");
 }
@@ -122,7 +130,10 @@ std::string Date32Type::ToString() const {
 // ----------------------------------------------------------------------
 // Time types
 
-Time32Type::Time32Type(TimeUnit unit) : FixedWidthType(Type::TIME32), unit(unit) {
+TimeType::TimeType(Type::type type_id, TimeUnit unit)
+    : FixedWidthType(type_id), unit(unit) {}
+
+Time32Type::Time32Type(TimeUnit unit) : TimeType(Type::TIME32, unit) {
   DCHECK(unit == TimeUnit::SECOND || unit == TimeUnit::MILLI)
       << "Must be seconds or milliseconds";
 }
@@ -133,7 +144,7 @@ std::string Time32Type::ToString() const {
   return ss.str();
 }
 
-Time64Type::Time64Type(TimeUnit unit) : FixedWidthType(Type::TIME64), unit(unit) {
+Time64Type::Time64Type(TimeUnit unit) : TimeType(Type::TIME64, unit) {
   DCHECK(unit == TimeUnit::MICRO || unit == TimeUnit::NANO)
       << "Must be microseconds or nanoseconds";
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/3b71d87c/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 7ae5ae3..dc50ecd 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -132,7 +132,7 @@ struct ARROW_EXPORT DataType {
 
   explicit DataType(Type::type type) : type(type) {}
 
-  virtual ~DataType();
+  virtual ~DataType() = default;
 
   // Return whether the types are equal
   //
@@ -167,11 +167,17 @@ struct ARROW_EXPORT FixedWidthType : public DataType {
   std::vector<BufferDescr> GetBufferLayout() const override;
 };
 
-struct ARROW_EXPORT IntegerMeta {
+struct ARROW_EXPORT PrimitiveCType : public FixedWidthType {
+  using FixedWidthType::FixedWidthType;
+};
+
+struct ARROW_EXPORT Integer : public PrimitiveCType {
+  using PrimitiveCType::PrimitiveCType;
   virtual bool is_signed() const = 0;
 };
 
-struct ARROW_EXPORT FloatingPointMeta {
+struct ARROW_EXPORT FloatingPoint : public PrimitiveCType {
+  using PrimitiveCType::PrimitiveCType;
   enum Precision { HALF, SINGLE, DOUBLE };
   virtual Precision precision() const = 0;
 };
@@ -206,16 +212,12 @@ struct ARROW_EXPORT Field {
 
 typedef std::shared_ptr<Field> FieldPtr;
 
-struct ARROW_EXPORT PrimitiveCType : public FixedWidthType {
-  using FixedWidthType::FixedWidthType;
-};
-
-template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
-struct ARROW_EXPORT CTypeImpl : public PrimitiveCType {
+template <typename DERIVED, typename BASE, Type::type TYPE_ID, typename C_TYPE>
+struct ARROW_EXPORT CTypeImpl : public BASE {
   using c_type = C_TYPE;
   static constexpr Type::type type_id = TYPE_ID;
 
-  CTypeImpl() : PrimitiveCType(TYPE_ID) {}
+  CTypeImpl() : BASE(TYPE_ID) {}
 
   int bit_width() const override { return static_cast<int>(sizeof(C_TYPE) * 8); }
 
@@ -240,7 +242,7 @@ struct ARROW_EXPORT NullType : public DataType, public NoExtraMeta {
 };
 
 template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
-struct IntegerTypeImpl : public CTypeImpl<DERIVED, TYPE_ID, C_TYPE>, public IntegerMeta {
+struct IntegerTypeImpl : public CTypeImpl<DERIVED, Integer, TYPE_ID, C_TYPE> {
   bool is_signed() const override { return std::is_signed<C_TYPE>::value; }
 };
 
@@ -292,20 +294,19 @@ struct ARROW_EXPORT Int64Type : public IntegerTypeImpl<Int64Type, Type::INT64, i
 };
 
 struct ARROW_EXPORT HalfFloatType
-    : public CTypeImpl<HalfFloatType, Type::HALF_FLOAT, uint16_t>,
-      public FloatingPointMeta {
+    : public CTypeImpl<HalfFloatType, FloatingPoint, Type::HALF_FLOAT, uint16_t> {
   Precision precision() const override;
   static std::string name() { return "halffloat"; }
 };
 
-struct ARROW_EXPORT FloatType : public CTypeImpl<FloatType, Type::FLOAT, float>,
-                                public FloatingPointMeta {
+struct ARROW_EXPORT FloatType
+    : public CTypeImpl<FloatType, FloatingPoint, Type::FLOAT, float> {
   Precision precision() const override;
   static std::string name() { return "float"; }
 };
 
-struct ARROW_EXPORT DoubleType : public CTypeImpl<DoubleType, Type::DOUBLE, double>,
-                                 public FloatingPointMeta {
+struct ARROW_EXPORT DoubleType
+    : public CTypeImpl<DoubleType, FloatingPoint, Type::DOUBLE, double> {
   Precision precision() const override;
   static std::string name() { return "double"; }
 };
@@ -436,13 +437,23 @@ struct ARROW_EXPORT UnionType : public NestedType {
 // ----------------------------------------------------------------------
 // Date and time types
 
+enum class DateUnit : char { DAY = 0, MILLI = 1 };
+
+struct DateType : public FixedWidthType {
+ public:
+  DateUnit unit;
+
+ protected:
+  DateType(Type::type type_id, DateUnit unit);
+};
+
 /// Date as int32_t days since UNIX epoch
-struct ARROW_EXPORT Date32Type : public FixedWidthType, public NoExtraMeta {
+struct ARROW_EXPORT Date32Type : public DateType {
   static constexpr Type::type type_id = Type::DATE32;
 
   using c_type = int32_t;
 
-  Date32Type() : FixedWidthType(Type::DATE32) {}
+  Date32Type();
 
   int bit_width() const override { return static_cast<int>(sizeof(c_type) * 4); }
 
@@ -451,12 +462,12 @@ struct ARROW_EXPORT Date32Type : public FixedWidthType, public NoExtraMeta {
 };
 
 /// Date as int64_t milliseconds since UNIX epoch
-struct ARROW_EXPORT Date64Type : public FixedWidthType, public NoExtraMeta {
+struct ARROW_EXPORT Date64Type : public DateType {
   static constexpr Type::type type_id = Type::DATE64;
 
   using c_type = int64_t;
 
-  Date64Type() : FixedWidthType(Type::DATE64) {}
+  Date64Type();
 
   int bit_width() const override { return static_cast<int>(sizeof(c_type) * 8); }
 
@@ -485,13 +496,18 @@ static inline std::ostream& operator<<(std::ostream& os, TimeUnit unit) {
   return os;
 }
 
-struct ARROW_EXPORT Time32Type : public FixedWidthType {
+struct TimeType : public FixedWidthType {
+ public:
+  TimeUnit unit;
+
+ protected:
+  TimeType(Type::type type_id, TimeUnit unit);
+};
+
+struct ARROW_EXPORT Time32Type : public TimeType {
   static constexpr Type::type type_id = Type::TIME32;
-  using Unit = TimeUnit;
   using c_type = int32_t;
 
-  TimeUnit unit;
-
   int bit_width() const override { return static_cast<int>(sizeof(c_type) * 4); }
 
   explicit Time32Type(TimeUnit unit = TimeUnit::MILLI);
@@ -500,13 +516,10 @@ struct ARROW_EXPORT Time32Type : public FixedWidthType {
   std::string ToString() const override;
 };
 
-struct ARROW_EXPORT Time64Type : public FixedWidthType {
+struct ARROW_EXPORT Time64Type : public TimeType {
   static constexpr Type::type type_id = Type::TIME64;
-  using Unit = TimeUnit;
   using c_type = int64_t;
 
-  TimeUnit unit;
-
   int bit_width() const override { return static_cast<int>(sizeof(c_type) * 8); }
 
   explicit Time64Type(TimeUnit unit = TimeUnit::MILLI);


Mime
View raw message