Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3B2AA200B86 for ; Sun, 18 Sep 2016 22:02:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 39C0A160A8C; Sun, 18 Sep 2016 20:02:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 683B8160AC3 for ; Sun, 18 Sep 2016 22:02:08 +0200 (CEST) Received: (qmail 58132 invoked by uid 500); 18 Sep 2016 20:02:07 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 58118 invoked by uid 99); 18 Sep 2016 20:02:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 18 Sep 2016 20:02:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52AD8E07FE; Sun, 18 Sep 2016 20:02:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Date: Sun, 18 Sep 2016 20:02:08 -0000 Message-Id: <96983846c9a241df8fbe44660740ab29@git.apache.org> In-Reply-To: <1d2fd122a0ff4caaa098122f53fe9471@git.apache.org> References: <1d2fd122a0ff4caaa098122f53fe9471@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] arrow git commit: ARROW-280: [C++] Refactor IPC / memory map IO to use common arrow_io interfaces. Create arrow_ipc leaf library archived-at: Sun, 18 Sep 2016 20:02:11 -0000 ARROW-280: [C++] Refactor IPC / memory map IO to use common arrow_io interfaces. 
Create arrow_ipc leaf library Several things here * Clean up IO interface class structure to be able to indicate precise characteristics of an implementation * Make the IPC reader/writer use more generic interfaces -- writing only needs an output stream, reading only needs a random access reader. This will unblock ARROW-267 * Create a separate arrow_ipc shared library Author: Wes McKinney Closes #138 from wesm/ARROW-280 and squashes the following commits: 6a59eb6 [Wes McKinney] * Restructure IO interfaces to accommodate more configurations. * Refactor memory mapped IO interfaces to be in line with other arrow::io classes. * Split arrow_ipc into a leaf library * Refactor pyarrow and arrow_parquet to suit. Move BufferReader to arrow_io. Pyarrow parquet tests currently segfault Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/559b8652 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/559b8652 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/559b8652 Branch: refs/heads/master Commit: 559b865226ec0f5d78e87957c2ff0f7711bec9a8 Parents: 17e90e1 Author: Wes McKinney Authored: Sun Sep 18 16:01:58 2016 -0400 Committer: Wes McKinney Committed: Sun Sep 18 16:01:58 2016 -0400 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 6 - cpp/src/arrow/io/CMakeLists.txt | 11 +- cpp/src/arrow/io/hdfs-io-test.cc | 315 ---------------------- cpp/src/arrow/io/hdfs.cc | 35 ++- cpp/src/arrow/io/hdfs.h | 29 +- cpp/src/arrow/io/interfaces.h | 71 ++++- cpp/src/arrow/io/io-hdfs-test.cc | 315 ++++++++++++++++++++++ cpp/src/arrow/io/io-memory-test.cc | 125 +++++++++ cpp/src/arrow/io/libhdfs_shim.cc | 3 +- cpp/src/arrow/io/memory.cc | 262 ++++++++++++++++++ cpp/src/arrow/io/memory.h | 130 +++++++++ cpp/src/arrow/io/test-common.h | 63 +++++ cpp/src/arrow/ipc/CMakeLists.txt | 58 +++- cpp/src/arrow/ipc/adapter.cc | 61 +++-- cpp/src/arrow/ipc/adapter.h | 39 +-- 
cpp/src/arrow/ipc/ipc-adapter-test.cc | 33 +-- cpp/src/arrow/ipc/ipc-memory-test.cc | 127 --------- cpp/src/arrow/ipc/memory.cc | 182 ------------- cpp/src/arrow/ipc/memory.h | 150 ----------- cpp/src/arrow/ipc/metadata-internal.cc | 9 +- cpp/src/arrow/ipc/metadata-internal.h | 2 +- cpp/src/arrow/ipc/metadata.h | 11 +- cpp/src/arrow/ipc/symbols.map | 18 ++ cpp/src/arrow/ipc/test-common.h | 25 -- cpp/src/arrow/ipc/util.h | 56 ++++ cpp/src/arrow/parquet/CMakeLists.txt | 1 + cpp/src/arrow/parquet/io.cc | 4 +- cpp/src/arrow/parquet/io.h | 4 +- cpp/src/arrow/parquet/parquet-io-test.cc | 51 +--- cpp/src/arrow/parquet/parquet-schema-test.cc | 3 +- cpp/src/arrow/parquet/reader.cc | 8 +- cpp/src/arrow/parquet/reader.h | 2 +- cpp/src/arrow/parquet/schema.cc | 2 +- cpp/src/arrow/parquet/writer.cc | 2 +- cpp/src/arrow/type.h | 4 +- cpp/src/arrow/util/memory-pool-test.cc | 2 +- python/pyarrow/includes/libarrow_io.pxd | 42 ++- python/pyarrow/includes/parquet.pxd | 18 +- python/pyarrow/io.pxd | 7 +- python/pyarrow/io.pyx | 14 +- python/pyarrow/parquet.pyx | 6 +- 41 files changed, 1288 insertions(+), 1018 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a39a752..be95dab 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -626,12 +626,6 @@ set(ARROW_SRCS src/arrow/table.cc src/arrow/type.cc - # IPC / Shared memory library; to be turned into an optional component - src/arrow/ipc/adapter.cc - src/arrow/ipc/memory.cc - src/arrow/ipc/metadata.cc - src/arrow/ipc/metadata-internal.cc - src/arrow/types/construct.cc src/arrow/types/decimal.cc src/arrow/types/json.cc http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/CMakeLists.txt ---------------------------------------------------------------------- diff --git 
a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index b8c0e13..87e227e 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -20,6 +20,7 @@ set(ARROW_IO_LINK_LIBS arrow_shared + dl ) if (ARROW_BOOST_USE_SHARED) @@ -37,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS ${ARROW_IO_PRIVATE_LINK_LIBS}) set(ARROW_IO_SRCS + memory.cc ) if(ARROW_HDFS) @@ -71,8 +73,8 @@ if(ARROW_HDFS) ${ARROW_HDFS_SRCS} ${ARROW_IO_SRCS}) - ADD_ARROW_TEST(hdfs-io-test) - ARROW_TEST_LINK_LIBRARIES(hdfs-io-test + ADD_ARROW_TEST(io-hdfs-test) + ARROW_TEST_LINK_LIBRARIES(io-hdfs-test ${ARROW_IO_TEST_LINK_LIBS}) endif() @@ -101,10 +103,15 @@ if (APPLE) INSTALL_NAME_DIR "@rpath") endif() +ADD_ARROW_TEST(io-memory-test) +ARROW_TEST_LINK_LIBRARIES(io-memory-test + ${ARROW_IO_TEST_LINK_LIBS}) + # Headers: top level install(FILES hdfs.h interfaces.h + memory.h DESTINATION include/arrow/io) install(TARGETS arrow_io http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs-io-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc deleted file mode 100644 index e48a281..0000000 --- a/cpp/src/arrow/io/hdfs-io-test.cc +++ /dev/null @@ -1,315 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include - -#include "gtest/gtest.h" - -#include // NOLINT - -#include "arrow/io/hdfs.h" -#include "arrow/test-util.h" -#include "arrow/util/status.h" - -namespace arrow { -namespace io { - -std::vector RandomData(int64_t size) { - std::vector buffer(size); - test::random_bytes(size, 0, buffer.data()); - return buffer; -} - -class TestHdfsClient : public ::testing::Test { - public: - Status MakeScratchDir() { - if (client_->Exists(scratch_dir_)) { - RETURN_NOT_OK((client_->Delete(scratch_dir_, true))); - } - return client_->CreateDirectory(scratch_dir_); - } - - Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, - bool append = false, int buffer_size = 0, int replication = 0, - int default_block_size = 0) { - std::shared_ptr file; - RETURN_NOT_OK(client_->OpenWriteable( - path, append, buffer_size, replication, default_block_size, &file)); - - RETURN_NOT_OK(file->Write(buffer, size)); - RETURN_NOT_OK(file->Close()); - - return Status::OK(); - } - - std::string ScratchPath(const std::string& name) { - std::stringstream ss; - ss << scratch_dir_ << "/" << name; - return ss.str(); - } - - std::string HdfsAbsPath(const std::string& relpath) { - std::stringstream ss; - ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath; - return ss.str(); - } - - protected: - // Set up shared state between unit tests - static void SetUpTestCase() { - if (!ConnectLibHdfs().ok()) { - std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; - return; - } - - loaded_libhdfs_ = true; - - const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); - const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); - const char* user = std::getenv("ARROW_HDFS_TEST_USER"); - - ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER"; - - conf_.host = host == nullptr ? 
"localhost" : host; - conf_.user = user; - conf_.port = port == nullptr ? 20500 : atoi(port); - - ASSERT_OK(HdfsClient::Connect(&conf_, &client_)); - } - - static void TearDownTestCase() { - if (client_) { - EXPECT_OK(client_->Delete(scratch_dir_, true)); - EXPECT_OK(client_->Disconnect()); - } - } - - static bool loaded_libhdfs_; - - // Resources shared amongst unit tests - static HdfsConnectionConfig conf_; - static std::string scratch_dir_; - static std::shared_ptr client_; -}; - -bool TestHdfsClient::loaded_libhdfs_ = false; -HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig(); - -std::string TestHdfsClient::scratch_dir_ = - boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native(); - -std::shared_ptr TestHdfsClient::client_ = nullptr; - -#define SKIP_IF_NO_LIBHDFS() \ - if (!loaded_libhdfs_) { \ - std::cout << "No libhdfs, skipping" << std::endl; \ - return; \ - } - -TEST_F(TestHdfsClient, ConnectsAgain) { - SKIP_IF_NO_LIBHDFS(); - - std::shared_ptr client; - ASSERT_OK(HdfsClient::Connect(&conf_, &client)); - ASSERT_OK(client->Disconnect()); -} - -TEST_F(TestHdfsClient, CreateDirectory) { - SKIP_IF_NO_LIBHDFS(); - - std::string path = ScratchPath("create-directory"); - - if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); } - - ASSERT_OK(client_->CreateDirectory(path)); - ASSERT_TRUE(client_->Exists(path)); - EXPECT_OK(client_->Delete(path, true)); - ASSERT_FALSE(client_->Exists(path)); -} - -TEST_F(TestHdfsClient, GetCapacityUsed) { - SKIP_IF_NO_LIBHDFS(); - - // Who knows what is actually in your DFS cluster, but expect it to have - // positive used bytes and capacity - int64_t nbytes = 0; - ASSERT_OK(client_->GetCapacity(&nbytes)); - ASSERT_LT(0, nbytes); - - ASSERT_OK(client_->GetUsed(&nbytes)); - ASSERT_LT(0, nbytes); -} - -TEST_F(TestHdfsClient, GetPathInfo) { - SKIP_IF_NO_LIBHDFS(); - - HdfsPathInfo info; - - ASSERT_OK(MakeScratchDir()); - - // Directory info - ASSERT_OK(client_->GetPathInfo(scratch_dir_, 
&info)); - ASSERT_EQ(ObjectType::DIRECTORY, info.kind); - ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name); - ASSERT_EQ(conf_.user, info.owner); - - // TODO(wesm): test group, other attrs - - auto path = ScratchPath("test-file"); - - const int size = 100; - - std::vector buffer = RandomData(size); - - ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); - ASSERT_OK(client_->GetPathInfo(path, &info)); - - ASSERT_EQ(ObjectType::FILE, info.kind); - ASSERT_EQ(HdfsAbsPath(path), info.name); - ASSERT_EQ(conf_.user, info.owner); - ASSERT_EQ(size, info.size); -} - -TEST_F(TestHdfsClient, AppendToFile) { - SKIP_IF_NO_LIBHDFS(); - - ASSERT_OK(MakeScratchDir()); - - auto path = ScratchPath("test-file"); - const int size = 100; - - std::vector buffer = RandomData(size); - ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); - - // now append - ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true)); - - HdfsPathInfo info; - ASSERT_OK(client_->GetPathInfo(path, &info)); - ASSERT_EQ(size * 2, info.size); -} - -TEST_F(TestHdfsClient, ListDirectory) { - SKIP_IF_NO_LIBHDFS(); - - const int size = 100; - std::vector data = RandomData(size); - - auto p1 = ScratchPath("test-file-1"); - auto p2 = ScratchPath("test-file-2"); - auto d1 = ScratchPath("test-dir-1"); - - ASSERT_OK(MakeScratchDir()); - ASSERT_OK(WriteDummyFile(p1, data.data(), size)); - ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2)); - ASSERT_OK(client_->CreateDirectory(d1)); - - std::vector listing; - ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); - - // Do it again, appends! 
- ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); - - ASSERT_EQ(6, static_cast(listing.size())); - - // Argh, well, shouldn't expect the listing to be in any particular order - for (size_t i = 0; i < listing.size(); ++i) { - const HdfsPathInfo& info = listing[i]; - if (info.name == HdfsAbsPath(p1)) { - ASSERT_EQ(ObjectType::FILE, info.kind); - ASSERT_EQ(size, info.size); - } else if (info.name == HdfsAbsPath(p2)) { - ASSERT_EQ(ObjectType::FILE, info.kind); - ASSERT_EQ(size / 2, info.size); - } else if (info.name == HdfsAbsPath(d1)) { - ASSERT_EQ(ObjectType::DIRECTORY, info.kind); - } else { - FAIL() << "Unexpected path: " << info.name; - } - } -} - -TEST_F(TestHdfsClient, ReadableMethods) { - SKIP_IF_NO_LIBHDFS(); - - ASSERT_OK(MakeScratchDir()); - - auto path = ScratchPath("test-file"); - const int size = 100; - - std::vector data = RandomData(size); - ASSERT_OK(WriteDummyFile(path, data.data(), size)); - - std::shared_ptr file; - ASSERT_OK(client_->OpenReadable(path, &file)); - - // Test GetSize -- move this into its own unit test if ever needed - int64_t file_size; - ASSERT_OK(file->GetSize(&file_size)); - ASSERT_EQ(size, file_size); - - uint8_t buffer[50]; - int64_t bytes_read = 0; - - ASSERT_OK(file->Read(50, &bytes_read, buffer)); - ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50)); - ASSERT_EQ(50, bytes_read); - - ASSERT_OK(file->Read(50, &bytes_read, buffer)); - ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50)); - ASSERT_EQ(50, bytes_read); - - // EOF - ASSERT_OK(file->Read(1, &bytes_read, buffer)); - ASSERT_EQ(0, bytes_read); - - // ReadAt to EOF - ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer)); - ASSERT_EQ(40, bytes_read); - ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read)); - - // Seek, Tell - ASSERT_OK(file->Seek(60)); - - int64_t position; - ASSERT_OK(file->Tell(&position)); - ASSERT_EQ(60, position); -} - -TEST_F(TestHdfsClient, RenameFile) { - SKIP_IF_NO_LIBHDFS(); - - ASSERT_OK(MakeScratchDir()); - - auto 
src_path = ScratchPath("src-file"); - auto dst_path = ScratchPath("dst-file"); - const int size = 100; - - std::vector data = RandomData(size); - ASSERT_OK(WriteDummyFile(src_path, data.data(), size)); - - ASSERT_OK(client_->Rename(src_path, dst_path)); - - ASSERT_FALSE(client_->Exists(src_path)); - ASSERT_TRUE(client_->Exists(dst_path)); -} - -} // namespace io -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 800c3ed..a6b4b2f 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -142,6 +142,15 @@ Status HdfsReadableFile::ReadAt( return impl_->ReadAt(position, nbytes, bytes_read, buffer); } +Status HdfsReadableFile::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + return Status::NotImplemented("Not yet implemented"); +} + +bool HdfsReadableFile::supports_zero_copy() const { + return false; +} + Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { return impl_->Read(nbytes, bytes_read, buffer); } @@ -162,9 +171,9 @@ Status HdfsReadableFile::Tell(int64_t* position) { // File writing // Private implementation for writeable-only files -class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl { +class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { public: - HdfsWriteableFileImpl() {} + HdfsOutputStreamImpl() {} Status Close() { if (is_open_) { @@ -185,29 +194,29 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl { } }; -HdfsWriteableFile::HdfsWriteableFile() { - impl_.reset(new HdfsWriteableFileImpl()); +HdfsOutputStream::HdfsOutputStream() { + impl_.reset(new HdfsOutputStreamImpl()); } -HdfsWriteableFile::~HdfsWriteableFile() { +HdfsOutputStream::~HdfsOutputStream() { impl_->Close(); } -Status HdfsWriteableFile::Close() { +Status 
HdfsOutputStream::Close() { return impl_->Close(); } -Status HdfsWriteableFile::Write( +Status HdfsOutputStream::Write( const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) { return impl_->Write(buffer, nbytes, bytes_read); } -Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) { +Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) { int64_t bytes_written_dummy = 0; return Write(buffer, nbytes, &bytes_written_dummy); } -Status HdfsWriteableFile::Tell(int64_t* position) { +Status HdfsOutputStream::Tell(int64_t* position) { return impl_->Tell(position); } @@ -347,7 +356,7 @@ class HdfsClient::HdfsClientImpl { Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr* file) { + std::shared_ptr* file) { int flags = O_WRONLY; if (append) flags |= O_APPEND; @@ -362,7 +371,7 @@ class HdfsClient::HdfsClientImpl { } // std::make_shared does not work with private ctors - *file = std::shared_ptr(new HdfsWriteableFile()); + *file = std::shared_ptr(new HdfsOutputStream()); (*file)->impl_->set_members(path, fs_, handle); return Status::OK(); @@ -440,13 +449,13 @@ Status HdfsClient::OpenReadable( Status HdfsClient::OpenWriteable(const std::string& path, bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr* file) { + std::shared_ptr* file) { return impl_->OpenWriteable( path, append, buffer_size, replication, default_block_size, file); } Status HdfsClient::OpenWriteable( - const std::string& path, bool append, std::shared_ptr* file) { + const std::string& path, bool append, std::shared_ptr* file) { return OpenWriteable(path, append, 0, 0, 0, file); } http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index b6449fc..39720cc 100644 --- 
a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -29,13 +29,14 @@ namespace arrow { +class Buffer; class Status; namespace io { class HdfsClient; class HdfsReadableFile; -class HdfsWriteableFile; +class HdfsOutputStream; struct HdfsPathInfo { ObjectType::type kind; @@ -139,14 +140,14 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @param default_block_size, 0 for default Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr* file); + std::shared_ptr* file); Status OpenWriteable( - const std::string& path, bool append, std::shared_ptr* file); + const std::string& path, bool append, std::shared_ptr* file); private: friend class HdfsReadableFile; - friend class HdfsWriteableFile; + friend class HdfsOutputStream; class ARROW_NO_EXPORT HdfsClientImpl; std::unique_ptr impl_; @@ -155,7 +156,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { DISALLOW_COPY_AND_ASSIGN(HdfsClient); }; -class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { +class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface { public: ~HdfsReadableFile(); @@ -166,6 +167,10 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { Status ReadAt( int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + bool supports_zero_copy() const override; + Status Seek(int64_t position) override; Status Tell(int64_t* position) override; @@ -183,9 +188,11 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); }; -class ARROW_EXPORT HdfsWriteableFile : public WriteableFile { +// Naming this file OutputStream because it does not support seeking (like the +// WriteableFile interface) +class ARROW_EXPORT HdfsOutputStream : public OutputStream { public: - ~HdfsWriteableFile(); + ~HdfsOutputStream(); 
Status Close() override; @@ -196,14 +203,14 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile { Status Tell(int64_t* position) override; private: - class ARROW_NO_EXPORT HdfsWriteableFileImpl; - std::unique_ptr impl_; + class ARROW_NO_EXPORT HdfsOutputStreamImpl; + std::unique_ptr impl_; friend class HdfsClient::HdfsClientImpl; - HdfsWriteableFile(); + HdfsOutputStream(); - DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile); + DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream); }; Status ARROW_EXPORT ConnectLibHdfs(); http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/interfaces.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index c212852..fa34b43 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -21,8 +21,11 @@ #include #include +#include "arrow/util/macros.h" + namespace arrow { +class Buffer; class Status; namespace io { @@ -40,30 +43,78 @@ class FileSystemClient { virtual ~FileSystemClient() {} }; -class FileBase { +class FileInterface { public: + virtual ~FileInterface() {} virtual Status Close() = 0; virtual Status Tell(int64_t* position) = 0; + + FileMode::type mode() const { return mode_; } + + protected: + FileInterface() {} + FileMode::type mode_; + + void set_mode(FileMode::type mode) { mode_ = mode; } + + private: + DISALLOW_COPY_AND_ASSIGN(FileInterface); }; -class ReadableFile : public FileBase { +class Seekable { public: - virtual Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0; + virtual Status Seek(int64_t position) = 0; +}; - virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0; +class Writeable { + public: + virtual Status Write(const uint8_t* data, int64_t nbytes) = 0; +}; - virtual Status GetSize(int64_t* size) = 0; +class Readable { + public: + virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0; 
+}; + +class OutputStream : public FileInterface, public Writeable { + protected: + OutputStream() {} }; -class RandomAccessFile : public ReadableFile { +class InputStream : public FileInterface, public Readable { + protected: + InputStream() {} +}; + +class ReadableFileInterface : public InputStream, public Seekable { public: - virtual Status Seek(int64_t position) = 0; + virtual Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0; + + virtual Status GetSize(int64_t* size) = 0; + + // Does not copy if not necessary + virtual Status ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) = 0; + + virtual bool supports_zero_copy() const = 0; + + protected: + ReadableFileInterface() { set_mode(FileMode::READ); } }; -class WriteableFile : public FileBase { +class WriteableFileInterface : public OutputStream, public Seekable { public: - virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0; + virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0; + + protected: + WriteableFileInterface() { set_mode(FileMode::READ); } +}; + +class ReadWriteFileInterface : public ReadableFileInterface, + public WriteableFileInterface { + protected: + ReadWriteFileInterface() { ReadableFileInterface::set_mode(FileMode::READWRITE); } }; } // namespace io http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc new file mode 100644 index 0000000..7901932 --- /dev/null +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -0,0 +1,315 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include // NOLINT + +#include "arrow/io/hdfs.h" +#include "arrow/test-util.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +std::vector RandomData(int64_t size) { + std::vector buffer(size); + test::random_bytes(size, 0, buffer.data()); + return buffer; +} + +class TestHdfsClient : public ::testing::Test { + public: + Status MakeScratchDir() { + if (client_->Exists(scratch_dir_)) { + RETURN_NOT_OK((client_->Delete(scratch_dir_, true))); + } + return client_->CreateDirectory(scratch_dir_); + } + + Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, + bool append = false, int buffer_size = 0, int replication = 0, + int default_block_size = 0) { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenWriteable( + path, append, buffer_size, replication, default_block_size, &file)); + + RETURN_NOT_OK(file->Write(buffer, size)); + RETURN_NOT_OK(file->Close()); + + return Status::OK(); + } + + std::string ScratchPath(const std::string& name) { + std::stringstream ss; + ss << scratch_dir_ << "/" << name; + return ss.str(); + } + + std::string HdfsAbsPath(const std::string& relpath) { + std::stringstream ss; + ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath; + return ss.str(); + } + + protected: + // Set up shared state between unit tests + static void 
SetUpTestCase() { + if (!ConnectLibHdfs().ok()) { + std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; + return; + } + + loaded_libhdfs_ = true; + + const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); + const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); + const char* user = std::getenv("ARROW_HDFS_TEST_USER"); + + ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER"; + + conf_.host = host == nullptr ? "localhost" : host; + conf_.user = user; + conf_.port = port == nullptr ? 20500 : atoi(port); + + ASSERT_OK(HdfsClient::Connect(&conf_, &client_)); + } + + static void TearDownTestCase() { + if (client_) { + EXPECT_OK(client_->Delete(scratch_dir_, true)); + EXPECT_OK(client_->Disconnect()); + } + } + + static bool loaded_libhdfs_; + + // Resources shared amongst unit tests + static HdfsConnectionConfig conf_; + static std::string scratch_dir_; + static std::shared_ptr client_; +}; + +bool TestHdfsClient::loaded_libhdfs_ = false; +HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig(); + +std::string TestHdfsClient::scratch_dir_ = + boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native(); + +std::shared_ptr TestHdfsClient::client_ = nullptr; + +#define SKIP_IF_NO_LIBHDFS() \ + if (!loaded_libhdfs_) { \ + std::cout << "No libhdfs, skipping" << std::endl; \ + return; \ + } + +TEST_F(TestHdfsClient, ConnectsAgain) { + SKIP_IF_NO_LIBHDFS(); + + std::shared_ptr client; + ASSERT_OK(HdfsClient::Connect(&conf_, &client)); + ASSERT_OK(client->Disconnect()); +} + +TEST_F(TestHdfsClient, CreateDirectory) { + SKIP_IF_NO_LIBHDFS(); + + std::string path = ScratchPath("create-directory"); + + if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); } + + ASSERT_OK(client_->CreateDirectory(path)); + ASSERT_TRUE(client_->Exists(path)); + EXPECT_OK(client_->Delete(path, true)); + ASSERT_FALSE(client_->Exists(path)); +} + +TEST_F(TestHdfsClient, GetCapacityUsed) { + SKIP_IF_NO_LIBHDFS(); + + // Who knows what is 
actually in your DFS cluster, but expect it to have + // positive used bytes and capacity + int64_t nbytes = 0; + ASSERT_OK(client_->GetCapacity(&nbytes)); + ASSERT_LT(0, nbytes); + + ASSERT_OK(client_->GetUsed(&nbytes)); + ASSERT_LT(0, nbytes); +} + +TEST_F(TestHdfsClient, GetPathInfo) { + SKIP_IF_NO_LIBHDFS(); + + HdfsPathInfo info; + + ASSERT_OK(MakeScratchDir()); + + // Directory info + ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info)); + ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name); + ASSERT_EQ(conf_.user, info.owner); + + // TODO(wesm): test group, other attrs + + auto path = ScratchPath("test-file"); + + const int size = 100; + + std::vector buffer = RandomData(size); + + ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); + ASSERT_OK(client_->GetPathInfo(path, &info)); + + ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(HdfsAbsPath(path), info.name); + ASSERT_EQ(conf_.user, info.owner); + ASSERT_EQ(size, info.size); +} + +TEST_F(TestHdfsClient, AppendToFile) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto path = ScratchPath("test-file"); + const int size = 100; + + std::vector buffer = RandomData(size); + ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); + + // now append + ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true)); + + HdfsPathInfo info; + ASSERT_OK(client_->GetPathInfo(path, &info)); + ASSERT_EQ(size * 2, info.size); +} + +TEST_F(TestHdfsClient, ListDirectory) { + SKIP_IF_NO_LIBHDFS(); + + const int size = 100; + std::vector data = RandomData(size); + + auto p1 = ScratchPath("test-file-1"); + auto p2 = ScratchPath("test-file-2"); + auto d1 = ScratchPath("test-dir-1"); + + ASSERT_OK(MakeScratchDir()); + ASSERT_OK(WriteDummyFile(p1, data.data(), size)); + ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2)); + ASSERT_OK(client_->CreateDirectory(d1)); + + std::vector listing; + ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); + + // Do it again, 
appends! + ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); + + ASSERT_EQ(6, static_cast(listing.size())); + + // Argh, well, shouldn't expect the listing to be in any particular order + for (size_t i = 0; i < listing.size(); ++i) { + const HdfsPathInfo& info = listing[i]; + if (info.name == HdfsAbsPath(p1)) { + ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(size, info.size); + } else if (info.name == HdfsAbsPath(p2)) { + ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(size / 2, info.size); + } else if (info.name == HdfsAbsPath(d1)) { + ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + } else { + FAIL() << "Unexpected path: " << info.name; + } + } +} + +TEST_F(TestHdfsClient, ReadableMethods) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto path = ScratchPath("test-file"); + const int size = 100; + + std::vector data = RandomData(size); + ASSERT_OK(WriteDummyFile(path, data.data(), size)); + + std::shared_ptr file; + ASSERT_OK(client_->OpenReadable(path, &file)); + + // Test GetSize -- move this into its own unit test if ever needed + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(size, file_size); + + uint8_t buffer[50]; + int64_t bytes_read = 0; + + ASSERT_OK(file->Read(50, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50)); + ASSERT_EQ(50, bytes_read); + + ASSERT_OK(file->Read(50, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50)); + ASSERT_EQ(50, bytes_read); + + // EOF + ASSERT_OK(file->Read(1, &bytes_read, buffer)); + ASSERT_EQ(0, bytes_read); + + // ReadAt to EOF + ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer)); + ASSERT_EQ(40, bytes_read); + ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read)); + + // Seek, Tell + ASSERT_OK(file->Seek(60)); + + int64_t position; + ASSERT_OK(file->Tell(&position)); + ASSERT_EQ(60, position); +} + +TEST_F(TestHdfsClient, RenameFile) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + 
auto src_path = ScratchPath("src-file"); + auto dst_path = ScratchPath("dst-file"); + const int size = 100; + + std::vector data = RandomData(size); + ASSERT_OK(WriteDummyFile(src_path, data.data(), size)); + + ASSERT_OK(client_->Rename(src_path, dst_path)); + + ASSERT_FALSE(client_->Exists(src_path)); + ASSERT_TRUE(client_->Exists(dst_path)); +} + +} // namespace io +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/io-memory-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc new file mode 100644 index 0000000..6de35da --- /dev/null +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "arrow/io/memory.h" +#include "arrow/io/test-common.h" + +namespace arrow { +namespace io { + +class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture { + public: + void TearDown() { MemoryMapFixture::TearDown(); } +}; + +TEST_F(TestMemoryMappedFile, InvalidUsages) {} + +TEST_F(TestMemoryMappedFile, WriteRead) { + const int64_t buffer_size = 1024; + std::vector buffer(buffer_size); + + test::random_bytes(1024, 0, buffer.data()); + + const int reps = 5; + + std::string path = "ipc-write-read-test"; + CreateFile(path, reps * buffer_size); + + std::shared_ptr result; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result)); + + int64_t position = 0; + std::shared_ptr out_buffer; + for (int i = 0; i < reps; ++i) { + ASSERT_OK(result->Write(buffer.data(), buffer_size)); + ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer)); + + ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); + + position += buffer_size; + } +} + +TEST_F(TestMemoryMappedFile, ReadOnly) { + const int64_t buffer_size = 1024; + std::vector buffer(buffer_size); + + test::random_bytes(1024, 0, buffer.data()); + + const int reps = 5; + + std::string path = "ipc-read-only-test"; + CreateFile(path, reps * buffer_size); + + std::shared_ptr rwmmap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap)); + + int64_t position = 0; + for (int i = 0; i < reps; ++i) { + ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size)); + position += buffer_size; + } + rwmmap->Close(); + + std::shared_ptr rommap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); + + position = 0; + std::shared_ptr out_buffer; + for (int i = 0; i < reps; ++i) { + ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer)); + + ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); + position += buffer_size; + } + rommap->Close(); +} 
+ +TEST_F(TestMemoryMappedFile, InvalidMode) { + const int64_t buffer_size = 1024; + std::vector buffer(buffer_size); + + test::random_bytes(1024, 0, buffer.data()); + + std::string path = "ipc-invalid-mode-test"; + CreateFile(path, buffer_size); + + std::shared_ptr rommap; + ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap)); + + ASSERT_RAISES(IOError, rommap->Write(buffer.data(), buffer_size)); +} + +TEST_F(TestMemoryMappedFile, InvalidFile) { + std::string non_existent_path = "invalid-file-name-asfd"; + + std::shared_ptr result; + ASSERT_RAISES( + IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result)); +} + +} // namespace io +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/libhdfs_shim.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc index 003570d..0b805ab 100644 --- a/cpp/src/arrow/io/libhdfs_shim.cc +++ b/cpp/src/arrow/io/libhdfs_shim.cc @@ -51,8 +51,7 @@ extern "C" { #include #include -#include // NOLINT -#include // NOLINT +#include // NOLINT #include "arrow/util/status.h" #include "arrow/util/visibility.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc new file mode 100644 index 0000000..1dd6c3a --- /dev/null +++ b/cpp/src/arrow/io/memory.cc @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/memory.h" + +#include // For memory-mapping + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/io/interfaces.h" + +#include "arrow/util/buffer.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +// Implement MemoryMappedFile + +class MemoryMappedFile::MemoryMappedFileImpl { + public: + MemoryMappedFileImpl() + : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {} + + ~MemoryMappedFileImpl() { + if (is_open_) { + munmap(data_, size_); + fclose(file_); + } + } + + Status Open(const std::string& path, FileMode::type mode) { + if (is_open_) { return Status::IOError("A file is already open"); } + + int prot_flags = PROT_READ; + + if (mode == FileMode::READWRITE) { + file_ = fopen(path.c_str(), "r+b"); + prot_flags |= PROT_WRITE; + is_writable_ = true; + } else { + file_ = fopen(path.c_str(), "rb"); + } + if (file_ == nullptr) { + std::stringstream ss; + ss << "Unable to open file, errno: " << errno; + return Status::IOError(ss.str()); + } + + fseek(file_, 0L, SEEK_END); + if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); } + size_ = ftell(file_); + + fseek(file_, 0L, SEEK_SET); + is_open_ = true; + position_ = 0; + + void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0); + if (result == MAP_FAILED) { + std::stringstream ss; + ss << "Memory mapping file failed, errno: " << errno; + return Status::IOError(ss.str()); + } + data_ = reinterpret_cast(result); + + return Status::OK(); + } + + int64_t 
size() const { return size_; } + + Status Seek(int64_t position) { + if (position < 0 || position >= size_) { + return Status::Invalid("position is out of bounds"); + } + position_ = position; + return Status::OK(); + } + + int64_t position() { return position_; } + + void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); } + + uint8_t* data() { return data_; } + + uint8_t* head() { return data_ + position_; } + + bool writable() { return is_writable_; } + + bool opened() { return is_open_; } + + private: + FILE* file_; + int64_t position_; + int64_t size_; + bool is_open_; + bool is_writable_; + + // The memory map + uint8_t* data_; +}; + +MemoryMappedFile::MemoryMappedFile(FileMode::type mode) { + ReadableFileInterface::set_mode(mode); +} + +Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, + std::shared_ptr<MemoryMappedFile>* out) { + std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode)); + + result->impl_.reset(new MemoryMappedFileImpl()); + RETURN_NOT_OK(result->impl_->Open(path, mode)); + + *out = result; + return Status::OK(); +} + +Status MemoryMappedFile::GetSize(int64_t* size) { + *size = impl_->size(); + return Status::OK(); +} + +Status MemoryMappedFile::Tell(int64_t* position) { + *position = impl_->position(); + return Status::OK(); +} + +Status MemoryMappedFile::Seek(int64_t position) { + return impl_->Seek(position); +} + +Status MemoryMappedFile::Close() { + // munmap handled in pimpl dtor + return Status::OK(); +} + +Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + nbytes = std::min(nbytes, impl_->size() - impl_->position()); + std::memcpy(out, impl_->head(), nbytes); + *bytes_read = nbytes; + impl_->advance(nbytes); + return Status::OK(); +} + +Status MemoryMappedFile::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + RETURN_NOT_OK(impl_->Seek(position)); + return Read(nbytes, bytes_read, out); +} + +Status MemoryMappedFile::ReadAt( + int64_t
position, int64_t nbytes, std::shared_ptr<Buffer>* out) { + nbytes = std::min(nbytes, impl_->size() - position); + RETURN_NOT_OK(impl_->Seek(position)); + *out = std::make_shared<Buffer>(impl_->head(), nbytes); + impl_->advance(nbytes); + return Status::OK(); +} + +bool MemoryMappedFile::supports_zero_copy() const { + return true; +} + +Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { + if (!impl_->opened() || !impl_->writable()) { + return Status::IOError("Unable to write"); + } + + RETURN_NOT_OK(impl_->Seek(position)); + return WriteInternal(data, nbytes); +} + +Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) { + if (!impl_->opened() || !impl_->writable()) { + return Status::IOError("Unable to write"); + } + if (nbytes + impl_->position() > impl_->size()) { + return Status::Invalid("Cannot write past end of memory map"); + } + + return WriteInternal(data, nbytes); +} + +Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) { + memcpy(impl_->head(), data, nbytes); + impl_->advance(nbytes); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// In-memory buffer reader + +Status BufferReader::Close() { + // no-op + return Status::OK(); +} + +Status BufferReader::Tell(int64_t* position) { + *position = position_; + return Status::OK(); +} + +Status BufferReader::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, buffer); +} + +Status BufferReader::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) { + int64_t size = std::min(nbytes, buffer_size_ - position_); + *out = std::make_shared<Buffer>(buffer_ + position, size); + position_ += nbytes; + return Status::OK(); +} + +bool BufferReader::supports_zero_copy() const { + return true; +} + +Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { + memcpy(buffer, buffer_ +
position_, nbytes); + *bytes_read = std::min(nbytes, buffer_size_ - position_); + position_ += *bytes_read; + return Status::OK(); +} + +Status BufferReader::GetSize(int64_t* size) { + *size = buffer_size_; + return Status::OK(); +} + +Status BufferReader::Seek(int64_t position) { + if (position < 0 || position >= buffer_size_) { + return Status::IOError("position out of bounds"); + } + + position_ = position; + return Status::OK(); +} + +} // namespace io +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h new file mode 100644 index 0000000..6fe47c3 --- /dev/null +++ b/cpp/src/arrow/io/memory.h @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Public API for different memory sharing / IO mechanisms + +#ifndef ARROW_IO_MEMORY_H +#define ARROW_IO_MEMORY_H + +#include +#include +#include + +#include "arrow/io/interfaces.h" + +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class MutableBuffer; +class Status; + +namespace io { + +// An output stream that writes to a MutableBuffer, such as one obtained from a +// memory map +// +// TODO(wesm): Implement this class +class ARROW_EXPORT BufferOutputStream : public OutputStream { + public: + explicit BufferOutputStream(const std::shared_ptr& buffer) + : buffer_(buffer) {} + + // Implement the OutputStream interface + Status Close() override; + Status Tell(int64_t* position) override; + Status Write(const uint8_t* data, int64_t length) override; + + // Returns the number of bytes remaining in the buffer + int64_t bytes_remaining() const; + + private: + std::shared_ptr buffer_; + int64_t capacity_; + int64_t position_; +}; + +// A memory source that uses memory-mapped files for memory interactions +class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { + public: + static Status Open(const std::string& path, FileMode::type mode, + std::shared_ptr* out); + + Status Close() override; + + Status Tell(int64_t* position) override; + + Status Seek(int64_t position) override; + + // Required by ReadableFileInterface, copies memory into out + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + + // Read into a buffer, zero copy if possible + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + bool supports_zero_copy() const override; + + Status Write(const uint8_t* data, int64_t nbytes) override; + + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override; + + // @return: the size in bytes of the memory source + Status 
GetSize(int64_t* size) override; + + private: + explicit MemoryMappedFile(FileMode::type mode); + + Status WriteInternal(const uint8_t* data, int64_t nbytes); + + // Hide the internal details of this class for now + class MemoryMappedFileImpl; + std::unique_ptr<MemoryMappedFileImpl> impl_; +}; + +class ARROW_EXPORT BufferReader : public ReadableFileInterface { + public: + BufferReader(const uint8_t* buffer, int buffer_size) + : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} + + Status Close() override; + Status Tell(int64_t* position) override; + + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override; + + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status GetSize(int64_t* size) override; + Status Seek(int64_t position) override; + + bool supports_zero_copy() const override; + + private: + const uint8_t* buffer_; + int buffer_size_; + int64_t position_; +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_MEMORY_H http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h new file mode 100644 index 0000000..1954d47 --- /dev/null +++ b/cpp/src/arrow/io/test-common.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IO_TEST_COMMON_H +#define ARROW_IO_TEST_COMMON_H + +#include +#include +#include +#include +#include + +#include "arrow/io/memory.h" +#include "arrow/test-util.h" +#include "arrow/util/buffer.h" +#include "arrow/util/memory-pool.h" + +namespace arrow { +namespace io { + +class MemoryMapFixture { + public: + void TearDown() { + for (auto path : tmp_files_) { + std::remove(path.c_str()); + } + } + + void CreateFile(const std::string path, int64_t size) { + FILE* file = fopen(path.c_str(), "w"); + if (file != nullptr) { tmp_files_.push_back(path); } + ftruncate(fileno(file), size); + fclose(file); + } + + Status InitMemoryMap( + int64_t size, const std::string& path, std::shared_ptr* mmap) { + CreateFile(path, size); + return MemoryMappedFile::Open(path, FileMode::READWRITE, mmap); + } + + private: + std::vector tmp_files_; +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_TEST_COMMON_H http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 8263416..e5553a6 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -19,16 +19,50 @@ # arrow_ipc ####################################### -# Headers: top level -install(FILES - adapter.h - metadata.h - memory.h - DESTINATION include/arrow/ipc) +set(ARROW_IPC_LINK_LIBS + arrow_io + arrow_shared +) + +set(ARROW_IPC_PRIVATE_LINK_LIBS + ) + 
+set(ARROW_IPC_TEST_LINK_LIBS + arrow_ipc + ${ARROW_IPC_PRIVATE_LINK_LIBS}) + +set(ARROW_IPC_SRCS + adapter.cc + metadata.cc + metadata-internal.cc +) + +# TODO(wesm): SHARED and STATIC targets +add_library(arrow_ipc SHARED + ${ARROW_IPC_SRCS} +) +target_link_libraries(arrow_ipc + LINK_PUBLIC ${ARROW_IPC_LINK_LIBS} + LINK_PRIVATE ${ARROW_IPC_PRIVATE_LINK_LIBS}) + +if(NOT APPLE) + # Localize thirdparty symbols using a linker version script. This hides them + # from the client application. The OS X linker does not support the + # version-script option. + set(ARROW_IPC_LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/symbols.map") +endif() + +SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES + LINKER_LANGUAGE CXX + LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}") ADD_ARROW_TEST(ipc-adapter-test) -ADD_ARROW_TEST(ipc-memory-test) +ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test + ${ARROW_IPC_TEST_LINK_LIBS}) + ADD_ARROW_TEST(ipc-metadata-test) +ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test + ${ARROW_IPC_TEST_LINK_LIBS}) # make clean will delete the generated file set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE) @@ -49,3 +83,13 @@ add_custom_command( add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES}) add_dependencies(arrow_objlib metadata_fbs) + +# Headers: top level +install(FILES + adapter.h + metadata.h + DESTINATION include/arrow/ipc) + +install(TARGETS arrow_ipc + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib) http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 40d372b..0e101c8 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -24,9 +24,11 @@ #include "arrow/array.h" #include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/memory.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" +#include 
"arrow/ipc/util.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/schema.h" #include "arrow/table.h" #include "arrow/type.h" @@ -144,10 +146,15 @@ class RowBatchWriter { return Status::OK(); } - Status Write(MemorySource* dst, int64_t position, int64_t* data_header_offset) { + Status Write(io::OutputStream* dst, int64_t* data_header_offset) { // Write out all the buffers contiguously and compute the total size of the // memory payload int64_t offset = 0; + + // Get the starting position + int64_t position; + RETURN_NOT_OK(dst->Tell(&position)); + for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); int64_t size = 0; @@ -171,7 +178,7 @@ class RowBatchWriter { buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size)); if (size > 0) { - RETURN_NOT_OK(dst->Write(position + offset, buffer->data(), size)); + RETURN_NOT_OK(dst->Write(buffer->data(), size)); offset += size; } } @@ -180,7 +187,7 @@ class RowBatchWriter { // memory, the data header can be converted to a flatbuffer and written out // // Note: The memory written here is prefixed by the size of the flatbuffer - // itself as an int32_t. On reading from a MemorySource, you will have to + // itself as an int32_t. 
On reading from a input, you will have to // determine the data header size then request a buffer such that you can // construct the flatbuffer data accessor object (see arrow::ipc::Message) std::shared_ptr data_header; @@ -188,8 +195,7 @@ class RowBatchWriter { batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header)); // Write the data header at the end - RETURN_NOT_OK( - dst->Write(position + offset, data_header->data(), data_header->size())); + RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size())); *data_header_offset = position + offset; return Status::OK(); @@ -199,9 +205,9 @@ class RowBatchWriter { Status GetTotalSize(int64_t* size) { // emulates the behavior of Write without actually writing int64_t data_header_offset; - MockMemorySource source(0); - RETURN_NOT_OK(Write(&source, 0, &data_header_offset)); - *size = source.GetExtentBytesWritten(); + MockOutputStream dst; + RETURN_NOT_OK(Write(&dst, &data_header_offset)); + *size = dst.GetExtentBytesWritten(); return Status::OK(); } @@ -214,12 +220,12 @@ class RowBatchWriter { int max_recursion_depth_; }; -Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position, - int64_t* header_offset, int max_recursion_depth) { +Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* header_offset, + int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); RowBatchWriter serializer(batch, max_recursion_depth); RETURN_NOT_OK(serializer.AssemblePayload()); - return serializer.Write(dst, position, header_offset); + return serializer.Write(dst, header_offset); } Status GetRowBatchSize(const RowBatch* batch, int64_t* size) { @@ -234,11 +240,11 @@ Status GetRowBatchSize(const RowBatch* batch, int64_t* size) { static constexpr int64_t INIT_METADATA_SIZE = 4096; -class RowBatchReader::Impl { +class RowBatchReader::RowBatchReaderImpl { public: - Impl(MemorySource* source, const std::shared_ptr& metadata, - int max_recursion_depth) - : source_(source), 
metadata_(metadata), max_recursion_depth_(max_recursion_depth) { + RowBatchReaderImpl(io::ReadableFileInterface* file, + const std::shared_ptr& metadata, int max_recursion_depth) + : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { num_buffers_ = metadata->num_buffers(); num_flattened_fields_ = metadata->num_fields(); } @@ -339,10 +345,11 @@ class RowBatchReader::Impl { Status GetBuffer(int buffer_index, std::shared_ptr* out) { BufferMetadata metadata = metadata_->buffer(buffer_index); RETURN_NOT_OK(CheckMultipleOf64(metadata.length)); - return source_->ReadAt(metadata.offset, metadata.length, out); + return file_->ReadAt(metadata.offset, metadata.length, out); } - MemorySource* source_; + private: + io::ReadableFileInterface* file_; std::shared_ptr metadata_; int field_index_; @@ -352,22 +359,22 @@ class RowBatchReader::Impl { int num_flattened_fields_; }; -Status RowBatchReader::Open( - MemorySource* source, int64_t position, std::shared_ptr* out) { - return Open(source, position, kMaxIpcRecursionDepth, out); +Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, + std::shared_ptr* out) { + return Open(file, position, kMaxIpcRecursionDepth, out); } -Status RowBatchReader::Open(MemorySource* source, int64_t position, +Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position, int max_recursion_depth, std::shared_ptr* out) { std::shared_ptr metadata; - RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata)); + RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata)); int32_t metadata_size = *reinterpret_cast(metadata->data()); - // We may not need to call source->ReadAt again + // We may not need to call ReadAt again if (metadata_size > static_cast(INIT_METADATA_SIZE - sizeof(int32_t))) { // We don't have enough data, read the indicated metadata size. 
- RETURN_NOT_OK(source->ReadAt(position + sizeof(int32_t), metadata_size, &metadata)); + RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, &metadata)); } // TODO(wesm): buffer slicing here would be better in case ReadAt returns @@ -383,14 +390,14 @@ Status RowBatchReader::Open(MemorySource* source, int64_t position, std::shared_ptr batch_meta = message->GetRecordBatch(); std::shared_ptr result(new RowBatchReader()); - result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth)); + result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, max_recursion_depth)); *out = result; return Status::OK(); } // Here the explicit destructor is required for compilers to be aware of -// the complete information of RowBatchReader::Impl class +// the complete information of RowBatchReader::RowBatchReaderImpl class RowBatchReader::~RowBatchReader() {} Status RowBatchReader::GetRowBatch( http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/adapter.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 6231af6..215b46f 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -33,9 +33,15 @@ class RowBatch; class Schema; class Status; +namespace io { + +class ReadableFileInterface; +class OutputStream; + +} // namespace io + namespace ipc { -class MemorySource; class RecordBatchMessage; // ---------------------------------------------------------------------- @@ -43,22 +49,21 @@ class RecordBatchMessage; // We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number // TODO(emkornfield) investigate this more constexpr int kMaxIpcRecursionDepth = 64; -// Write the RowBatch (collection of equal-length Arrow arrays) to the memory -// source at the indicated position + +// Write the RowBatch (collection of equal-length Arrow arrays) to the output +// stream // -// First, each of the memory 
buffers are written out end-to-end in starting at -// the indicated position. +// First, each of the memory buffers are written out end-to-end // // Then, this function writes the batch metadata as a flatbuffer (see // format/Message.fbs -- the RecordBatch message type) like so: // // // -// Finally, the memory offset to the start of the metadata / data header is -// returned in an out-variable -ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, - int64_t position, int64_t* header_offset, - int max_recursion_depth = kMaxIpcRecursionDepth); +// Finally, the absolute offset (relative to the start of the output stream) to +// the start of the metadata / data header is returned in an out-variable +ARROW_EXPORT Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, + int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth); // int64_t GetRowBatchMetadata(const RowBatch* batch); @@ -68,16 +73,16 @@ ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, ARROW_EXPORT Status GetRowBatchSize(const RowBatch* batch, int64_t* size); // ---------------------------------------------------------------------- -// "Read" path; does not copy data if the MemorySource does not +// "Read" path; does not copy data if the input supports zero copy reads class ARROW_EXPORT RowBatchReader { public: - static Status Open( - MemorySource* source, int64_t position, std::shared_ptr* out); - - static Status Open(MemorySource* source, int64_t position, int max_recursion_depth, + static Status Open(io::ReadableFileInterface* file, int64_t position, std::shared_ptr* out); + static Status Open(io::ReadableFileInterface* file, int64_t position, + int max_recursion_depth, std::shared_ptr* out); + virtual ~RowBatchReader(); // Reassemble the row batch. 
A Schema is required to be able to construct the @@ -86,8 +91,8 @@ class ARROW_EXPORT RowBatchReader { const std::shared_ptr& schema, std::shared_ptr* out); private: - class Impl; - std::unique_ptr impl_; + class RowBatchReaderImpl; + std::unique_ptr impl_; }; } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 6740e0f..ca4d015 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -24,9 +24,11 @@ #include "gtest/gtest.h" +#include "arrow/io/memory.h" +#include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" -#include "arrow/ipc/memory.h" #include "arrow/ipc/test-common.h" +#include "arrow/ipc/util.h" #include "arrow/test-util.h" #include "arrow/types/list.h" @@ -49,17 +51,18 @@ const auto LIST_LIST_INT32 = std::make_shared(LIST_INT32); typedef Status MakeRowBatch(std::shared_ptr* out); class TestWriteRowBatch : public ::testing::TestWithParam, - public MemoryMapFixture { + public io::MemoryMapFixture { public: void SetUp() { pool_ = default_memory_pool(); } - void TearDown() { MemoryMapFixture::TearDown(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } Status RoundTripHelper(const RowBatch& batch, int memory_map_size, std::shared_ptr* batch_result) { std::string path = "test-write-row-batch"; - MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); + io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); int64_t header_location; - RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location)); + + RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, &header_location)); std::shared_ptr reader; RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader)); @@ -69,7 +72,7 @@ class TestWriteRowBatch : public ::testing::TestWithParam, } 
protected: - std::shared_ptr mmap_; + std::shared_ptr mmap_; MemoryPool* pool_; }; @@ -276,12 +279,12 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch, &MakeStringTypesRowBatch, &MakeStruct)); void TestGetRowBatchSize(std::shared_ptr batch) { - MockMemorySource mock_source(1 << 16); + ipc::MockOutputStream mock; int64_t mock_header_location = -1; int64_t size = -1; - ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location)); + ASSERT_OK(WriteRowBatch(&mock, batch.get(), &mock_header_location)); ASSERT_OK(GetRowBatchSize(batch.get(), &size)); - ASSERT_EQ(mock_source.GetExtentBytesWritten(), size); + ASSERT_EQ(mock.GetExtentBytesWritten(), size); } TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) { @@ -303,10 +306,10 @@ TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) { TestGetRowBatchSize(batch); } -class RecursionLimits : public ::testing::Test, public MemoryMapFixture { +class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { public: void SetUp() { pool_ = default_memory_pool(); } - void TearDown() { MemoryMapFixture::TearDown(); } + void TearDown() { io::MemoryMapFixture::TearDown(); } Status WriteToMmap(int recursion_level, bool override_level, int64_t* header_out = nullptr, std::shared_ptr* schema_out = nullptr) { @@ -329,19 +332,19 @@ class RecursionLimits : public ::testing::Test, public MemoryMapFixture { std::string path = "test-write-past-max-recursion"; const int memory_map_size = 1 << 16; - MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); + io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); int64_t header_location; int64_t* header_out_param = header_out == nullptr ? 
&header_location : header_out; if (override_level) { return WriteRowBatch( - mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1); + mmap_.get(), batch.get(), header_out_param, recursion_level + 1); } else { - return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param); + return WriteRowBatch(mmap_.get(), batch.get(), header_out_param); } } protected: - std::shared_ptr mmap_; + std::shared_ptr mmap_; MemoryPool* pool_; }; http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/ipc-memory-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-memory-test.cc b/cpp/src/arrow/ipc/ipc-memory-test.cc deleted file mode 100644 index a2dbd35..0000000 --- a/cpp/src/arrow/ipc/ipc-memory-test.cc +++ /dev/null @@ -1,127 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include -#include -#include -#include -#include -#include - -#include "gtest/gtest.h" - -#include "arrow/ipc/memory.h" -#include "arrow/ipc/test-common.h" - -namespace arrow { -namespace ipc { - -class TestMemoryMappedSource : public ::testing::Test, public MemoryMapFixture { - public: - void TearDown() { MemoryMapFixture::TearDown(); } -}; - -TEST_F(TestMemoryMappedSource, InvalidUsages) {} - -TEST_F(TestMemoryMappedSource, WriteRead) { - const int64_t buffer_size = 1024; - std::vector buffer(buffer_size); - - test::random_bytes(1024, 0, buffer.data()); - - const int reps = 5; - - std::string path = "ipc-write-read-test"; - CreateFile(path, reps * buffer_size); - - std::shared_ptr result; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &result)); - - int64_t position = 0; - - std::shared_ptr out_buffer; - for (int i = 0; i < reps; ++i) { - ASSERT_OK(result->Write(position, buffer.data(), buffer_size)); - ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer)); - - ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size)); - - position += buffer_size; - } -} - -TEST_F(TestMemoryMappedSource, ReadOnly) { - const int64_t buffer_size = 1024; - std::vector buffer(buffer_size); - - test::random_bytes(1024, 0, buffer.data()); - - const int reps = 5; - - std::string path = "ipc-read-only-test"; - CreateFile(path, reps * buffer_size); - - std::shared_ptr rwmmap; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &rwmmap)); - - int64_t position = 0; - for (int i = 0; i < reps; ++i) { - ASSERT_OK(rwmmap->Write(position, buffer.data(), buffer_size)); - - position += buffer_size; - } - rwmmap->Close(); - - std::shared_ptr rommap; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap)); - - position = 0; - std::shared_ptr out_buffer; - for (int i = 0; i < reps; ++i) { - ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer)); - - ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), 
buffer_size)); - position += buffer_size; - } - rommap->Close(); -} - -TEST_F(TestMemoryMappedSource, InvalidMode) { - const int64_t buffer_size = 1024; - std::vector buffer(buffer_size); - - test::random_bytes(1024, 0, buffer.data()); - - std::string path = "ipc-invalid-mode-test"; - CreateFile(path, buffer_size); - - std::shared_ptr rommap; - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap)); - - ASSERT_RAISES(IOError, rommap->Write(0, buffer.data(), buffer_size)); -} - -TEST_F(TestMemoryMappedSource, InvalidFile) { - std::string non_existent_path = "invalid-file-name-asfd"; - - std::shared_ptr result; - ASSERT_RAISES(IOError, - MemoryMappedSource::Open(non_existent_path, MemorySource::READ_ONLY, &result)); -} - -} // namespace ipc -} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc deleted file mode 100644 index a6c56d6..0000000 --- a/cpp/src/arrow/ipc/memory.cc +++ /dev/null @@ -1,182 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "arrow/ipc/memory.h" - -#include // For memory-mapping - -#include -#include -#include -#include -#include -#include -#include - -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" - -namespace arrow { -namespace ipc { - -MemorySource::MemorySource(AccessMode access_mode) : access_mode_(access_mode) {} - -MemorySource::~MemorySource() {} - -// Implement MemoryMappedSource - -class MemoryMappedSource::Impl { - public: - Impl() : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {} - - ~Impl() { - if (is_open_) { - munmap(data_, size_); - fclose(file_); - } - } - - Status Open(const std::string& path, MemorySource::AccessMode mode) { - if (is_open_) { return Status::IOError("A file is already open"); } - - int prot_flags = PROT_READ; - - if (mode == MemorySource::READ_WRITE) { - file_ = fopen(path.c_str(), "r+b"); - prot_flags |= PROT_WRITE; - is_writable_ = true; - } else { - file_ = fopen(path.c_str(), "rb"); - } - if (file_ == nullptr) { - std::stringstream ss; - ss << "Unable to open file, errno: " << errno; - return Status::IOError(ss.str()); - } - - fseek(file_, 0L, SEEK_END); - if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); } - size_ = ftell(file_); - - fseek(file_, 0L, SEEK_SET); - is_open_ = true; - - void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0); - if (result == MAP_FAILED) { - std::stringstream ss; - ss << "Memory mapping file failed, errno: " << errno; - return Status::IOError(ss.str()); - } - data_ = reinterpret_cast(result); - - return Status::OK(); - } - - int64_t size() const { return size_; } - - uint8_t* data() { return data_; } - - bool writable() { return is_writable_; } - - bool opened() { return is_open_; } - - private: - FILE* file_; - int64_t size_; - bool is_open_; - bool is_writable_; - - // The memory map - uint8_t* data_; -}; - -MemoryMappedSource::MemoryMappedSource(AccessMode access_mode) - : MemorySource(access_mode) {} - -Status 
MemoryMappedSource::Open(const std::string& path, AccessMode access_mode, - std::shared_ptr* out) { - std::shared_ptr result(new MemoryMappedSource(access_mode)); - - result->impl_.reset(new Impl()); - RETURN_NOT_OK(result->impl_->Open(path, access_mode)); - - *out = result; - return Status::OK(); -} - -int64_t MemoryMappedSource::Size() const { - return impl_->size(); -} - -Status MemoryMappedSource::Close() { - // munmap handled in ::Impl dtor - return Status::OK(); -} - -Status MemoryMappedSource::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - if (position < 0 || position >= impl_->size()) { - return Status::Invalid("position is out of bounds"); - } - - nbytes = std::min(nbytes, impl_->size() - position); - *out = std::make_shared(impl_->data() + position, nbytes); - return Status::OK(); -} - -Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { - if (!impl_->opened() || !impl_->writable()) { - return Status::IOError("Unable to write"); - } - if (position < 0 || position >= impl_->size()) { - return Status::Invalid("position is out of bounds"); - } - - // TODO(wesm): verify we are not writing past the end of the buffer - uint8_t* dst = impl_->data() + position; - memcpy(dst, data, nbytes); - - return Status::OK(); -} - -MockMemorySource::MockMemorySource(int64_t size) - : size_(size), extent_bytes_written_(0) {} - -Status MockMemorySource::Close() { - return Status::OK(); -} - -Status MockMemorySource::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - return Status::OK(); -} - -Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { - extent_bytes_written_ = std::max(extent_bytes_written_, position + nbytes); - return Status::OK(); -} - -int64_t MockMemorySource::Size() const { - return size_; -} - -int64_t MockMemorySource::GetExtentBytesWritten() const { - return extent_bytes_written_; -} - -} // namespace ipc -} // namespace arrow 
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/memory.h b/cpp/src/arrow/ipc/memory.h deleted file mode 100644 index 377401d..0000000 --- a/cpp/src/arrow/ipc/memory.h +++ /dev/null @@ -1,150 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -// Public API for different interprocess memory sharing mechanisms - -#ifndef ARROW_IPC_MEMORY_H -#define ARROW_IPC_MEMORY_H - -#include -#include -#include - -#include "arrow/util/macros.h" -#include "arrow/util/visibility.h" - -namespace arrow { - -class Buffer; -class MutableBuffer; -class Status; - -namespace ipc { - -// Abstract output stream -class OutputStream { - public: - virtual ~OutputStream() {} - // Close the output stream - virtual Status Close() = 0; - - // The current position in the output stream - virtual int64_t Tell() const = 0; - - // Write bytes to the stream - virtual Status Write(const uint8_t* data, int64_t length) = 0; -}; - -// An output stream that writes to a MutableBuffer, such as one obtained from a -// memory map -class BufferOutputStream : public OutputStream { - public: - explicit BufferOutputStream(const std::shared_ptr& buffer) - : buffer_(buffer) {} - - // Implement the OutputStream interface - Status Close() override; - int64_t Tell() const override; - Status Write(const uint8_t* data, int64_t length) override; - - // Returns the number of bytes remaining in the buffer - int64_t bytes_remaining() const; - - private: - std::shared_ptr buffer_; - int64_t capacity_; - int64_t position_; -}; - -class ARROW_EXPORT MemorySource { - public: - // Indicates the access permissions of the memory source - enum AccessMode { READ_ONLY, READ_WRITE }; - - virtual ~MemorySource(); - - // Retrieve a buffer of memory from the source of the indicates size and at - // the indicated location - // @returns: arrow::Status indicating success / failure. 
The buffer is set - // into the *out argument - virtual Status ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) = 0; - - virtual Status Close() = 0; - - virtual Status Write(int64_t position, const uint8_t* data, int64_t nbytes) = 0; - - // @return: the size in bytes of the memory source - virtual int64_t Size() const = 0; - - protected: - explicit MemorySource(AccessMode access_mode = AccessMode::READ_WRITE); - - AccessMode access_mode_; - - private: - DISALLOW_COPY_AND_ASSIGN(MemorySource); -}; - -// A memory source that uses memory-mapped files for memory interactions -class ARROW_EXPORT MemoryMappedSource : public MemorySource { - public: - static Status Open(const std::string& path, AccessMode access_mode, - std::shared_ptr* out); - - Status Close() override; - - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - - Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override; - - // @return: the size in bytes of the memory source - int64_t Size() const override; - - private: - explicit MemoryMappedSource(AccessMode access_mode); - // Hide the internal details of this class for now - class Impl; - std::unique_ptr impl_; -}; - -// A MemorySource that tracks the size of allocations from a memory source -class MockMemorySource : public MemorySource { - public: - explicit MockMemorySource(int64_t size); - - Status Close() override; - - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - - Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override; - - int64_t Size() const override; - - // @return: the smallest number of bytes containing the modified region of the - // MockMemorySource - int64_t GetExtentBytesWritten() const; - - private: - int64_t size_; - int64_t extent_bytes_written_; -}; - -} // namespace ipc -} // namespace arrow - -#endif // ARROW_IPC_MEMORY_H 
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 8cc902c..05e9c7a 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -220,9 +220,8 @@ static Status FieldToFlatbuffer( auto fb_children = fbb.CreateVector(children); // TODO: produce the list of VectorTypes - *offset = flatbuf::CreateField( - fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary, - fb_children); + *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data, + field->dictionary, fb_children); return Status::OK(); } @@ -295,8 +294,8 @@ Status WriteDataHeader(int32_t length, int64_t body_length, } Status MessageBuilder::Finish() { - auto message = flatbuf::CreateMessage(fbb_, kMetadataVersion, - header_type_, header_, body_length_); + auto message = + flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_); fbb_.Finish(message); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index db9a83f..d38df84 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -38,7 +38,7 @@ class Status; namespace ipc { static constexpr flatbuf::MetadataVersion kMetadataVersion = - flatbuf::MetadataVersion_V1_SNAPSHOT; + flatbuf::MetadataVersion_V1_SNAPSHOT; Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr* out); http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git 
a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 838a4a6..d5ec533 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -23,6 +23,8 @@ #include #include +#include "arrow/util/visibility.h" + namespace arrow { class Buffer; @@ -36,6 +38,7 @@ namespace ipc { // Message read/write APIs // Serialize arrow::Schema as a Flatbuffer +ARROW_EXPORT Status WriteSchema(const Schema* schema, std::shared_ptr* out); //---------------------------------------------------------------------- @@ -47,7 +50,7 @@ Status WriteSchema(const Schema* schema, std::shared_ptr* out); class Message; // Container for serialized Schema metadata contained in an IPC message -class SchemaMessage { +class ARROW_EXPORT SchemaMessage { public: // Accepts an opaque flatbuffer pointer SchemaMessage(const std::shared_ptr& message, const void* schema); @@ -82,7 +85,7 @@ struct BufferMetadata { }; // Container for serialized record batch metadata contained in an IPC message -class RecordBatchMessage { +class ARROW_EXPORT RecordBatchMessage { public: // Accepts an opaque flatbuffer pointer RecordBatchMessage(const std::shared_ptr& message, const void* batch_meta); @@ -102,13 +105,13 @@ class RecordBatchMessage { std::unique_ptr impl_; }; -class DictionaryBatchMessage { +class ARROW_EXPORT DictionaryBatchMessage { public: int64_t id() const; std::unique_ptr data() const; }; -class Message : public std::enable_shared_from_this { +class ARROW_EXPORT Message : public std::enable_shared_from_this { public: enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };