Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BFBF3200D0F for ; Fri, 25 Aug 2017 05:05:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BE2FC16C0B4; Fri, 25 Aug 2017 03:05:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 47EB216C0B3 for ; Fri, 25 Aug 2017 05:05:13 +0200 (CEST) Received: (qmail 18199 invoked by uid 500); 25 Aug 2017 03:05:12 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 18146 invoked by uid 99); 25 Aug 2017 03:05:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Aug 2017 03:05:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07F22DFA20; Fri, 25 Aug 2017 03:05:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Date: Fri, 25 Aug 2017 03:05:11 -0000 Message-Id: <2e7d1b99a3774449b15e4f607bd0d6eb@git.apache.org> In-Reply-To: <55ca9ecac34e4872969512bf88ac6dd2@git.apache.org> References: <55ca9ecac34e4872969512bf88ac6dd2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] arrow git commit: ARROW-1408: [C++] IPC public API cleanup, refactoring. 
Add SerializeSchema, ReadSchema public APIs archived-at: Fri, 25 Aug 2017 03:05:15 -0000 http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc deleted file mode 100644 index a31b966..0000000 --- a/cpp/src/arrow/ipc/metadata.cc +++ /dev/null @@ -1,1201 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "arrow/ipc/metadata.h" - -#include -#include -#include -#include -#include -#include - -#include "flatbuffers/flatbuffers.h" - -#include "arrow/array.h" -#include "arrow/buffer.h" -#include "arrow/io/interfaces.h" -#include "arrow/ipc/File_generated.h" -#include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/Tensor_generated.h" -#include "arrow/ipc/util.h" -#include "arrow/status.h" -#include "arrow/tensor.h" -#include "arrow/type.h" -#include "arrow/util/logging.h" - -namespace arrow { - -namespace flatbuf = org::apache::arrow::flatbuf; - -namespace ipc { - -using FBB = flatbuffers::FlatBufferBuilder; -using DictionaryOffset = flatbuffers::Offset; -using FieldOffset = flatbuffers::Offset; -using KeyValueOffset = flatbuffers::Offset; -using RecordBatchOffset = flatbuffers::Offset; -using VectorLayoutOffset = flatbuffers::Offset; -using Offset = flatbuffers::Offset; -using FBString = flatbuffers::Offset; - -static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = - flatbuf::MetadataVersion_V3; - -static constexpr flatbuf::MetadataVersion kMinMetadataVersion = - flatbuf::MetadataVersion_V3; - -static Status IntFromFlatbuffer(const flatbuf::Int* int_data, - std::shared_ptr* out) { - if (int_data->bitWidth() > 64) { - return Status::NotImplemented("Integers with more than 64 bits not implemented"); - } - if (int_data->bitWidth() < 8) { - return Status::NotImplemented("Integers with less than 8 bits not implemented"); - } - - switch (int_data->bitWidth()) { - case 8: - *out = int_data->is_signed() ? int8() : uint8(); - break; - case 16: - *out = int_data->is_signed() ? int16() : uint16(); - break; - case 32: - *out = int_data->is_signed() ? int32() : uint32(); - break; - case 64: - *out = int_data->is_signed() ? 
int64() : uint64(); - break; - default: - return Status::NotImplemented("Integers not in cstdint are not implemented"); - } - return Status::OK(); -} - -static Status FloatFromFlatuffer(const flatbuf::FloatingPoint* float_data, - std::shared_ptr* out) { - if (float_data->precision() == flatbuf::Precision_HALF) { - *out = float16(); - } else if (float_data->precision() == flatbuf::Precision_SINGLE) { - *out = float32(); - } else { - *out = float64(); - } - return Status::OK(); -} - -// Forward declaration -static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr& field, - DictionaryMemo* dictionary_memo, FieldOffset* offset); - -static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) { - return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union(); -} - -static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) { - return flatbuf::CreateFloatingPoint(fbb, precision).Union(); -} - -static Status AppendChildFields(FBB& fbb, const std::shared_ptr& type, - std::vector* out_children, - DictionaryMemo* dictionary_memo) { - FieldOffset field; - for (int i = 0; i < type->num_children(); ++i) { - RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field)); - out_children->push_back(field); - } - return Status::OK(); -} - -static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr& type, - std::vector* out_children, - DictionaryMemo* dictionary_memo, Offset* offset) { - RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); - *offset = flatbuf::CreateList(fbb).Union(); - return Status::OK(); -} - -static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr& type, - std::vector* out_children, - DictionaryMemo* dictionary_memo, Offset* offset) { - RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); - *offset = flatbuf::CreateStruct_(fbb).Union(); - return Status::OK(); -} - -// ---------------------------------------------------------------------- -// Union implementation 
- -static Status UnionFromFlatbuffer(const flatbuf::Union* union_data, - const std::vector>& children, - std::shared_ptr* out) { - UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE - : UnionMode::DENSE; - - std::vector type_codes; - - const flatbuffers::Vector* fb_type_ids = union_data->typeIds(); - if (fb_type_ids == nullptr) { - for (uint8_t i = 0; i < children.size(); ++i) { - type_codes.push_back(i); - } - } else { - for (int32_t id : (*fb_type_ids)) { - // TODO(wesm): can these values exceed 255? - type_codes.push_back(static_cast(id)); - } - } - - *out = union_(children, type_codes, mode); - return Status::OK(); -} - -static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr& type, - std::vector* out_children, - DictionaryMemo* dictionary_memo, Offset* offset) { - RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); - - const auto& union_type = static_cast(*type); - - flatbuf::UnionMode mode = union_type.mode() == UnionMode::SPARSE - ? 
flatbuf::UnionMode_Sparse - : flatbuf::UnionMode_Dense; - - std::vector type_ids; - type_ids.reserve(union_type.type_codes().size()); - for (uint8_t code : union_type.type_codes()) { - type_ids.push_back(code); - } - - auto fb_type_ids = fbb.CreateVector(type_ids); - - *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union(); - return Status::OK(); -} - -#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \ - *out_type = flatbuf::Type_Int; \ - *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \ - break; - -static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit::type unit) { - switch (unit) { - case TimeUnit::SECOND: - return flatbuf::TimeUnit_SECOND; - case TimeUnit::MILLI: - return flatbuf::TimeUnit_MILLISECOND; - case TimeUnit::MICRO: - return flatbuf::TimeUnit_MICROSECOND; - case TimeUnit::NANO: - return flatbuf::TimeUnit_NANOSECOND; - default: - break; - } - return flatbuf::TimeUnit_MIN; -} - -static inline TimeUnit::type FromFlatbufferUnit(flatbuf::TimeUnit unit) { - switch (unit) { - case flatbuf::TimeUnit_SECOND: - return TimeUnit::SECOND; - case flatbuf::TimeUnit_MILLISECOND: - return TimeUnit::MILLI; - case flatbuf::TimeUnit_MICROSECOND: - return TimeUnit::MICRO; - case flatbuf::TimeUnit_NANOSECOND: - return TimeUnit::NANO; - default: - break; - } - // cannot reach - return TimeUnit::SECOND; -} - -static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, - const std::vector>& children, - std::shared_ptr* out) { - switch (type) { - case flatbuf::Type_NONE: - return Status::Invalid("Type metadata cannot be none"); - case flatbuf::Type_Int: - return IntFromFlatbuffer(static_cast(type_data), out); - case flatbuf::Type_FloatingPoint: - return FloatFromFlatuffer(static_cast(type_data), - out); - case flatbuf::Type_Binary: - *out = binary(); - return Status::OK(); - case flatbuf::Type_FixedSizeBinary: { - auto fw_binary = static_cast(type_data); - *out = fixed_size_binary(fw_binary->byteWidth()); - return Status::OK(); - } - case 
flatbuf::Type_Utf8: - *out = utf8(); - return Status::OK(); - case flatbuf::Type_Bool: - *out = boolean(); - return Status::OK(); - case flatbuf::Type_Decimal: - return Status::NotImplemented("Decimal"); - case flatbuf::Type_Date: { - auto date_type = static_cast(type_data); - if (date_type->unit() == flatbuf::DateUnit_DAY) { - *out = date32(); - } else { - *out = date64(); - } - return Status::OK(); - } - case flatbuf::Type_Time: { - auto time_type = static_cast(type_data); - TimeUnit::type unit = FromFlatbufferUnit(time_type->unit()); - int32_t bit_width = time_type->bitWidth(); - switch (unit) { - case TimeUnit::SECOND: - case TimeUnit::MILLI: - if (bit_width != 32) { - return Status::Invalid("Time is 32 bits for second/milli unit"); - } - *out = time32(unit); - break; - default: - if (bit_width != 64) { - return Status::Invalid("Time is 64 bits for micro/nano unit"); - } - *out = time64(unit); - break; - } - return Status::OK(); - } - case flatbuf::Type_Timestamp: { - auto ts_type = static_cast(type_data); - TimeUnit::type unit = FromFlatbufferUnit(ts_type->unit()); - if (ts_type->timezone() != 0 && ts_type->timezone()->Length() > 0) { - *out = timestamp(unit, ts_type->timezone()->str()); - } else { - *out = timestamp(unit); - } - return Status::OK(); - } - case flatbuf::Type_Interval: - return Status::NotImplemented("Interval"); - case flatbuf::Type_List: - if (children.size() != 1) { - return Status::Invalid("List must have exactly 1 child field"); - } - *out = std::make_shared(children[0]); - return Status::OK(); - case flatbuf::Type_Struct_: - *out = std::make_shared(children); - return Status::OK(); - case flatbuf::Type_Union: - return UnionFromFlatbuffer(static_cast(type_data), children, - out); - default: - return Status::Invalid("Unrecognized type"); - } -} - -// TODO(wesm): Convert this to visitor pattern -static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, - std::vector* children, - std::vector* layout, - flatbuf::Type* out_type, 
DictionaryMemo* dictionary_memo, - Offset* offset) { - if (type->id() == Type::DICTIONARY) { - // In this library, the dictionary "type" is a logical construct. Here we - // pass through to the value type, as we've already captured the index - // type in the DictionaryEncoding metadata in the parent field - const auto& dict_type = static_cast(*type); - return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout, - out_type, dictionary_memo, offset); - } - - std::vector buffer_layout = type->GetBufferLayout(); - for (const BufferDescr& descr : buffer_layout) { - flatbuf::VectorType vector_type; - switch (descr.type()) { - case BufferType::OFFSET: - vector_type = flatbuf::VectorType_OFFSET; - break; - case BufferType::DATA: - vector_type = flatbuf::VectorType_DATA; - break; - case BufferType::VALIDITY: - vector_type = flatbuf::VectorType_VALIDITY; - break; - case BufferType::TYPE: - vector_type = flatbuf::VectorType_TYPE; - break; - default: - vector_type = flatbuf::VectorType_DATA; - break; - } - auto offset = flatbuf::CreateVectorLayout( - fbb, static_cast(descr.bit_width()), vector_type); - layout->push_back(offset); - } - - switch (type->id()) { - case Type::BOOL: - *out_type = flatbuf::Type_Bool; - *offset = flatbuf::CreateBool(fbb).Union(); - break; - case Type::UINT8: - INT_TO_FB_CASE(8, false); - case Type::INT8: - INT_TO_FB_CASE(8, true); - case Type::UINT16: - INT_TO_FB_CASE(16, false); - case Type::INT16: - INT_TO_FB_CASE(16, true); - case Type::UINT32: - INT_TO_FB_CASE(32, false); - case Type::INT32: - INT_TO_FB_CASE(32, true); - case Type::UINT64: - INT_TO_FB_CASE(64, false); - case Type::INT64: - INT_TO_FB_CASE(64, true); - case Type::FLOAT: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE); - break; - case Type::DOUBLE: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE); - break; - case Type::FIXED_SIZE_BINARY: { - const auto& 
fw_type = static_cast(*type); - *out_type = flatbuf::Type_FixedSizeBinary; - *offset = flatbuf::CreateFixedSizeBinary(fbb, fw_type.byte_width()).Union(); - } break; - case Type::BINARY: - *out_type = flatbuf::Type_Binary; - *offset = flatbuf::CreateBinary(fbb).Union(); - break; - case Type::STRING: - *out_type = flatbuf::Type_Utf8; - *offset = flatbuf::CreateUtf8(fbb).Union(); - break; - case Type::DATE32: - *out_type = flatbuf::Type_Date; - *offset = flatbuf::CreateDate(fbb, flatbuf::DateUnit_DAY).Union(); - break; - case Type::DATE64: - *out_type = flatbuf::Type_Date; - *offset = flatbuf::CreateDate(fbb, flatbuf::DateUnit_MILLISECOND).Union(); - break; - case Type::TIME32: { - const auto& time_type = static_cast(*type); - *out_type = flatbuf::Type_Time; - *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit()), 32).Union(); - } break; - case Type::TIME64: { - const auto& time_type = static_cast(*type); - *out_type = flatbuf::Type_Time; - *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit()), 64).Union(); - } break; - case Type::TIMESTAMP: { - const auto& ts_type = static_cast(*type); - *out_type = flatbuf::Type_Timestamp; - - flatbuf::TimeUnit fb_unit = ToFlatbufferUnit(ts_type.unit()); - FBString fb_timezone = 0; - if (ts_type.timezone().size() > 0) { - fb_timezone = fbb.CreateString(ts_type.timezone()); - } - *offset = flatbuf::CreateTimestamp(fbb, fb_unit, fb_timezone).Union(); - } break; - case Type::LIST: - *out_type = flatbuf::Type_List; - return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset); - case Type::STRUCT: - *out_type = flatbuf::Type_Struct_; - return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset); - case Type::UNION: - *out_type = flatbuf::Type_Union; - return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset); - default: - *out_type = flatbuf::Type_NONE; // Make clang-tidy happy - std::stringstream ss; - ss << "Unable to convert type: " << type->ToString() << std::endl; - 
return Status::NotImplemented(ss.str()); - } - return Status::OK(); -} - -static Status TensorTypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, - flatbuf::Type* out_type, Offset* offset) { - switch (type->id()) { - case Type::UINT8: - INT_TO_FB_CASE(8, false); - case Type::INT8: - INT_TO_FB_CASE(8, true); - case Type::UINT16: - INT_TO_FB_CASE(16, false); - case Type::INT16: - INT_TO_FB_CASE(16, true); - case Type::UINT32: - INT_TO_FB_CASE(32, false); - case Type::INT32: - INT_TO_FB_CASE(32, true); - case Type::UINT64: - INT_TO_FB_CASE(64, false); - case Type::INT64: - INT_TO_FB_CASE(64, true); - case Type::HALF_FLOAT: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_HALF); - break; - case Type::FLOAT: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE); - break; - case Type::DOUBLE: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE); - break; - default: - *out_type = flatbuf::Type_NONE; // Make clang-tidy happy - std::stringstream ss; - ss << "Unable to convert type: " << type->ToString() << std::endl; - return Status::NotImplemented(ss.str()); - } - return Status::OK(); -} - -static DictionaryOffset GetDictionaryEncoding(FBB& fbb, const DictionaryType& type, - DictionaryMemo* memo) { - int64_t dictionary_id = memo->GetId(type.dictionary()); - - // We assume that the dictionary index type (as an integer) has already been - // validated elsewhere, and can safely assume we are dealing with signed - // integers - const auto& fw_index_type = static_cast(*type.index_type()); - - auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true); - - // TODO(wesm): ordered dictionaries - return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset, - type.ordered()); -} - -static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr& field, - DictionaryMemo* dictionary_memo, 
FieldOffset* offset) { - auto fb_name = fbb.CreateString(field->name()); - - flatbuf::Type type_enum; - Offset type_offset; - Offset type_layout; - std::vector children; - std::vector layout; - - RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type(), &children, &layout, &type_enum, - dictionary_memo, &type_offset)); - auto fb_children = fbb.CreateVector(children); - auto fb_layout = fbb.CreateVector(layout); - - DictionaryOffset dictionary = 0; - if (field->type()->id() == Type::DICTIONARY) { - dictionary = GetDictionaryEncoding( - fbb, static_cast(*field->type()), dictionary_memo); - } - - // TODO: produce the list of VectorTypes - *offset = flatbuf::CreateField(fbb, fb_name, field->nullable(), type_enum, type_offset, - dictionary, fb_children, fb_layout); - - return Status::OK(); -} - -static Status FieldFromFlatbuffer(const flatbuf::Field* field, - const DictionaryMemo& dictionary_memo, - std::shared_ptr* out) { - std::shared_ptr type; - - const flatbuf::DictionaryEncoding* encoding = field->dictionary(); - - if (encoding == nullptr) { - // The field is not dictionary encoded. We must potentially visit its - // children to fully reconstruct the data type - auto children = field->children(); - std::vector> child_fields(children->size()); - for (int i = 0; i < static_cast(children->size()); ++i) { - RETURN_NOT_OK( - FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i])); - } - RETURN_NOT_OK( - TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); - } else { - // The field is dictionary encoded. The type of the dictionary values has - // been determined elsewhere, and is stored in the DictionaryMemo. 
Here we - // construct the logical DictionaryType object - - std::shared_ptr dictionary; - RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary)); - - std::shared_ptr index_type; - RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type)); - type = ::arrow::dictionary(index_type, dictionary, encoding->isOrdered()); - } - *out = std::make_shared(field->name()->str(), type, field->nullable()); - return Status::OK(); -} - -static Status FieldFromFlatbufferDictionary(const flatbuf::Field* field, - std::shared_ptr* out) { - // Need an empty memo to pass down for constructing children - DictionaryMemo dummy_memo; - - // Any DictionaryEncoding set is ignored here - - std::shared_ptr type; - auto children = field->children(); - std::vector> child_fields(children->size()); - for (int i = 0; i < static_cast(children->size()); ++i) { - RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i])); - } - - RETURN_NOT_OK( - TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); - - *out = std::make_shared(field->name()->str(), type, field->nullable()); - return Status::OK(); -} - -// will return the endianness of the system we are running on -// based the NUMPY_API function. See NOTICE.txt -flatbuf::Endianness endianness() { - union { - uint32_t i; - char c[4]; - } bint = {0x01020304}; - - return bint.c[0] == 1 ? 
flatbuf::Endianness_Big : flatbuf::Endianness_Little; -} - -static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, - DictionaryMemo* dictionary_memo, - flatbuffers::Offset* out) { - /// Fields - std::vector field_offsets; - for (int i = 0; i < schema.num_fields(); ++i) { - std::shared_ptr field = schema.field(i); - FieldOffset offset; - RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset)); - field_offsets.push_back(offset); - } - - auto fb_offsets = fbb.CreateVector(field_offsets); - - /// Custom metadata - const KeyValueMetadata* metadata = schema.metadata().get(); - - if (metadata != nullptr) { - std::vector key_value_offsets; - size_t metadata_size = metadata->size(); - key_value_offsets.reserve(metadata_size); - for (size_t i = 0; i < metadata_size; ++i) { - const auto& key = metadata->key(i); - const auto& value = metadata->value(i); - key_value_offsets.push_back( - flatbuf::CreateKeyValue(fbb, fbb.CreateString(key), fbb.CreateString(value))); - } - *out = flatbuf::CreateSchema(fbb, endianness(), fb_offsets, - fbb.CreateVector(key_value_offsets)); - } else { - *out = flatbuf::CreateSchema(fbb, endianness(), fb_offsets); - } - - return Status::OK(); -} - -static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr* out) { - int32_t size = fbb.GetSize(); - - auto result = std::make_shared(); - RETURN_NOT_OK(result->Resize(size)); - - uint8_t* dst = result->mutable_data(); - memcpy(dst, fbb.GetBufferPointer(), size); - *out = result; - return Status::OK(); -} - -static Status WriteFBMessage(FBB& fbb, flatbuf::MessageHeader header_type, - flatbuffers::Offset header, int64_t body_length, - std::shared_ptr* out) { - auto message = flatbuf::CreateMessage(fbb, kCurrentMetadataVersion, header_type, header, - body_length); - fbb.Finish(message); - return WriteFlatbufferBuilder(fbb, out); -} - -Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo, - std::shared_ptr* out) { - FBB fbb; - flatbuffers::Offset 
fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); - return WriteFBMessage(fbb, flatbuf::MessageHeader_Schema, fb_schema.Union(), 0, out); -} - -using FieldNodeVector = - flatbuffers::Offset>; -using BufferVector = flatbuffers::Offset>; - -static Status WriteFieldNodes(FBB& fbb, const std::vector& nodes, - FieldNodeVector* out) { - std::vector fb_nodes; - fb_nodes.reserve(nodes.size()); - - for (size_t i = 0; i < nodes.size(); ++i) { - const FieldMetadata& node = nodes[i]; - if (node.offset != 0) { - return Status::Invalid("Field metadata for IPC must have offset 0"); - } - fb_nodes.emplace_back(node.length, node.null_count); - } - *out = fbb.CreateVectorOfStructs(fb_nodes); - return Status::OK(); -} - -static Status WriteBuffers(FBB& fbb, const std::vector& buffers, - BufferVector* out) { - std::vector fb_buffers; - fb_buffers.reserve(buffers.size()); - - for (size_t i = 0; i < buffers.size(); ++i) { - const BufferMetadata& buffer = buffers[i]; - fb_buffers.emplace_back(buffer.page, buffer.offset, buffer.length); - } - *out = fbb.CreateVectorOfStructs(fb_buffers); - return Status::OK(); -} - -static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, - const std::vector& nodes, - const std::vector& buffers, - RecordBatchOffset* offset) { - FieldNodeVector fb_nodes; - BufferVector fb_buffers; - - RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes)); - RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers)); - - *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers); - return Status::OK(); -} - -Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, - const std::vector& nodes, - const std::vector& buffers, - std::shared_ptr* out) { - FBB fbb; - RecordBatchOffset record_batch; - RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); - return WriteFBMessage(fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), - body_length, 
out); -} - -Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, - std::shared_ptr* out) { - using TensorDimOffset = flatbuffers::Offset; - using TensorOffset = flatbuffers::Offset; - - FBB fbb; - - flatbuf::Type fb_type_type; - Offset fb_type; - RETURN_NOT_OK(TensorTypeToFlatbuffer(fbb, tensor.type(), &fb_type_type, &fb_type)); - - std::vector dims; - for (int i = 0; i < tensor.ndim(); ++i) { - FBString name = fbb.CreateString(tensor.dim_name(i)); - dims.push_back(flatbuf::CreateTensorDim(fbb, tensor.shape()[i], name)); - } - - auto fb_shape = fbb.CreateVector(dims); - auto fb_strides = fbb.CreateVector(tensor.strides()); - int64_t body_length = tensor.data()->size(); - flatbuf::Buffer buffer(-1, buffer_start_offset, body_length); - - TensorOffset fb_tensor = - flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer); - - return WriteFBMessage(fbb, flatbuf::MessageHeader_Tensor, fb_tensor.Union(), - body_length, out); -} - -Status WriteDictionaryMessage(const int64_t id, const int64_t length, - const int64_t body_length, - const std::vector& nodes, - const std::vector& buffers, - std::shared_ptr* out) { - FBB fbb; - RecordBatchOffset record_batch; - RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); - auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union(); - return WriteFBMessage(fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch, - body_length, out); -} - -static flatbuffers::Offset> -FileBlocksToFlatbuffer(FBB& fbb, const std::vector& blocks) { - std::vector fb_blocks; - - for (const FileBlock& block : blocks) { - fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); - } - - return fbb.CreateVectorOfStructs(fb_blocks); -} - -Status WriteFileFooter(const Schema& schema, const std::vector& dictionaries, - const std::vector& record_batches, - DictionaryMemo* dictionary_memo, io::OutputStream* out) { - FBB 
fbb; - - flatbuffers::Offset fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); - -#ifndef NDEBUG - for (size_t i = 0; i < dictionaries.size(); ++i) { - DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].offset)) << i; - DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].metadata_length)) << i; - DCHECK(BitUtil::IsMultipleOf8(dictionaries[i].body_length)) << i; - } - - for (size_t i = 0; i < record_batches.size(); ++i) { - DCHECK(BitUtil::IsMultipleOf8(record_batches[i].offset)) << i; - DCHECK(BitUtil::IsMultipleOf8(record_batches[i].metadata_length)) << i; - DCHECK(BitUtil::IsMultipleOf8(record_batches[i].body_length)) << i; - } -#endif - - auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); - auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); - - auto footer = flatbuf::CreateFooter(fbb, kCurrentMetadataVersion, fb_schema, - fb_dictionaries, fb_record_batches); - - fbb.Finish(footer); - - int32_t size = fbb.GetSize(); - - return out->Write(fbb.GetBufferPointer(), size); -} - -// ---------------------------------------------------------------------- -// Memoization data structure for handling shared dictionaries - -DictionaryMemo::DictionaryMemo() {} - -// Returns KeyError if dictionary not found -Status DictionaryMemo::GetDictionary(int64_t id, - std::shared_ptr* dictionary) const { - auto it = id_to_dictionary_.find(id); - if (it == id_to_dictionary_.end()) { - std::stringstream ss; - ss << "Dictionary with id " << id << " not found"; - return Status::KeyError(ss.str()); - } - *dictionary = it->second; - return Status::OK(); -} - -int64_t DictionaryMemo::GetId(const std::shared_ptr& dictionary) { - intptr_t address = reinterpret_cast(dictionary.get()); - auto it = dictionary_to_id_.find(address); - if (it != dictionary_to_id_.end()) { - // Dictionary already observed, return the id - return it->second; - } else { - int64_t new_id = static_cast(dictionary_to_id_.size()); - dictionary_to_id_[address] = 
new_id; - id_to_dictionary_[new_id] = dictionary; - return new_id; - } -} - -bool DictionaryMemo::HasDictionary(const std::shared_ptr& dictionary) const { - intptr_t address = reinterpret_cast(dictionary.get()); - auto it = dictionary_to_id_.find(address); - return it != dictionary_to_id_.end(); -} - -bool DictionaryMemo::HasDictionaryId(int64_t id) const { - auto it = id_to_dictionary_.find(id); - return it != id_to_dictionary_.end(); -} - -Status DictionaryMemo::AddDictionary(int64_t id, - const std::shared_ptr& dictionary) { - if (HasDictionaryId(id)) { - std::stringstream ss; - ss << "Dictionary with id " << id << " already exists"; - return Status::KeyError(ss.str()); - } - intptr_t address = reinterpret_cast(dictionary.get()); - id_to_dictionary_[id] = dictionary; - dictionary_to_id_[address] = id; - return Status::OK(); -} - -//---------------------------------------------------------------------- -// Message reader - -class Message::MessageImpl { - public: - explicit MessageImpl(const std::shared_ptr& metadata, - const std::shared_ptr& body) - : metadata_(metadata), message_(nullptr), body_(body) {} - - Status Open() { - message_ = flatbuf::GetMessage(metadata_->data()); - - // Check that the metadata version is supported - if (message_->version() < kMinMetadataVersion) { - return Status::Invalid("Old metadata version not supported"); - } - - return Status::OK(); - } - - Message::Type type() const { - switch (message_->header_type()) { - case flatbuf::MessageHeader_Schema: - return Message::SCHEMA; - case flatbuf::MessageHeader_DictionaryBatch: - return Message::DICTIONARY_BATCH; - case flatbuf::MessageHeader_RecordBatch: - return Message::RECORD_BATCH; - case flatbuf::MessageHeader_Tensor: - return Message::TENSOR; - default: - return Message::NONE; - } - } - - MetadataVersion version() const { - switch (message_->version()) { - case flatbuf::MetadataVersion_V1: - // Arrow 0.1 - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - // Arrow 0.2 
- return MetadataVersion::V2; - case flatbuf::MetadataVersion_V3: - // Arrow >= 0.3 - return MetadataVersion::V3; - // Add cases as other versions become available - default: - return MetadataVersion::V3; - } - } - - const void* header() const { return message_->header(); } - - std::shared_ptr body() const { return body_; } - - std::shared_ptr metadata() const { return metadata_; } - - private: - // The Flatbuffer metadata - std::shared_ptr metadata_; - const flatbuf::Message* message_; - - // The message body, if any - std::shared_ptr body_; -}; - -Message::Message(const std::shared_ptr& metadata, - const std::shared_ptr& body) { - impl_.reset(new MessageImpl(metadata, body)); -} - -Status Message::Open(const std::shared_ptr& metadata, - const std::shared_ptr& body, std::unique_ptr* out) { - out->reset(new Message(metadata, body)); - return (*out)->impl_->Open(); -} - -Message::~Message() {} - -std::shared_ptr Message::body() const { return impl_->body(); } - -std::shared_ptr Message::metadata() const { return impl_->metadata(); } - -Message::Type Message::type() const { return impl_->type(); } - -MetadataVersion Message::metadata_version() const { return impl_->version(); } - -const void* Message::header() const { return impl_->header(); } - -bool Message::Equals(const Message& other) const { - int64_t metadata_bytes = std::min(metadata()->size(), other.metadata()->size()); - - if (!metadata()->Equals(*other.metadata(), metadata_bytes)) { - return false; - } - - // Compare bodies, if they have them - auto this_body = body(); - auto other_body = other.body(); - - const bool this_has_body = (this_body != nullptr) && (this_body->size() > 0); - const bool other_has_body = (other_body != nullptr) && (other_body->size() > 0); - - if (this_has_body && other_has_body) { - return this_body->Equals(*other_body); - } else if (this_has_body ^ other_has_body) { - // One has a body but not the other - return false; - } else { - // Neither has a body - return true; - } -} - 
-Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const { - int32_t metadata_length = 0; - RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length)); - - *output_length = metadata_length; - - auto body_buffer = body(); - if (body_buffer) { - RETURN_NOT_OK(file->Write(body_buffer->data(), body_buffer->size())); - *output_length += body_buffer->size(); - } - - return Status::OK(); -} - -std::string FormatMessageType(Message::Type type) { - switch (type) { - case Message::SCHEMA: - return "schema"; - case Message::RECORD_BATCH: - return "record batch"; - case Message::DICTIONARY_BATCH: - return "dictionary"; - default: - break; - } - return "unknown"; -} - -// ---------------------------------------------------------------------- - -static Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) { - const flatbuf::DictionaryEncoding* dict_metadata = field->dictionary(); - if (dict_metadata == nullptr) { - // Field is not dictionary encoded. Visit children - auto children = field->children(); - for (flatbuffers::uoffset_t i = 0; i < children->size(); ++i) { - RETURN_NOT_OK(VisitField(children->Get(i), id_to_field)); - } - } else { - // Field is dictionary encoded. 
Construct the data type for the - // dictionary (no descendents can be dictionary encoded) - std::shared_ptr dictionary_field; - RETURN_NOT_OK(FieldFromFlatbufferDictionary(field, &dictionary_field)); - (*id_to_field)[dict_metadata->id()] = dictionary_field; - } - return Status::OK(); -} - -Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_field) { - auto schema = static_cast(opaque_schema); - int num_fields = static_cast(schema->fields()->size()); - for (int i = 0; i < num_fields; ++i) { - RETURN_NOT_OK(VisitField(schema->fields()->Get(i), id_to_field)); - } - return Status::OK(); -} - -Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_memo, - std::shared_ptr* out) { - auto schema = static_cast(opaque_schema); - int num_fields = static_cast(schema->fields()->size()); - - std::vector> fields(num_fields); - for (int i = 0; i < num_fields; ++i) { - const flatbuf::Field* field = schema->fields()->Get(i); - RETURN_NOT_OK(FieldFromFlatbuffer(field, dictionary_memo, &fields[i])); - } - - auto metadata = std::make_shared(); - auto fb_metadata = schema->custom_metadata(); - if (fb_metadata != nullptr) { - metadata->reserve(fb_metadata->size()); - for (const auto& pair : *fb_metadata) { - metadata->Append(pair->key()->str(), pair->value()->str()); - } - } - - *out = ::arrow::schema(std::move(fields), metadata); - return Status::OK(); -} - -Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr* type, - std::vector* shape, std::vector* strides, - std::vector* dim_names) { - auto message = flatbuf::GetMessage(metadata.data()); - auto tensor = reinterpret_cast(message->header()); - - int ndim = static_cast(tensor->shape()->size()); - - for (int i = 0; i < ndim; ++i) { - auto dim = tensor->shape()->Get(i); - - shape->push_back(dim->size()); - auto fb_name = dim->name(); - if (fb_name == 0) { - dim_names->push_back(""); - } else { - dim_names->push_back(fb_name->str()); - } - } - - if (tensor->strides()->size() > 
0) { - for (int i = 0; i < ndim; ++i) { - strides->push_back(tensor->strides()->Get(i)); - } - } - - return TypeFromFlatbuffer(tensor->type_type(), tensor->type(), {}, type); -} - -// ---------------------------------------------------------------------- -// Read and write messages - -static Status ReadFullMessage(const std::shared_ptr& metadata, - io::InputStream* stream, - std::unique_ptr* message) { - auto fb_message = flatbuf::GetMessage(metadata->data()); - - int64_t body_length = fb_message->bodyLength(); - - std::shared_ptr body; - RETURN_NOT_OK(stream->Read(body_length, &body)); - - if (body->size() < body_length) { - std::stringstream ss; - ss << "Expected to be able to read " << body_length << " bytes for message body, got " - << body->size(); - return Status::IOError(ss.str()); - } - - return Message::Open(metadata, body, message); -} - -Status ReadMessage(const int64_t offset, const int32_t metadata_length, - io::RandomAccessFile* file, std::unique_ptr* message) { - std::shared_ptr buffer; - RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); - - int32_t flatbuffer_size = *reinterpret_cast(buffer->data()); - - if (flatbuffer_size + static_cast(sizeof(int32_t)) > metadata_length) { - std::stringstream ss; - ss << "flatbuffer size " << metadata_length << " invalid. 
File offset: " << offset - << ", metadata length: " << metadata_length; - return Status::Invalid(ss.str()); - } - - auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4); - return ReadFullMessage(metadata, file, message); -} - -Status ReadMessage(io::InputStream* file, std::unique_ptr* message) { - std::shared_ptr buffer; - - RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer)); - if (buffer->size() != sizeof(int32_t)) { - *message = nullptr; - return Status::OK(); - } - - int32_t message_length = *reinterpret_cast(buffer->data()); - - if (message_length == 0) { - // Optional 0 EOS control message - *message = nullptr; - return Status::OK(); - } - - RETURN_NOT_OK(file->Read(message_length, &buffer)); - if (buffer->size() != message_length) { - return Status::IOError("Unexpected end of stream trying to read message"); - } - - return ReadFullMessage(buffer, file, message); -} - -// ---------------------------------------------------------------------- -// Implement InputStream message reader - -Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr* message) { - return ReadMessage(stream_.get(), message); -} - -InputStreamMessageReader::~InputStreamMessageReader() {} - -// ---------------------------------------------------------------------- -// Implement message writing - -Status WriteMessage(const Buffer& message, io::OutputStream* file, - int32_t* message_length) { - // Need to write 4 bytes (message size), the message, plus padding to - // end on an 8-byte offset - int64_t start_offset; - RETURN_NOT_OK(file->Tell(&start_offset)); - - int32_t padded_message_length = static_cast(message.size()) + 4; - const int32_t remainder = - (padded_message_length + static_cast(start_offset)) % 8; - if (remainder != 0) { - padded_message_length += 8 - remainder; - } - - // The returned message size includes the length prefix, the flatbuffer, - // plus padding - *message_length = padded_message_length; - - // Write the flatbuffer size prefix including padding - 
int32_t flatbuffer_size = padded_message_length - 4; - RETURN_NOT_OK( - file->Write(reinterpret_cast(&flatbuffer_size), sizeof(int32_t))); - - // Write the flatbuffer - RETURN_NOT_OK(file->Write(message.data(), message.size())); - - // Write any padding - int32_t padding = padded_message_length - static_cast(message.size()) - 4; - if (padding > 0) { - RETURN_NOT_OK(file->Write(kPaddingBytes, padding)); - } - - return Status::OK(); -} - -} // namespace ipc -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h deleted file mode 100644 index 81716ae..0000000 --- a/cpp/src/arrow/ipc/metadata.h +++ /dev/null @@ -1,300 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -// C++ object model and user API for interprocess schema messaging - -#ifndef ARROW_IPC_METADATA_H -#define ARROW_IPC_METADATA_H - -#include -#include -#include -#include -#include - -#include "arrow/util/macros.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Array; -class Buffer; -class DataType; -class Field; -class Schema; -class Status; -class Tensor; - -namespace io { - -class InputStream; -class OutputStream; -class RandomAccessFile; - -} // namespace io - -namespace ipc { - -enum class MetadataVersion : char { V1, V2, V3 }; - -static constexpr const char* kArrowMagicBytes = "ARROW1"; - -// ARROW-109: We set this number arbitrarily to help catch user mistakes. For -// deeply nested schemas, it is expected the user will indicate explicitly the -// maximum allowed recursion depth -constexpr int kMaxNestingDepth = 64; - -struct ARROW_EXPORT FieldMetadata { - int64_t length; - int64_t null_count; - int64_t offset; -}; - -struct ARROW_EXPORT BufferMetadata { - /// The shared memory page id where to find this. 
Set to -1 if unused - int32_t page; - - /// The relative offset into the memory page to the starting byte of the buffer - int64_t offset; - - /// Absolute length in bytes of the buffer - int64_t length; -}; - -struct FileBlock { - int64_t offset; - int32_t metadata_length; - int64_t body_length; -}; - -//---------------------------------------------------------------------- - -using DictionaryMap = std::unordered_map>; -using DictionaryTypeMap = std::unordered_map>; - -// Memoization data structure for handling shared dictionaries -class ARROW_EXPORT DictionaryMemo { - public: - DictionaryMemo(); - - // Returns KeyError if dictionary not found - Status GetDictionary(int64_t id, std::shared_ptr* dictionary) const; - - /// Return id for dictionary, computing new id if necessary - int64_t GetId(const std::shared_ptr& dictionary); - - bool HasDictionary(const std::shared_ptr& dictionary) const; - bool HasDictionaryId(int64_t id) const; - - // Add a dictionary to the memo with a particular id. Returns KeyError if - // that dictionary already exists - Status AddDictionary(int64_t id, const std::shared_ptr& dictionary); - - const DictionaryMap& id_to_dictionary() const { return id_to_dictionary_; } - - int size() const { return static_cast(id_to_dictionary_.size()); } - - private: - // Dictionary memory addresses, to track whether a dictionary has been seen - // before - std::unordered_map dictionary_to_id_; - - // Map of dictionary id to dictionary array - DictionaryMap id_to_dictionary_; - - DISALLOW_COPY_AND_ASSIGN(DictionaryMemo); -}; - -// Read interface classes. We do not fully deserialize the flatbuffers so that -// individual fields metadata can be retrieved from very large schema without -// - -class Message; - -// Retrieve a list of all the dictionary ids and types required by the schema for -// reconstruction. 
The presumption is that these will be loaded either from -// the stream or file (or they may already be somewhere else in memory) -Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_field); - -// Construct a complete Schema from the message. May be expensive for very -// large schemas if you are only interested in a few fields -Status ARROW_EXPORT GetSchema(const void* opaque_schema, - const DictionaryMemo& dictionary_memo, - std::shared_ptr* out); - -Status ARROW_EXPORT GetTensorMetadata(const Buffer& metadata, - std::shared_ptr* type, - std::vector* shape, - std::vector* strides, - std::vector* dim_names); - -/// \brief An IPC message including metadata and body -class ARROW_EXPORT Message { - public: - enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR }; - - /// \brief Construct message, but do not validate - /// - /// Use at your own risk; Message::Open has more metadata validation - Message(const std::shared_ptr& metadata, const std::shared_ptr& body); - - ~Message(); - - /// \brief Create and validate a Message instance from two buffers - /// - /// \param[in] metadata a buffer containing the Flatbuffer metadata - /// \param[in] body a buffer containing the message body, which may be nullptr - /// \param[out] out the created message - static Status Open(const std::shared_ptr& metadata, - const std::shared_ptr& body, std::unique_ptr* out); - - /// \brief Write length-prefixed metadata and body to output stream - /// - /// \param[in] file output stream to write to - /// \param[out] output_length the number of bytes written - /// \return Status - bool Equals(const Message& other) const; - - /// \brief the Message metadata - /// - /// \return buffer - std::shared_ptr metadata() const; - - /// \brief the Message body, if any - /// - /// \return buffer is nullptr if no body - std::shared_ptr body() const; - - Type type() const; - - MetadataVersion metadata_version() const; - - const void* header() const; - - /// \brief Write 
length-prefixed metadata and body to output stream - /// - /// \param[in] file output stream to write to - /// \param[out] output_length the number of bytes written - /// \return Status - Status SerializeTo(io::OutputStream* file, int64_t* output_length) const; - - private: - // Hide serialization details from user API - class MessageImpl; - std::unique_ptr impl_; - - DISALLOW_COPY_AND_ASSIGN(Message); -}; - -ARROW_EXPORT std::string FormatMessageType(Message::Type type); - -/// \brief Abstract interface for a sequence of messages -/// \since 0.5.0 -class ARROW_EXPORT MessageReader { - public: - virtual ~MessageReader() = default; - - /// \brief Read next Message from the interface - /// - /// \param[out] message an arrow::ipc::Message instance - /// \return Status - virtual Status ReadNextMessage(std::unique_ptr* message) = 0; -}; - -/// \brief Implementation of MessageReader that reads from InputStream -/// \since 0.5.0 -class ARROW_EXPORT InputStreamMessageReader : public MessageReader { - public: - explicit InputStreamMessageReader(const std::shared_ptr& stream) - : stream_(stream) {} - - ~InputStreamMessageReader(); - - Status ReadNextMessage(std::unique_ptr* message) override; - - private: - std::shared_ptr stream_; -}; - -/// \brief Read encapulated RPC message from position in file -/// -/// Read a length-prefixed message flatbuffer starting at the indicated file -/// offset. If the message has a body with non-zero length, it will also be -/// read -/// -/// The metadata_length includes at least the length prefix and the flatbuffer -/// -/// \param[in] offset the position in the file where the message starts. 
The -/// first 4 bytes after the offset are the message length -/// \param[in] metadata_length the total number of bytes to read from file -/// \param[in] file the seekable file interface to read from -/// \param[out] message the message read -/// \return Status success or failure -ARROW_EXPORT -Status ReadMessage(const int64_t offset, const int32_t metadata_length, - io::RandomAccessFile* file, std::unique_ptr* message); - -/// \brief Read encapulated RPC message (metadata and body) from InputStream -/// -/// Read length-prefixed message with as-yet unknown length. Returns nullptr if -/// there are not enough bytes available or the message length is 0 (e.g. EOS -/// in a stream) -Status ARROW_EXPORT ReadMessage(io::InputStream* stream, - std::unique_ptr* message); - -/// Write a serialized message metadata with a length-prefix and padding to an -/// 8-byte offset -/// -/// -Status ARROW_EXPORT WriteMessage(const Buffer& message, io::OutputStream* file, - int32_t* message_length); - -// Serialize arrow::Schema as a Flatbuffer -// -// \param[in] schema a Schema instance -// \param[inout] dictionary_memo class for tracking dictionaries and assigning -// dictionary ids -// \param[out] out the serialized arrow::Buffer -// \return Status outcome -Status ARROW_EXPORT WriteSchemaMessage(const Schema& schema, - DictionaryMemo* dictionary_memo, - std::shared_ptr* out); - -ARROW_EXPORT -Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, - const std::vector& nodes, - const std::vector& buffers, - std::shared_ptr* out); - -ARROW_EXPORT -Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, - std::shared_ptr* out); - -Status WriteDictionaryMessage(const int64_t id, const int64_t length, - const int64_t body_length, - const std::vector& nodes, - const std::vector& buffers, - std::shared_ptr* out); - -Status WriteFileFooter(const Schema& schema, const std::vector& dictionaries, - const std::vector& record_batches, - 
DictionaryMemo* dictionary_memo, io::OutputStream* out); - -} // namespace ipc -} // namespace arrow - -#endif // ARROW_IPC_METADATA_H http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 6ea907e..7d7acad 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -28,7 +28,8 @@ #include "arrow/io/memory.h" #include "arrow/ipc/File_generated.h" #include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/metadata.h" +#include "arrow/ipc/message.h" +#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/table.h" @@ -460,6 +461,28 @@ RecordBatchStreamReader::RecordBatchStreamReader() { RecordBatchStreamReader::~RecordBatchStreamReader() {} Status RecordBatchStreamReader::Open(std::unique_ptr message_reader, + std::shared_ptr* reader) { + // Private ctor + auto result = std::shared_ptr(new RecordBatchStreamReader()); + RETURN_NOT_OK(result->impl_->Open(std::move(message_reader))); + *reader = result; + return Status::OK(); +} + +Status RecordBatchStreamReader::Open(io::InputStream* stream, + std::shared_ptr* out) { + std::unique_ptr message_reader(new InputStreamMessageReader(stream)); + return Open(std::move(message_reader), out); +} + +Status RecordBatchStreamReader::Open(const std::shared_ptr& stream, + std::shared_ptr* out) { + std::unique_ptr message_reader(new InputStreamMessageReader(stream)); + return Open(std::move(message_reader), out); +} + +#ifndef ARROW_NO_DEPRECATED_API +Status RecordBatchStreamReader::Open(std::unique_ptr message_reader, std::shared_ptr* reader) { // Private ctor *reader = std::shared_ptr(new RecordBatchStreamReader()); @@ -471,6 +494,7 @@ Status RecordBatchStreamReader::Open(const std::shared_ptr& str std::unique_ptr message_reader(new InputStreamMessageReader(stream)); return 
Open(std::move(message_reader), out); } +#endif std::shared_ptr RecordBatchStreamReader::schema() const { return impl_->schema(); @@ -559,8 +583,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { DCHECK(BitUtil::IsMultipleOf8(block.body_length)); std::unique_ptr message; - RETURN_NOT_OK( - ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); + RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message)); io::BufferReader reader(message->body()); return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch); @@ -578,8 +601,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { DCHECK(BitUtil::IsMultipleOf8(block.body_length)); std::unique_ptr message; - RETURN_NOT_OK( - ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); + RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message)); io::BufferReader reader(message->body()); @@ -595,6 +617,11 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { } Status Open(const std::shared_ptr& file, int64_t footer_offset) { + owned_file_ = file; + return Open(file.get(), footer_offset); + } + + Status Open(io::RandomAccessFile* file, int64_t footer_offset) { file_ = file; footer_offset_ = footer_offset; RETURN_NOT_OK(ReadFooter()); @@ -604,7 +631,10 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { std::shared_ptr schema() const { return schema_; } private: - std::shared_ptr file_; + io::RandomAccessFile* file_; + + // Deprecated as of 0.7.0 + std::shared_ptr owned_file_; // The location where the Arrow file layout ends. May be the end of the file // or some other location if embedded in a larger file. 
@@ -627,6 +657,19 @@ RecordBatchFileReader::RecordBatchFileReader() { RecordBatchFileReader::~RecordBatchFileReader() {} +Status RecordBatchFileReader::Open(io::RandomAccessFile* file, + std::shared_ptr* reader) { + int64_t footer_offset; + RETURN_NOT_OK(file->GetSize(&footer_offset)); + return Open(file, footer_offset, reader); +} + +Status RecordBatchFileReader::Open(io::RandomAccessFile* file, int64_t footer_offset, + std::shared_ptr* reader) { + *reader = std::shared_ptr(new RecordBatchFileReader()); + return (*reader)->impl_->Open(file, footer_offset); +} + Status RecordBatchFileReader::Open(const std::shared_ptr& file, std::shared_ptr* reader) { int64_t footer_offset; @@ -654,33 +697,46 @@ Status RecordBatchFileReader::ReadRecordBatch(int i, return impl_->ReadRecordBatch(i, batch); } -static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file, +static Status ReadContiguousPayload(io::InputStream* file, std::unique_ptr* message) { - std::shared_ptr buffer; - RETURN_NOT_OK(file->Seek(offset)); RETURN_NOT_OK(ReadMessage(file, message)); - if (*message == nullptr) { return Status::Invalid("Unable to read metadata at offset"); } return Status::OK(); } -Status ReadRecordBatch(const std::shared_ptr& schema, int64_t offset, - io::RandomAccessFile* file, std::shared_ptr* out) { +Status ReadSchema(io::InputStream* stream, std::shared_ptr* out) { + std::shared_ptr reader; + RETURN_NOT_OK(RecordBatchStreamReader::Open(stream, &reader)); + *out = reader->schema(); + return Status::OK(); +} + +Status ReadRecordBatch(const std::shared_ptr& schema, io::InputStream* file, + std::shared_ptr* out) { std::unique_ptr message; - RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message)); + RETURN_NOT_OK(ReadContiguousPayload(file, &message)); io::BufferReader buffer_reader(message->body()); return ReadRecordBatch(*message->metadata(), schema, kMaxNestingDepth, &buffer_reader, out); } +// Deprecated +Status ReadRecordBatch(const std::shared_ptr& schema, 
int64_t offset, + io::RandomAccessFile* file, std::shared_ptr* out) { + RETURN_NOT_OK(file->Seek(offset)); + return ReadRecordBatch(schema, file, out); +} + Status ReadTensor(int64_t offset, io::RandomAccessFile* file, std::shared_ptr* out) { // Respect alignment of Tensor messages (see WriteTensor) offset = PaddedLength(offset); + RETURN_NOT_OK(file->Seek(offset)); + std::unique_ptr message; - RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message)); + RETURN_NOT_OK(ReadContiguousPayload(file, &message)); std::shared_ptr type; std::vector shape; http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 4cffb18..f822a32 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// Implement Arrow file layout for IPC/RPC purposes and short-lived storage +// Read Arrow files and streams #ifndef ARROW_IPC_READER_H #define ARROW_IPC_READER_H @@ -24,7 +24,7 @@ #include #include -#include "arrow/ipc/metadata.h" +#include "arrow/ipc/message.h" #include "arrow/util/visibility.h" namespace arrow { @@ -69,18 +69,32 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { /// Create batch reader from generic MessageReader /// /// \param(in) message_reader a MessageReader implementation - /// \param(out) out the created RecordBatchStreamReader object + /// \param(out) out the created RecordBatchReader object /// \return Status static Status Open(std::unique_ptr message_reader, + std::shared_ptr* out); + +#ifndef ARROW_NO_DEPRECATED_API + /// \deprecated Since 0.7.0 + static Status Open(std::unique_ptr message_reader, + std::shared_ptr* out); + + /// \deprecated Since 0.7.0 + static Status Open(const std::shared_ptr& stream, std::shared_ptr* out); +#endif - /// \Create 
Record batch stream reader from InputStream + /// \brief Record batch stream reader from InputStream /// - /// \param(in) stream an input stream instance + /// \param(in) stream an input stream instance. Must stay alive throughout + /// lifetime of stream reader /// \param(out) out the created RecordBatchStreamReader object /// \return Status + static Status Open(io::InputStream* stream, std::shared_ptr* out); + + /// \brief Version of Open that retains ownership of stream static Status Open(const std::shared_ptr& stream, - std::shared_ptr* out); + std::shared_ptr* out); std::shared_ptr schema() const override; Status ReadNextRecordBatch(std::shared_ptr* batch) override; @@ -97,21 +111,31 @@ class ARROW_EXPORT RecordBatchFileReader { public: ~RecordBatchFileReader(); + /// \brief Open a RecordBatchFileReader // Open a file-like object that is assumed to be self-contained; i.e., the // end of the file interface is the end of the Arrow file. Note that there // can be any amount of data preceding the Arrow-formatted data, because we // need only locate the end of the Arrow file stream to discover the metadata // and then proceed to read the data into memory. + static Status Open(io::RandomAccessFile* file, + std::shared_ptr* reader); + + /// \brief Open a RecordBatchFileReader + /// If the file is embedded within some larger file or memory region, you can + /// pass the absolute memory offset to the end of the file (which contains the + /// metadata footer). 
The metadata must have been written with memory offsets + /// relative to the start of the containing file + /// + /// @param file: the data source + /// @param footer_offset: the position of the end of the Arrow "file" + static Status Open(io::RandomAccessFile* file, int64_t footer_offset, + std::shared_ptr* reader); + + /// \brief Version of Open that retains ownership of file static Status Open(const std::shared_ptr& file, std::shared_ptr* reader); - // If the file is embedded within some larger file or memory region, you can - // pass the absolute memory offset to the end of the file (which contains the - // metadata footer). The metadata must have been written with memory offsets - // relative to the start of the containing file - // - // @param file: the data source - // @param footer_offset: the position of the end of the Arrow "file" + /// \brief Version of Open that retains ownership of file static Status Open(const std::shared_ptr& file, int64_t footer_offset, std::shared_ptr* reader); @@ -142,6 +166,27 @@ class ARROW_EXPORT RecordBatchFileReader { // Generic read functions; does not copy data if the input supports zero copy reads +/// \brief Read Schema from stream serialized as a sequence of IPC messages +/// +/// \param[in] stream an InputStream +/// \param[out] out the output Schema +/// +/// If record batches follow the schema, it is better to use +/// RecordBatchStreamReader +ARROW_EXPORT +Status ReadSchema(io::InputStream* stream, std::shared_ptr* out); + +/// Read record batch as encapsulated IPC message with metadata size prefix and +/// header +/// +/// \param(in) schema the record batch schema +/// \param(in) offset the file location of the start of the message +/// \param(in) file the file where the batch is located +/// \param(out) out the read record batch +ARROW_EXPORT +Status ReadRecordBatch(const std::shared_ptr& schema, io::InputStream* stream, + std::shared_ptr* out); + /// \brief Read record batch from file given metadata and schema 
/// /// \param(in) metadata a Message containing the record batch metadata @@ -174,17 +219,6 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr& sc int max_recursion_depth, io::RandomAccessFile* file, std::shared_ptr* out); -/// Read record batch as encapsulated IPC message with metadata size prefix and -/// header -/// -/// \param(in) schema the record batch schema -/// \param(in) offset the file location of the start of the message -/// \param(in) file the file where the batch is located -/// \param(out) out the read record batch -ARROW_EXPORT -Status ReadRecordBatch(const std::shared_ptr& schema, int64_t offset, - io::RandomAccessFile* file, std::shared_ptr* out); - /// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file /// /// \param(in) offset the file location of the start of the message @@ -194,6 +228,15 @@ ARROW_EXPORT Status ReadTensor(int64_t offset, io::RandomAccessFile* file, std::shared_ptr* out); +#ifndef ARROW_NO_DEPRECATED_API +/// \deprecated Since 0.7.0 +/// +/// Deprecated in favor of more general InputStream-based API +ARROW_EXPORT +Status ReadRecordBatch(const std::shared_ptr& schema, int64_t offset, + io::RandomAccessFile* stream, std::shared_ptr* out); +#endif + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/stream-to-file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc index 33719b3..96339c8 100644 --- a/cpp/src/arrow/ipc/stream-to-file.cc +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -30,12 +30,12 @@ namespace ipc { // A typical usage would be: // $ | stream-to-file > file.arrow Status ConvertToFile() { - std::shared_ptr input(new io::StdinStream); - std::shared_ptr reader; - RETURN_NOT_OK(RecordBatchStreamReader::Open(input, &reader)); + io::StdinStream input; + std::shared_ptr reader; + 
RETURN_NOT_OK(RecordBatchStreamReader::Open(&input, &reader)); io::StdoutStream sink; - std::shared_ptr writer; + std::shared_ptr writer; RETURN_NOT_OK(RecordBatchFileWriter::Open(&sink, reader->schema(), &writer)); std::shared_ptr batch; http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 9492364..9c05cba 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -28,7 +28,8 @@ #include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" -#include "arrow/ipc/metadata.h" +#include "arrow/ipc/message.h" +#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/util.h" #include "arrow/memory_pool.h" #include "arrow/status.h" @@ -542,25 +543,9 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, return writer.Write(batch, dst, metadata_length, body_length); } -Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, - std::shared_ptr* out) { - int64_t size = 0; - RETURN_NOT_OK(GetRecordBatchSize(batch, &size)); - std::shared_ptr buffer; - RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer)); - - io::FixedSizeBufferWriter stream(buffer); - int32_t metadata_length = 0; - int64_t body_length = 0; - RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool, - kMaxNestingDepth, true)); - *out = buffer; - return Status::OK(); -} - Status WriteRecordBatchStream(const std::vector>& batches, io::OutputStream* dst) { - std::shared_ptr writer; + std::shared_ptr writer; RETURN_NOT_OK(RecordBatchStreamWriter::Open(dst, batches[0]->schema(), &writer)); for (const auto& batch : batches) { // allow sizes > INT32_MAX @@ -633,32 +618,44 @@ RecordBatchWriter::~RecordBatchWriter() {} // ---------------------------------------------------------------------- // Stream writer implementation 
-class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { +class StreamBookKeeper { public: - RecordBatchStreamWriterImpl() - : pool_(default_memory_pool()), position_(-1), started_(false) {} + StreamBookKeeper() : sink_(nullptr), position_(-1) {} + explicit StreamBookKeeper(io::OutputStream* sink) : sink_(sink), position_(-1) {} - virtual ~RecordBatchStreamWriterImpl() = default; + Status UpdatePosition() { return sink_->Tell(&position_); } - Status Open(io::OutputStream* sink, const std::shared_ptr& schema) { - sink_ = sink; - schema_ = schema; - return UpdatePosition(); + Status Align(int64_t alignment = kArrowIpcAlignment) { + // Adds padding bytes if necessary to ensure all memory blocks are written on + // 8-byte (or other alignment) boundaries. + int64_t remainder = PaddedLength(position_, alignment) - position_; + if (remainder > 0) { + return Write(kPaddingBytes, remainder); + } + return Status::OK(); } - virtual Status Start() { - RETURN_NOT_OK(WriteSchema()); - - // If there are any dictionaries, write them as the next messages - RETURN_NOT_OK(WriteDictionaries()); - - started_ = true; + // Write data and update position + Status Write(const uint8_t* data, int64_t nbytes) { + RETURN_NOT_OK(sink_->Write(data, nbytes)); + position_ += nbytes; return Status::OK(); } + protected: + io::OutputStream* sink_; + int64_t position_; +}; + +class SchemaWriter : public StreamBookKeeper { + public: + SchemaWriter(const Schema& schema, DictionaryMemo* dictionary_memo, MemoryPool* pool, + io::OutputStream* sink) + : StreamBookKeeper(sink), schema_(schema), dictionary_memo_(dictionary_memo) {} + Status WriteSchema() { std::shared_ptr schema_fb; - RETURN_NOT_OK(WriteSchemaMessage(*schema_, &dictionary_memo_, &schema_fb)); + RETURN_NOT_OK(WriteSchemaMessage(schema_, dictionary_memo_, &schema_fb)); int32_t metadata_length = 0; RETURN_NOT_OK(WriteMessage(*schema_fb, sink_, &metadata_length)); @@ -667,34 +664,15 @@ class 
RecordBatchStreamWriter::RecordBatchStreamWriterImpl { return Status::OK(); } - virtual Status Close() { - // Write the schema if not already written - // User is responsible for closing the OutputStream - RETURN_NOT_OK(CheckStarted()); + Status WriteDictionaries(std::vector* dictionaries) { + const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary(); - // Write 0 EOS message - const int32_t kEos = 0; - return Write(reinterpret_cast(&kEos), sizeof(int32_t)); - } - - Status CheckStarted() { - if (!started_) { - return Start(); - } - return Status::OK(); - } - - Status UpdatePosition() { return sink_->Tell(&position_); } - - Status WriteDictionaries() { - const DictionaryMap& id_to_dictionary = dictionary_memo_.id_to_dictionary(); - - dictionaries_.resize(id_to_dictionary.size()); + dictionaries->resize(id_to_dictionary.size()); // TODO(wesm): does sorting by id yield any benefit? int dict_index = 0; for (const auto& entry : id_to_dictionary) { - FileBlock* block = &dictionaries_[dict_index++]; + FileBlock* block = &(*dictionaries)[dict_index++]; block->offset = position_; @@ -709,8 +687,57 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { return Status::OK(); } + Status Write(std::vector* dictionaries) { + RETURN_NOT_OK(WriteSchema()); + + // If there are any dictionaries, write them as the next messages + return WriteDictionaries(dictionaries); + } + + private: + MemoryPool* pool_; + const Schema& schema_; + DictionaryMemo* dictionary_memo_; +}; + +class RecordBatchStreamWriter::RecordBatchStreamWriterImpl : public StreamBookKeeper { + public: + RecordBatchStreamWriterImpl(io::OutputStream* sink, + const std::shared_ptr& schema) + : StreamBookKeeper(sink), + schema_(schema), + pool_(default_memory_pool()), + started_(false) {} + + virtual ~RecordBatchStreamWriterImpl() = default; + + virtual Status Start() { + SchemaWriter schema_writer(*schema_, &dictionary_memo_, pool_, sink_); + RETURN_NOT_OK(schema_writer.Write(&dictionaries_)); 
+ started_ = true; + return Status::OK(); + } + + virtual Status Close() { + // Write the schema if not already written + // User is responsible for closing the OutputStream + RETURN_NOT_OK(CheckStarted()); + + // Write 0 EOS message + const int32_t kEos = 0; + return Write(reinterpret_cast(&kEos), sizeof(int32_t)); + } + + Status CheckStarted() { + if (!started_) { + return Start(); + } + return Status::OK(); + } + Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit, FileBlock* block) { RETURN_NOT_OK(CheckStarted()); + RETURN_NOT_OK(UpdatePosition()); block->offset = position_; @@ -733,45 +760,22 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { &record_batches_[record_batches_.size() - 1]); } - Status Align(int64_t alignment = kArrowIpcAlignment) { - // Adds padding bytes if necessary to ensure all memory blocks are written on - // 8-byte (or other alignment) boundaries. - int64_t remainder = PaddedLength(position_, alignment) - position_; - if (remainder > 0) { - return Write(kPaddingBytes, remainder); - } - return Status::OK(); - } - - // Write data and update position - Status Write(const uint8_t* data, int64_t nbytes) { - RETURN_NOT_OK(sink_->Write(data, nbytes)); - position_ += nbytes; - return Status::OK(); - } - void set_memory_pool(MemoryPool* pool) { pool_ = pool; } protected: - io::OutputStream* sink_; std::shared_ptr schema_; + MemoryPool* pool_; + bool started_; // When writing out the schema, we keep track of all the dictionaries we // encounter, as they must be written out first in the stream DictionaryMemo dictionary_memo_; - MemoryPool* pool_; - - int64_t position_; - bool started_; - std::vector dictionaries_; std::vector record_batches_; }; -RecordBatchStreamWriter::RecordBatchStreamWriter() { - impl_.reset(new RecordBatchStreamWriterImpl()); -} +RecordBatchStreamWriter::RecordBatchStreamWriter() {} RecordBatchStreamWriter::~RecordBatchStreamWriter() {} @@ -786,11 +790,24 @@ void 
RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) { Status RecordBatchStreamWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out) { + // ctor is private + auto result = std::shared_ptr(new RecordBatchStreamWriter()); + result->impl_.reset(new RecordBatchStreamWriterImpl(sink, schema)); + *out = result; + return Status::OK(); +} + +#ifndef ARROW_NO_DEPRECATED_API +Status RecordBatchStreamWriter::Open(io::OutputStream* sink, + const std::shared_ptr& schema, std::shared_ptr* out) { // ctor is private *out = std::shared_ptr(new RecordBatchStreamWriter()); - return (*out)->impl_->Open(sink, schema); + (*out)->impl_.reset(new RecordBatchStreamWriterImpl(sink, schema)); + return Status::OK(); } +#endif Status RecordBatchStreamWriter::Close() { return impl_->Close(); } @@ -802,6 +819,9 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl public: using BASE = RecordBatchStreamWriter::RecordBatchStreamWriterImpl; + RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr& schema) + : BASE(sink, schema) {} + Status Start() override { // It is only necessary to align to 8-byte boundary at the start of the file RETURN_NOT_OK(Write(reinterpret_cast(kArrowMagicBytes), @@ -815,6 +835,8 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl Status Close() override { // Write metadata + RETURN_NOT_OK(UpdatePosition()); + int64_t initial_position = position_; RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, &dictionary_memo_, sink_)); @@ -836,19 +858,30 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl } }; -RecordBatchFileWriter::RecordBatchFileWriter() { - impl_.reset(new RecordBatchFileWriterImpl()); -} +RecordBatchFileWriter::RecordBatchFileWriter() {} RecordBatchFileWriter::~RecordBatchFileWriter() {} Status RecordBatchFileWriter::Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out) { + // ctor is private + auto result = std::shared_ptr(new 
RecordBatchFileWriter()); + result->impl_.reset(new RecordBatchFileWriterImpl(sink, schema)); + *out = result; + return Status::OK(); +} + +#ifndef ARROW_NO_DEPRECATED_API +Status RecordBatchFileWriter::Open(io::OutputStream* sink, + const std::shared_ptr& schema, std::shared_ptr* out) { - *out = std::shared_ptr( - new RecordBatchFileWriter()); // ctor is private - return (*out)->impl_->Open(sink, schema); + // ctor is private + *out = std::shared_ptr(new RecordBatchFileWriter()); + (*out)->impl_.reset(new RecordBatchFileWriterImpl(sink, schema)); + return Status::OK(); } +#endif Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { @@ -857,5 +890,39 @@ Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch, Status RecordBatchFileWriter::Close() { return impl_->Close(); } +// ---------------------------------------------------------------------- +// Serialization public APIs + +Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, + std::shared_ptr* out) { + int64_t size = 0; + RETURN_NOT_OK(GetRecordBatchSize(batch, &size)); + std::shared_ptr buffer; + RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer)); + + io::FixedSizeBufferWriter stream(buffer); + int32_t metadata_length = 0; + int64_t body_length = 0; + RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool, + kMaxNestingDepth, true)); + *out = buffer; + return Status::OK(); +} + +Status SerializeSchema(const Schema& schema, MemoryPool* pool, + std::shared_ptr* out) { + std::shared_ptr stream; + RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream)); + + DictionaryMemo memo; + SchemaWriter schema_writer(schema, &memo, pool, stream.get()); + + // Unused + std::vector dictionary_blocks; + + RETURN_NOT_OK(schema_writer.Write(&dictionary_blocks)); + return stream->Finish(out); +} + } // namespace ipc } // namespace arrow 
http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/ipc/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 79ea7c6..d867982 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -25,7 +25,7 @@ #include #include -#include "arrow/ipc/metadata.h" +#include "arrow/ipc/message.h" #include "arrow/util/visibility.h" namespace arrow { @@ -86,7 +86,13 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { /// \param(out) out the created stream writer /// \return Status indicating success or failure static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); + +#ifndef ARROW_NO_DEPRECATED_API + /// \deprecated Since 0.7.0 + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, std::shared_ptr* out); +#endif Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; @@ -114,7 +120,13 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// \param(out) out the created stream writer /// \return Status indicating success or failure static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, + std::shared_ptr* out); + +#ifndef ARROW_NO_DEPRECATED_API + /// \deprecated Since 0.7.0 + static Status Open(io::OutputStream* sink, const std::shared_ptr& schema, std::shared_ptr* out); +#endif Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; @@ -155,14 +167,6 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth = kMaxNestingDepth, bool allow_64bit = false); -/// \brief Write dictionary message to output stream -/// -/// Low-level API -ARROW_EXPORT -Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, - int64_t buffer_start_offset, 
io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length, MemoryPool* pool); - /// \brief Serialize record batch as encapsulated IPC message in a new buffer /// /// \param[in] batch the record batch @@ -173,6 +177,17 @@ ARROW_EXPORT Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool, std::shared_ptr* out); +/// \brief Serialize schema using stream writer as a sequence of one or more +/// IPC messages +/// +/// \param[in] schema the schema to write +/// \param[in] pool a MemoryPool to allocate memory from +/// \param[out] out the serialized schema +/// \return Status +ARROW_EXPORT +Status SerializeSchema(const Schema& schema, MemoryPool* pool, + std::shared_ptr* out); + /// \brief Write multiple record batches to OutputStream /// \param[in] batches a vector of batches. Must all have same schema /// \param[out] dst an OutputStream http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/python/arrow_to_python.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index 622ef82..dcb06f8 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -189,22 +189,23 @@ Status DeserializeTuple(std::shared_ptr array, int64_t start_idx, int64_t DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM) } -Status ReadSerializedObject(std::shared_ptr src, - SerializedPyObject* out) { - std::shared_ptr reader; +Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) { int64_t offset; int64_t bytes_read; int32_t num_tensors; // Read number of tensors RETURN_NOT_OK( src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast(&num_tensors))); + + std::shared_ptr reader; RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader)); RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch)); + RETURN_NOT_OK(src->Tell(&offset)); offset += 4; // Skip the end-of-stream
message for (int i = 0; i < num_tensors; ++i) { std::shared_ptr tensor; - RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor)); + RETURN_NOT_OK(ipc::ReadTensor(offset, src, &tensor)); out->tensors.push_back(tensor); RETURN_NOT_OK(src->Tell(&offset)); }