arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [1/2] arrow git commit: ARROW-459: [C++] Dictionary IPC support in file and stream formats
Date Fri, 24 Feb 2017 14:16:41 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 5e279f0a7 -> d28f1c1e0


http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
index 72eb134..7f5c993 100644
--- a/cpp/src/arrow/ipc/stream.cc
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -20,17 +20,20 @@
 #include <cstdint>
 #include <cstring>
 #include <sstream>
+#include <string>
 #include <vector>
 
 #include "arrow/buffer.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
 #include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
 #include "arrow/memory_pool.h"
 #include "arrow/schema.h"
 #include "arrow/status.h"
+#include "arrow/table.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
@@ -39,11 +42,10 @@ namespace ipc {
 // ----------------------------------------------------------------------
 // Stream writer implementation
 
-StreamWriter::~StreamWriter() {}
-
 StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
     : sink_(sink),
       schema_(schema),
+      dictionary_memo_(std::make_shared<DictionaryMemo>()),
       pool_(default_memory_pool()),
       position_(-1),
       started_(false) {}
@@ -107,7 +109,7 @@ Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>&
 
 Status StreamWriter::Start() {
   std::shared_ptr<Buffer> schema_fb;
-  RETURN_NOT_OK(WriteSchema(*schema_, &schema_fb));
+  RETURN_NOT_OK(WriteSchemaMessage(*schema_, dictionary_memo_.get(), &schema_fb));
 
   int32_t flatbuffer_size = schema_fb->size();
   RETURN_NOT_OK(
@@ -115,14 +117,41 @@ Status StreamWriter::Start() {
 
   // Write the flatbuffer
   RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size));
+
+  // If there are any dictionaries, write them as the next messages
+  RETURN_NOT_OK(WriteDictionaries());
+
   started_ = true;
   return Status::OK();
 }
 
 Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
-  // Pass FileBlock, but results not used
-  FileBlock dummy_block;
-  return WriteRecordBatch(batch, &dummy_block);
+  // Push an empty FileBlock. Can be written in the footer later
+  record_batches_.emplace_back(0, 0, 0);
+  return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]);
+}
+
+Status StreamWriter::WriteDictionaries() {
+  const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary();
+
+  dictionaries_.resize(id_to_dictionary.size());
+
+  // TODO(wesm): does sorting by id yield any benefit?
+  int dict_index = 0;
+  for (const auto& entry : id_to_dictionary) {
+    FileBlock* block = &dictionaries_[dict_index++];
+
+    block->offset = position_;
+
+    // Frame of reference in file format is 0, see ARROW-384
+    const int64_t buffer_start_offset = 0;
+    RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset, sink_,
+        &block->metadata_length, &block->body_length, pool_));
+    RETURN_NOT_OK(UpdatePosition());
+    DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes";
+  }
+
+  return Status::OK();
 }
 
 Status StreamWriter::Close() {
@@ -134,81 +163,147 @@ Status StreamWriter::Close() {
 // ----------------------------------------------------------------------
 // StreamReader implementation
 
-StreamReader::StreamReader(const std::shared_ptr<io::InputStream>& stream)
-    : stream_(stream), schema_(nullptr) {}
-
-StreamReader::~StreamReader() {}
-
-Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
-    std::shared_ptr<StreamReader>* reader) {
-  // Private ctor
-  *reader = std::shared_ptr<StreamReader>(new StreamReader(stream));
-  return (*reader)->ReadSchema();
+static inline std::string message_type_name(Message::Type type) {
+  switch (type) {
+    case Message::SCHEMA:
+      return "schema";
+    case Message::RECORD_BATCH:
+      return "record batch";
+    case Message::DICTIONARY_BATCH:
+      return "dictionary";
+    default:
+      break;
+  }
+  return "unknown";
 }
 
-Status StreamReader::ReadSchema() {
-  std::shared_ptr<Message> message;
-  RETURN_NOT_OK(ReadNextMessage(&message));
+class StreamReader::StreamReaderImpl {
+ public:
+  StreamReaderImpl() {}
+  ~StreamReaderImpl() {}
 
-  if (message->type() != Message::SCHEMA) {
-    return Status::IOError("First message was not schema type");
+  Status Open(const std::shared_ptr<io::InputStream>& stream) {
+    stream_ = stream;
+    return ReadSchema();
   }
 
-  SchemaMetadata schema_meta(message);
+  Status ReadNextMessage(Message::Type expected_type, std::shared_ptr<Message>* message) {
+    std::shared_ptr<Buffer> buffer;
+    RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer));
 
-  // TODO(wesm): If the schema contains dictionaries, we must read all the
-  // dictionaries from the stream before constructing the final Schema
-  return schema_meta.GetSchema(&schema_);
-}
+    if (buffer->size() != sizeof(int32_t)) {
+      *message = nullptr;
+      return Status::OK();
+    }
+
+    int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
+
+    RETURN_NOT_OK(stream_->Read(message_length, &buffer));
+    if (buffer->size() != message_length) {
+      return Status::IOError("Unexpected end of stream trying to read message");
+    }
 
-Status StreamReader::ReadNextMessage(std::shared_ptr<Message>* message) {
-  std::shared_ptr<Buffer> buffer;
-  RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer));
+    RETURN_NOT_OK(Message::Open(buffer, 0, message));
 
-  if (buffer->size() != sizeof(int32_t)) {
-    *message = nullptr;
+    if ((*message)->type() != expected_type) {
+      std::stringstream ss;
+      ss << "Message not expected type: " << message_type_name(expected_type)
+         << ", was: " << (*message)->type();
+      return Status::IOError(ss.str());
+    }
     return Status::OK();
   }
 
-  int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
+  Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) {
+    RETURN_NOT_OK(stream_->Read(size, buffer));
 
-  RETURN_NOT_OK(stream_->Read(message_length, &buffer));
-  if (buffer->size() != message_length) {
-    return Status::IOError("Unexpected end of stream trying to read message");
+    if ((*buffer)->size() < size) {
+      return Status::IOError("Unexpected EOS when reading buffer");
+    }
+    return Status::OK();
   }
-  return Message::Open(buffer, 0, message);
-}
 
-std::shared_ptr<Schema> StreamReader::schema() const {
-  return schema_;
-}
+  Status ReadNextDictionary() {
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message));
 
-Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
-  std::shared_ptr<Message> message;
-  RETURN_NOT_OK(ReadNextMessage(&message));
+    DictionaryBatchMetadata metadata(message);
 
-  if (message == nullptr) {
-    // End of stream
-    *batch = nullptr;
-    return Status::OK();
+    std::shared_ptr<Buffer> batch_body;
+    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body))
+    io::BufferReader reader(batch_body);
+
+    std::shared_ptr<Array> dictionary;
+    RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, &dictionary));
+    return dictionary_memo_.AddDictionary(metadata.id(), dictionary);
   }
 
-  if (message->type() != Message::RECORD_BATCH) {
-    return Status::IOError("Metadata not record batch");
+  Status ReadSchema() {
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, &message));
+
+    SchemaMetadata schema_meta(message);
+    RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_types_));
+
+    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
+    // those found in the schema
+    int num_dictionaries = static_cast<int>(dictionary_types_.size());
+    for (int i = 0; i < num_dictionaries; ++i) {
+      RETURN_NOT_OK(ReadNextDictionary());
+    }
+
+    return schema_meta.GetSchema(dictionary_memo_, &schema_);
   }
 
-  auto batch_metadata = std::make_shared<RecordBatchMetadata>(message);
+  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, &message));
+
+    if (message == nullptr) {
+      // End of stream
+      *batch = nullptr;
+      return Status::OK();
+    }
 
-  std::shared_ptr<Buffer> batch_body;
-  RETURN_NOT_OK(stream_->Read(message->body_length(), &batch_body));
+    RecordBatchMetadata batch_metadata(message);
 
-  if (batch_body->size() < message->body_length()) {
-    return Status::IOError("Unexpected EOS when reading message body");
+    std::shared_ptr<Buffer> batch_body;
+    RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
+    io::BufferReader reader(batch_body);
+    return ReadRecordBatch(batch_metadata, schema_, &reader, batch);
   }
 
-  io::BufferReader reader(batch_body);
+  std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+  // dictionary_id -> type
+  DictionaryTypeMap dictionary_types_;
+
+  DictionaryMemo dictionary_memo_;
+
+  std::shared_ptr<io::InputStream> stream_;
+  std::shared_ptr<Schema> schema_;
+};
+
+StreamReader::StreamReader() {
+  impl_.reset(new StreamReaderImpl());
+}
+
+StreamReader::~StreamReader() {}
+
+Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+    std::shared_ptr<StreamReader>* reader) {
+  // Private ctor
+  *reader = std::shared_ptr<StreamReader>(new StreamReader());
+  return (*reader)->impl_->Open(stream);
+}
+
+std::shared_ptr<Schema> StreamReader::schema() const {
+  return impl_->schema();
+}
 
-  return ReadRecordBatch(batch_metadata, schema_, &reader, batch);
+Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+  return impl_->GetNextRecordBatch(batch);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
index 12414fa..1c3f65e 100644
--- a/cpp/src/arrow/ipc/stream.h
+++ b/cpp/src/arrow/ipc/stream.h
@@ -22,7 +22,9 @@
 
 #include <cstdint>
 #include <memory>
+#include <vector>
 
+#include "arrow/ipc/metadata.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -44,12 +46,19 @@ class OutputStream;
 
 namespace ipc {
 
-struct FileBlock;
-class Message;
+struct ARROW_EXPORT FileBlock {
+  FileBlock() {}
+  FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
+      : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
+
+  int64_t offset;
+  int32_t metadata_length;
+  int64_t body_length;
+};
 
 class ARROW_EXPORT StreamWriter {
  public:
-  virtual ~StreamWriter();
+  virtual ~StreamWriter() = default;
 
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
       std::shared_ptr<StreamWriter>* out);
@@ -72,6 +81,8 @@ class ARROW_EXPORT StreamWriter {
   Status CheckStarted();
   Status UpdatePosition();
 
+  Status WriteDictionaries();
+
   Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block);
 
   // Adds padding bytes if necessary to ensure all memory blocks are written on
@@ -87,10 +98,17 @@ class ARROW_EXPORT StreamWriter {
   io::OutputStream* sink_;
   std::shared_ptr<Schema> schema_;
 
+  // When writing out the schema, we keep track of all the dictionaries we
+  // encounter, as they must be written out first in the stream
+  std::shared_ptr<DictionaryMemo> dictionary_memo_;
+
   MemoryPool* pool_;
 
   int64_t position_;
   bool started_;
+
+  std::vector<FileBlock> dictionaries_;
+  std::vector<FileBlock> record_batches_;
 };
 
 class ARROW_EXPORT StreamReader {
@@ -107,14 +125,10 @@ class ARROW_EXPORT StreamReader {
   Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
 
  private:
-  explicit StreamReader(const std::shared_ptr<io::InputStream>& stream);
-
-  Status ReadSchema();
+  StreamReader();
 
-  Status ReadNextMessage(std::shared_ptr<Message>* message);
-
-  std::shared_ptr<io::InputStream> stream_;
-  std::shared_ptr<Schema> schema_;
+  class ARROW_NO_EXPORT StreamReaderImpl;
+  std::unique_ptr<StreamReaderImpl> impl_;
 };
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index b4930c4..07f786c 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -345,6 +345,86 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
   return Status::OK();
 }
 
+Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
+  const int32_t length = 6;
+
+  std::vector<bool> is_valid = {true, true, false, true, true, true};
+  std::shared_ptr<Array> dict1, dict2;
+
+  std::vector<std::string> dict1_values = {"foo", "bar", "baz"};
+  std::vector<std::string> dict2_values = {"foo", "bar", "baz", "qux"};
+
+  ArrayFromVector<StringType, std::string>(dict1_values, &dict1);
+  ArrayFromVector<StringType, std::string>(dict2_values, &dict2);
+
+  auto f0_type = arrow::dictionary(arrow::int32(), dict1);
+  auto f1_type = arrow::dictionary(arrow::int8(), dict1);
+  auto f2_type = arrow::dictionary(arrow::int32(), dict2);
+
+  std::shared_ptr<Array> indices0, indices1, indices2;
+  std::vector<int32_t> indices0_values = {1, 2, -1, 0, 2, 0};
+  std::vector<int8_t> indices1_values = {0, 0, 2, 2, 1, 1};
+  std::vector<int32_t> indices2_values = {3, 0, 2, 1, 0, 2};
+
+  ArrayFromVector<Int32Type, int32_t>(is_valid, indices0_values, &indices0);
+  ArrayFromVector<Int8Type, int8_t>(is_valid, indices1_values, &indices1);
+  ArrayFromVector<Int32Type, int32_t>(is_valid, indices2_values, &indices2);
+
+  auto a0 = std::make_shared<DictionaryArray>(f0_type, indices0);
+  auto a1 = std::make_shared<DictionaryArray>(f1_type, indices1);
+  auto a2 = std::make_shared<DictionaryArray>(f2_type, indices2);
+
+  // List of dictionary-encoded string
+  auto f3_type = list(f1_type);
+
+  std::vector<int32_t> list_offsets = {0, 0, 2, 2, 5, 6, 9};
+  std::shared_ptr<Array> offsets, indices3;
+  ArrayFromVector<Int32Type, int32_t>(
+      std::vector<bool>(list_offsets.size(), true), list_offsets, &offsets);
+
+  std::vector<int8_t> indices3_values = {0, 1, 2, 0, 1, 2, 0, 1, 2};
+  std::vector<bool> is_valid3(9, true);
+  ArrayFromVector<Int8Type, int8_t>(is_valid3, indices3_values, &indices3);
+
+  std::shared_ptr<Buffer> null_bitmap;
+  RETURN_NOT_OK(test::GetBitmapFromBoolVector(is_valid, &null_bitmap));
+
+  std::shared_ptr<Array> a3 = std::make_shared<ListArray>(f3_type, length,
+      std::static_pointer_cast<PrimitiveArray>(offsets)->data(),
+      std::make_shared<DictionaryArray>(f1_type, indices3), null_bitmap, 1);
+
+  // Dictionary-encoded list of integer
+  auto f4_value_type = list(int8());
+
+  std::shared_ptr<Array> offsets4, values4, indices4;
+
+  std::vector<int32_t> list_offsets4 = {0, 2, 2, 3};
+  ArrayFromVector<Int32Type, int32_t>(
+      std::vector<bool>(4, true), list_offsets4, &offsets4);
+
+  std::vector<int8_t> list_values4 = {0, 1, 2};
+  ArrayFromVector<Int8Type, int8_t>(std::vector<bool>(3, true), list_values4, &values4);
+
+  auto dict3 = std::make_shared<ListArray>(f4_value_type, 3,
+      std::static_pointer_cast<PrimitiveArray>(offsets4)->data(), values4);
+
+  std::vector<int8_t> indices4_values = {0, 1, 2, 0, 1, 2};
+  ArrayFromVector<Int8Type, int8_t>(is_valid, indices4_values, &indices4);
+
+  auto f4_type = dictionary(int8(), dict3);
+  auto a4 = std::make_shared<DictionaryArray>(f4_type, indices4);
+
+  // construct batch
+  std::shared_ptr<Schema> schema(new Schema({field("dict1", f0_type),
+      field("sparse", f1_type), field("dense", f2_type),
+      field("list of encoded string", f3_type), field("encoded list<int8>", f4_type)}));
+
+  std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4};
+
+  out->reset(new RecordBatch(schema, length, arrays));
+  return Status::OK();
+}
+
 }  // namespace ipc
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index a1c2b79..b97b465 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -29,7 +29,7 @@ namespace arrow {
 bool Field::Equals(const Field& other) const {
   return (this == &other) ||
          (this->name == other.name && this->nullable == other.nullable &&
-             this->dictionary == dictionary && this->type->Equals(*other.type.get()));
+             this->type->Equals(*other.type.get()));
 }
 
 bool Field::Equals(const std::shared_ptr<Field>& other) const {
@@ -234,8 +234,8 @@ std::shared_ptr<DataType> dictionary(const std::shared_ptr<DataType>& index_type
 }
 
 std::shared_ptr<Field> field(
-    const std::string& name, const TypePtr& type, bool nullable, int64_t dictionary) {
-  return std::make_shared<Field>(name, type, nullable, dictionary);
+    const std::string& name, const TypePtr& type, bool nullable) {
+  return std::make_shared<Field>(name, type, nullable);
 }
 
 static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1);

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 927b8a4..b15aa27 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -114,6 +114,8 @@ class BufferDescr {
 
 class TypeVisitor {
  public:
+  virtual ~TypeVisitor() = default;
+
   virtual Status Visit(const NullType& type) = 0;
   virtual Status Visit(const BooleanType& type) = 0;
   virtual Status Visit(const Int8Type& type) = 0;
@@ -205,13 +207,9 @@ struct ARROW_EXPORT Field {
   // Fields can be nullable
   bool nullable;
 
-  // optional dictionary id if the field is dictionary encoded
-  // 0 means it's not dictionary encoded
-  int64_t dictionary;
-
   Field(const std::string& name, const std::shared_ptr<DataType>& type,
-      bool nullable = true, int64_t dictionary = 0)
-      : name(name), type(type), nullable(nullable), dictionary(dictionary) {}
+      bool nullable = true)
+      : name(name), type(type), nullable(nullable) {}
 
   bool operator==(const Field& other) const { return this->Equals(other); }
   bool operator!=(const Field& other) const { return !this->Equals(other); }
@@ -556,8 +554,8 @@ std::shared_ptr<DataType> ARROW_EXPORT union_(
 std::shared_ptr<DataType> ARROW_EXPORT dictionary(
     const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& values);
 
-std::shared_ptr<Field> ARROW_EXPORT field(const std::string& name,
-    const std::shared_ptr<DataType>& type, bool nullable = true, int64_t dictionary = 0);
+std::shared_ptr<Field> ARROW_EXPORT field(
+    const std::string& name, const std::shared_ptr<DataType>& type, bool nullable = true);
 
 // ----------------------------------------------------------------------
 //

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index 5ab9815..afc7dbd 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -63,7 +63,6 @@ cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
 
         shared_ptr[CSchema] schema()
 
-        int num_dictionaries()
         int num_record_batches()
 
         CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 89ce6e7..4acef21 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -995,11 +995,6 @@ cdef class _FileReader:
             else:
                 check_status(CFileReader.Open(reader, &self.reader))
 
-    property num_dictionaries:
-
-        def __get__(self):
-            return self.reader.get().num_dictionaries()
-
     property num_record_batches:
 
         def __get__(self):


Mime
View raw message