arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-293: [C++] Implement Arrow IO interfaces for operating system files
Date Tue, 27 Sep 2016 13:45:12 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 32fd692f3 -> 45d88328d


ARROW-293: [C++] Implement Arrow IO interfaces for operating system files

I started with the code I put together previously for Feather and conformed it to the `arrow::io`
API. There's a bunch of Windows compatibility stuff; I left this until we add CI for Windows
and can sort this out.

We should also refactor the memory mapped file interfaces to be based on this common code
(see ARROW-294).

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #146 from wesm/ARROW-293 and squashes the following commits:

a2653b7 [Wes McKinney] cpplint
d56ef06 [Wes McKinney] Test the rest of ReadableFile methods
43126ca [Wes McKinney] Drafting OS file IO implementations based on Feather implementation.
Work on test suite


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/45d88328
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/45d88328
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/45d88328

Branch: refs/heads/master
Commit: 45d88328dd73a331b8099c07dc1332cc585ff8d2
Parents: 32fd692
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Sep 27 09:45:05 2016 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Sep 27 09:45:05 2016 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                    |   2 +-
 cpp/src/arrow/io/CMakeLists.txt       |   6 +
 cpp/src/arrow/io/file.cc              | 485 +++++++++++++++++++++++++++++
 cpp/src/arrow/io/file.h               |  96 ++++++
 cpp/src/arrow/io/io-file-test.cc      | 290 +++++++++++++++++
 cpp/src/arrow/io/libhdfs_shim.cc      |   2 +-
 cpp/src/arrow/io/memory.h             |   2 +-
 cpp/src/arrow/io/mman.h               | 189 +++++++++++
 cpp/src/arrow/ipc/adapter.cc          |   4 +-
 cpp/src/arrow/ipc/file.cc             |   2 +-
 cpp/src/arrow/types/primitive-test.cc |   3 +-
 cpp/src/arrow/util/logging.h          |   6 +-
 cpp/src/arrow/util/memory-pool.cc     |   4 +-
 cpp/src/arrow/util/status-test.cc     |   2 +-
 14 files changed, 1080 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index f3f4a7d..d65c715 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -570,7 +570,7 @@ if (UNIX)
   add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
   --verbose=2
   --linelength=90
-  --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references
+  --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references,-build/include_order
   ${FILTERED_LINT_FILES})
 endif (UNIX)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index 87e227e..d2e3491 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -38,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS
   ${ARROW_IO_PRIVATE_LINK_LIBS})
 
 set(ARROW_IO_SRCS
+  file.cc
   memory.cc
 )
 
@@ -103,12 +104,17 @@ if (APPLE)
     INSTALL_NAME_DIR "@rpath")
 endif()
 
+ADD_ARROW_TEST(io-file-test)
+ARROW_TEST_LINK_LIBRARIES(io-file-test
+  ${ARROW_IO_TEST_LINK_LIBS})
+
 ADD_ARROW_TEST(io-memory-test)
 ARROW_TEST_LINK_LIBRARIES(io-memory-test
   ${ARROW_IO_TEST_LINK_LIBS})
 
 # Headers: top level
 install(FILES
+  file.h
   hdfs.h
   interfaces.h
   memory.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
new file mode 100644
index 0000000..87bae7f
--- /dev/null
+++ b/cpp/src/arrow/io/file.cc
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Ensure 64-bit off_t for platforms where it matters
+#ifdef _FILE_OFFSET_BITS
+#undef _FILE_OFFSET_BITS
+#endif
+
+#define _FILE_OFFSET_BITS 64
+
+#include "arrow/io/file.h"
+
+#if _WIN32 || _WIN64
+#if _WIN64
+#define ENVIRONMENT64
+#else
+#define ENVIRONMENT32
+#endif
+#endif
+
+// sys/mman.h not present in Visual Studio or Cygwin
+#ifdef _WIN32
+#ifndef NOMINMAX
+#define NOMINMAX
+#endif
+#include "arrow/io/mman.h"
+#undef Realloc
+#undef Free
+#include <windows.h>
+#else
+#include <sys/mman.h>
+#endif
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#ifndef _MSC_VER  // POSIX-like platforms
+
+#include <unistd.h>
+
+// Not available on some platforms
+#ifndef errno_t
+#define errno_t int
+#endif
+
+#endif  // _MSC_VER
+
+// Mode bits for files created by FileOpenWriteable (the MSVC path uses _S_IWRITE)
+#if defined(__MINGW32__)
+#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
+#elif defined(_MSC_VER)  // Visual Studio
+
+#else  // gcc / clang on POSIX platforms
+#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
+#endif
+
+// ----------------------------------------------------------------------
+// C++ standard library
+
+#include <algorithm>
+#include <cstring>
+#include <iostream>
+#include <limits>
+#include <sstream>
+#include <vector>
+
+#if defined(_MSC_VER)
+#include <codecvt>
+#include <locale>
+#endif
+
+// ----------------------------------------------------------------------
+// file compatibility stuff
+
+#if defined(__MINGW32__)  // MinGW
+// nothing
+#elif defined(_MSC_VER)  // Visual Studio
+#include <io.h>
+#else  // POSIX / Linux
+// nothing
+#endif
+
+#include <cstdio>
+
+// POSIX systems do not have this
+#ifndef O_BINARY
+#define O_BINARY 0
+#endif
+
+// ----------------------------------------------------------------------
+// Other Arrow includes
+
+#include "arrow/io/interfaces.h"
+
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+// ----------------------------------------------------------------------
+// Cross-platform file compatibility layer
+
+// Turn the result of an open call into a Status. `ret` is the descriptor
+// (-1 on failure) and `errno_actual` the error code captured by the caller;
+// `filename`/`filename_length` are only used to build the error message.
+static inline Status CheckOpenResult(
+    int ret, int errno_actual, const char* filename, size_t filename_length) {
+  if (ret == -1) {
+    std::stringstream ss;
+    ss << "Failed to open file: ";
+#if defined(_MSC_VER)
+    // On this platform the filename bytes are wchar_t data; convert them to
+    // UTF-8 for the message. std::wstring_convert requires C++11.
+    std::wstring_convert<std::codecvt_utf8<wchar_t>, wchar_t> converter;
+    std::wstring wide_string(
+        reinterpret_cast<const wchar_t*>(filename), filename_length / sizeof(wchar_t));
+    std::string byte_string = converter.to_bytes(wide_string);
+    ss << byte_string;
+#else
+    ss << filename;
+#endif
+    // Append the OS reason when the caller captured a non-zero error code
+    if (errno_actual != 0) { ss << " (" << std::strerror(errno_actual) << ")"; }
+    return Status::IOError(ss.str());
+  }
+  return Status::OK();
+}
+
+#define CHECK_LSEEK(retval) \
+  if ((retval) == -1) return Status::IOError("lseek failed");
+
+// Reposition `fd` using a 64-bit offset on every platform. Like lseek, returns
+// the resulting offset, or -1 on failure.
+static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) {
+#if !defined(_MSC_VER)
+  return lseek(fd, pos, whence);
+#else
+  return _lseeki64(fd, pos, whence);
+#endif
+}
+
+// Open `filename` read-only (binary mode), storing the descriptor in *fd.
+static inline Status FileOpenReadable(const std::string& filename, int* fd) {
+  errno_t errno_actual = 0;
+  int ret = -1;
+#if defined(_MSC_VER)
+  // https://msdn.microsoft.com/en-us/library/w64k0ytk.aspx
+  //
+  // See GH #209: the filename bytes are assumed to already be UTF-16LE so that
+  // Unicode paths work; copy them into a NUL-terminated wide-character buffer.
+  const int wide_length = static_cast<int>(filename.size()) / sizeof(wchar_t);
+  std::vector<wchar_t> wide_path(wide_length + 1);
+  memcpy(wide_path.data(), filename.data(), filename.size());
+  memcpy(wide_path.data() + wide_length, L"\0", sizeof(wchar_t));
+
+  errno_actual =
+      _wsopen_s(fd, wide_path.data(), _O_RDONLY | _O_BINARY, _SH_DENYNO, _S_IREAD);
+  ret = *fd;
+#else
+  *fd = open(filename.c_str(), O_RDONLY | O_BINARY);
+  ret = *fd;
+  errno_actual = errno;
+#endif
+  return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
+}
+
+// Open `filename` for writing (binary mode), creating it if it does not exist
+// and truncating it if it does, and store the descriptor in *fd.
+static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
+  int ret;
+  errno_t errno_actual = 0;
+
+#if defined(_MSC_VER)
+  // https://msdn.microsoft.com/en-us/library/w64k0ytk.aspx
+  // The filename bytes are taken to be UTF-16LE and copied into a
+  // NUL-terminated wide-character buffer (see GH #209).
+  const int nwchars = static_cast<int>(filename.size()) / sizeof(wchar_t);
+  std::vector<wchar_t> wpath(nwchars + 1);
+  memcpy(wpath.data(), filename.data(), filename.size());
+  memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t));
+
+  // _O_TRUNC discards existing contents so that writing fewer bytes than a
+  // previous version of the file does not leave stale trailing data.
+  errno_actual = _wsopen_s(fd, wpath.data(),
+      _O_WRONLY | _O_CREAT | _O_TRUNC | _O_BINARY, _SH_DENYNO, _S_IWRITE);
+  ret = *fd;
+
+#else
+  // O_TRUNC: discard existing contents (see the MSVC branch above)
+  ret = *fd = open(
+      filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, ARROW_WRITE_SHMODE);
+  // Capture errno so CheckOpenResult can report why the open failed
+  errno_actual = errno;
+#endif
+  return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
+}
+
+// Store the current offset of `fd` in *pos.
+static inline Status FileTell(int fd, int64_t* pos) {
+#if defined(_MSC_VER)
+  const int64_t offset = _telli64(fd);
+  if (offset == -1) { return Status::IOError("_telli64 failed"); }
+#else
+  // A zero-byte seek relative to the current position reports that position
+  const int64_t offset = lseek64_compat(fd, 0, SEEK_CUR);
+  CHECK_LSEEK(offset);
+#endif
+  *pos = offset;
+  return Status::OK();
+}
+
+// Move the offset of `fd` to the absolute position `pos`.
+static inline Status FileSeek(int fd, int64_t pos) {
+  const int64_t new_offset = lseek64_compat(fd, pos, SEEK_SET);
+  CHECK_LSEEK(new_offset);
+  return Status::OK();
+}
+
+// Read up to nbytes from fd into buffer and store the count in *bytes_read.
+// read(2) may return fewer bytes than requested before EOF, and MSVC's _read
+// takes an unsigned int count. So this issues reads of at most INT32_MAX bytes
+// until nbytes have been read or EOF is reached. On success,
+// *bytes_read < nbytes therefore means EOF was hit.
+static inline Status FileRead(
+    int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
+  const int64_t max_chunk = std::numeric_limits<int32_t>::max();
+  *bytes_read = 0;
+  while (*bytes_read < nbytes) {
+    const int64_t chunksize = std::min(max_chunk, nbytes - *bytes_read);
+#if defined(_MSC_VER)
+    const int64_t ret =
+        _read(fd, buffer + *bytes_read, static_cast<unsigned int>(chunksize));
+#else
+    const int64_t ret = read(fd, buffer + *bytes_read, static_cast<size_t>(chunksize));
+#endif
+    if (ret == -1) {
+      // TODO(wesm): errno to string
+      return Status::IOError("Error reading bytes from file");
+    }
+    // EOF
+    if (ret == 0) { break; }
+    *bytes_read += ret;
+  }
+  return Status::OK();
+}
+
+// Write all nbytes from buffer to fd. write(2) may transfer fewer bytes than
+// requested, and MSVC's _write takes an unsigned int count. So this writes in
+// chunks of at most INT32_MAX bytes until everything has been written.
+static inline Status FileWrite(int fd, const uint8_t* buffer, int64_t nbytes) {
+  const int64_t max_chunk = std::numeric_limits<int32_t>::max();
+  int64_t bytes_written = 0;
+  while (bytes_written < nbytes) {
+    const int64_t chunksize = std::min(max_chunk, nbytes - bytes_written);
+#if defined(_MSC_VER)
+    const int64_t ret =
+        _write(fd, buffer + bytes_written, static_cast<unsigned int>(chunksize));
+#else
+    const int64_t ret =
+        write(fd, buffer + bytes_written, static_cast<size_t>(chunksize));
+#endif
+    // -1 is an error. A 0 return would make no progress, so treat it as an
+    // error as well instead of looping forever.
+    if (ret <= 0) {
+      // TODO(wesm): errno to string
+      return Status::IOError("Error writing bytes to file");
+    }
+    bytes_written += ret;
+  }
+  return Status::OK();
+}
+
+// Store the length of the file behind `fd` in *size, leaving the file offset
+// where it was on entry.
+static inline Status FileGetSize(int fd, int64_t* size) {
+  // Save current position
+  const int64_t current_position = lseek64_compat(fd, 0, SEEK_CUR);
+  CHECK_LSEEK(current_position);
+
+  // lseek returns the resulting offset, so seeking to the end yields the
+  // length directly; no separate SEEK_CUR query is needed.
+  const int64_t end_position = lseek64_compat(fd, 0, SEEK_END);
+  CHECK_LSEEK(end_position);
+
+  // Restore file position
+  const int64_t ret = lseek64_compat(fd, current_position, SEEK_SET);
+  CHECK_LSEEK(ret);
+
+  *size = end_position;
+  return Status::OK();
+}
+
+// Close `fd`; a -1 result from the platform close call becomes an IOError.
+static inline Status FileClose(int fd) {
+#if defined(_MSC_VER)
+  const int ret = _close(fd);
+#else
+  const int ret = close(fd);
+#endif
+  if (ret != -1) { return Status::OK(); }
+  return Status::IOError("error closing file");
+}
+
+// Owner of a raw OS file descriptor, shared by the ReadableFile and
+// FileOutputStream implementations below. The destructor does not close the
+// descriptor; callers are expected to call Close().
+class OSFile {
+ public:
+  OSFile() : fd_(-1), is_open_(false), size_(-1) {}
+
+  ~OSFile() {}
+
+  // Open `path` for writing, creating it if necessary. size_ stays -1 (unknown).
+  Status OpenWritable(const std::string& path) {
+    RETURN_NOT_OK(FileOpenWriteable(path, &fd_));
+    path_ = path;
+    is_open_ = true;
+    return Status::OK();
+  }
+
+  // Open `path` for reading and record its size. The position is still 0
+  // afterwards because FileGetSize restores the offset it started from.
+  Status OpenReadable(const std::string& path) {
+    RETURN_NOT_OK(FileOpenReadable(path, &fd_));
+
+    Status s = FileGetSize(fd_, &size_);
+    if (!s.ok()) {
+      // The file is not marked open, so Close() would never release this
+      // descriptor. Close it here (ignoring any secondary error) to avoid a leak.
+      FileClose(fd_);
+      fd_ = -1;
+      return s;
+    }
+
+    path_ = path;
+    is_open_ = true;
+    return Status::OK();
+  }
+
+  // Idempotent: closing an already-closed file is a no-op.
+  Status Close() {
+    if (is_open_) {
+      // Mark closed before calling close(): after a failed close the state of
+      // the descriptor is unspecified, and retrying could close an unrelated,
+      // reused descriptor.
+      is_open_ = false;
+      RETURN_NOT_OK(FileClose(fd_));
+    }
+    return Status::OK();
+  }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+    return FileRead(fd_, out, nbytes, bytes_read);
+  }
+
+  // Seek to an absolute position, clamped to the file size when it is known.
+  // size_ is only recorded (>= 0) by OpenReadable; without the check, a
+  // writable file would be clamped to the -1 sentinel.
+  Status Seek(int64_t pos) {
+    if (size_ >= 0 && pos > size_) { pos = size_; }
+    return FileSeek(fd_, pos);
+  }
+
+  Status Tell(int64_t* pos) const { return FileTell(fd_, pos); }
+
+  Status Write(const uint8_t* data, int64_t length) {
+    if (length < 0) { return Status::IOError("Length must be non-negative"); }
+    return FileWrite(fd_, data, length);
+  }
+
+  int fd() const { return fd_; }
+
+  bool is_open() const { return is_open_; }
+  const std::string& path() const { return path_; }
+
+  // Size recorded by OpenReadable, or -1 if unknown
+  int64_t size() const { return size_; }
+
+ private:
+  std::string path_;
+
+  // File descriptor
+  int fd_;
+
+  bool is_open_;
+  int64_t size_;
+};
+
+// ----------------------------------------------------------------------
+// ReadableFile implementation
+
+// Private implementation of ReadableFile: an OSFile opened for reading, plus
+// the pool used to allocate buffers returned by ReadBuffer.
+class ReadableFile::ReadableFileImpl : public OSFile {
+ public:
+  explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {}
+
+  Status Open(const std::string& path) { return OpenReadable(path); }
+
+  // Read up to nbytes into a new buffer allocated from pool_. The size of the
+  // returned buffer equals the number of bytes actually read.
+  Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    auto buffer = std::make_shared<PoolBuffer>(pool_);
+    RETURN_NOT_OK(buffer->Resize(nbytes));
+
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
+
+    // On any short read (e.g. at EOF), shrink the buffer so its size does not
+    // cover the bytes that were never filled in.
+    if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }
+
+    *out = buffer;
+    return Status::OK();
+  }
+
+ private:
+  MemoryPool* pool_;
+};
+
+ReadableFile::ReadableFile(MemoryPool* pool) {
+  impl_.reset(new ReadableFileImpl(pool));
+}
+
+ReadableFile::~ReadableFile() {
+  // A destructor cannot report errors, so a failed close is ignored here
+  impl_->Close();
+}
+
+// Open using the default memory pool for buffer allocations.
+Status ReadableFile::Open(
+    const std::string& path, std::shared_ptr<ReadableFile>* file) {
+  return Open(path, default_memory_pool(), file);
+}
+
+// Open using `memory_pool` for buffer allocations. *file is assigned only when
+// the file was opened successfully.
+Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
+    std::shared_ptr<ReadableFile>* file) {
+  // The constructor is private, so std::make_shared cannot be used here
+  std::shared_ptr<ReadableFile> result(new ReadableFile(memory_pool));
+  RETURN_NOT_OK(result->impl_->Open(path));
+  *file = result;
+  return Status::OK();
+}
+
+// The remaining ReadableFile methods forward to the OSFile-based impl.
+
+Status ReadableFile::Close() { return impl_->Close(); }
+
+Status ReadableFile::Tell(int64_t* pos) { return impl_->Tell(pos); }
+
+Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  return impl_->Read(nbytes, bytes_read, out);
+}
+
+// Positional reads are Seek followed by a sequential read, so they move the
+// file offset as a side effect.
+Status ReadableFile::ReadAt(
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  Status st = Seek(position);
+  if (!st.ok()) { return st; }
+  return impl_->Read(nbytes, bytes_read, out);
+}
+
+Status ReadableFile::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  Status st = Seek(position);
+  if (!st.ok()) { return st; }
+  return impl_->ReadBuffer(nbytes, out);
+}
+
+// Reports the size recorded when the file was opened.
+Status ReadableFile::GetSize(int64_t* size) {
+  *size = impl_->size();
+  return Status::OK();
+}
+
+Status ReadableFile::Seek(int64_t pos) { return impl_->Seek(pos); }
+
+// Reads always copy out of the OS file.
+bool ReadableFile::supports_zero_copy() const { return false; }
+
+int ReadableFile::file_descriptor() const { return impl_->fd(); }
+
+// ----------------------------------------------------------------------
+// FileOutputStream
+
+// Private implementation of FileOutputStream: an OSFile opened for writing.
+class FileOutputStream::FileOutputStreamImpl : public OSFile {
+ public:
+  Status Open(const std::string& path) { return OpenWritable(path); }
+};
+
+FileOutputStream::FileOutputStream() {
+  impl_.reset(new FileOutputStreamImpl());
+}
+
+FileOutputStream::~FileOutputStream() {
+  // A destructor cannot report errors, so a failed close is ignored here
+  impl_->Close();
+}
+
+Status FileOutputStream::Open(
+    const std::string& path, std::shared_ptr<FileOutputStream>* file) {
+  // The constructor is private, so std::make_shared cannot be used. Only hand
+  // the stream to the caller once the file has been opened successfully.
+  std::shared_ptr<FileOutputStream> stream(new FileOutputStream());
+  RETURN_NOT_OK(stream->impl_->Open(path));
+  *file = stream;
+  return Status::OK();
+}
+
+Status FileOutputStream::Close() {
+  return impl_->Close();
+}
+
+Status FileOutputStream::Tell(int64_t* pos) {
+  return impl_->Tell(pos);
+}
+
+Status FileOutputStream::Write(const uint8_t* data, int64_t length) {
+  return impl_->Write(data, length);
+}
+
+int FileOutputStream::file_descriptor() const {
+  return impl_->fd();
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
new file mode 100644
index 0000000..5e714ea
--- /dev/null
+++ b/cpp/src/arrow/io/file.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// IO interface implementations for OS files
+
+#ifndef ARROW_IO_FILE_H
+#define ARROW_IO_FILE_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class MemoryPool;
+class Status;
+
+namespace io {
+
+// Output stream writing sequentially to a file on the local filesystem.
+// Instances are created through the static Open factory.
+class ARROW_EXPORT FileOutputStream : public OutputStream {
+ public:
+  // Open `path` for writing, creating the file if it does not exist
+  static Status Open(
+      const std::string& path, std::shared_ptr<FileOutputStream>* file);
+
+  ~FileOutputStream();
+
+  // OutputStream interface
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+  Status Write(const uint8_t* data, int64_t nbytes) override;
+
+  // Underlying OS file descriptor
+  int file_descriptor() const;
+
+ private:
+  FileOutputStream();
+
+  // Implementation details live in file.cc
+  class ARROW_NO_EXPORT FileOutputStreamImpl;
+  std::unique_ptr<FileOutputStreamImpl> impl_;
+};
+
+// Readable, seekable operating system file accessed through a file descriptor.
+// Instances are created through the static Open factories.
+class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
+ public:
+  // Open file, allocating memory (if needed) from the default memory pool
+  static Status Open(const std::string& path, std::shared_ptr<ReadableFile>* file);
+
+  // Open file, using the given memory pool for memory allocations
+  static Status Open(const std::string& path, MemoryPool* memory_pool,
+      std::shared_ptr<ReadableFile>* file);
+
+  ~ReadableFile();
+
+  // ReadableFileInterface overrides
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+  Status Seek(int64_t position) override;
+  Status GetSize(int64_t* size) override;
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  bool supports_zero_copy() const override;
+
+  // Underlying OS file descriptor
+  int file_descriptor() const;
+
+ private:
+  explicit ReadableFile(MemoryPool* pool);
+
+  // Implementation details live in file.cc
+  class ARROW_NO_EXPORT ReadableFileImpl;
+  std::unique_ptr<ReadableFileImpl> impl_;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_FILE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
new file mode 100644
index 0000000..cde769f
--- /dev/null
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <fcntl.h>
+#include <fstream>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/io/file.h"
+#include "arrow/io/test-common.h"
+#include "arrow/util/memory-pool.h"
+
+namespace arrow {
+namespace io {
+
+// True if `path` can be opened for reading
+static bool FileExists(const std::string& path) {
+  std::ifstream probe(path.c_str());
+  return probe.good();
+}
+
+// True if `fd` is not a valid descriptor (fcntl fails with EBADF)
+static bool FileIsClosed(int fd) {
+  return fcntl(fd, F_GETFD) == -1 && errno == EBADF;
+}
+
+// Base fixture: each test uses a fixed scratch file path, which is removed
+// before and after every test.
+class FileTestFixture : public ::testing::Test {
+ public:
+  void SetUp() override {
+    path_ = "arrow-test-io-file-output-stream.txt";
+    EnsureFileDeleted();
+  }
+
+  void TearDown() override { EnsureFileDeleted(); }
+
+  void EnsureFileDeleted() {
+    if (FileExists(path_)) { std::remove(path_.c_str()); }
+  }
+
+ protected:
+  std::string path_;
+};
+
+// ----------------------------------------------------------------------
+// File output tests
+
+class TestFileOutputStream : public FileTestFixture {
+ public:
+  void OpenFile() { ASSERT_OK(FileOutputStream::Open(path_, &file_)); }
+
+ protected:
+  std::shared_ptr<FileOutputStream> file_;
+};
+
+TEST_F(TestFileOutputStream, DestructorClosesFile) {
+  int fd;
+  {
+    std::shared_ptr<FileOutputStream> file;
+    ASSERT_OK(FileOutputStream::Open(path_, &file));
+    fd = file->file_descriptor();
+  }
+  // The stream went out of scope without an explicit Close()
+  ASSERT_TRUE(FileIsClosed(fd));
+}
+
+TEST_F(TestFileOutputStream, Close) {
+  OpenFile();
+
+  const char* data = "testdata";
+  ASSERT_OK(file_->Write(reinterpret_cast<const uint8_t*>(data), strlen(data)));
+
+  int fd = file_->file_descriptor();
+  ASSERT_OK(file_->Close());
+
+  ASSERT_TRUE(FileIsClosed(fd));
+
+  // Idempotent
+  ASSERT_OK(file_->Close());
+
+  // The written bytes are visible to a reader
+  std::shared_ptr<ReadableFile> rd_file;
+  ASSERT_OK(ReadableFile::Open(path_, &rd_file));
+
+  int64_t size = 0;
+  ASSERT_OK(rd_file->GetSize(&size));
+  ASSERT_EQ(static_cast<int64_t>(strlen(data)), size);
+}
+
+TEST_F(TestFileOutputStream, InvalidWrites) {
+  OpenFile();
+
+  const char* data = "";
+
+  ASSERT_RAISES(IOError, file_->Write(reinterpret_cast<const uint8_t*>(data), -1));
+}
+
+TEST_F(TestFileOutputStream, Tell) {
+  OpenFile();
+
+  int64_t position;
+
+  ASSERT_OK(file_->Tell(&position));
+  ASSERT_EQ(0, position);
+
+  const char* data = "testdata";
+  ASSERT_OK(file_->Write(reinterpret_cast<const uint8_t*>(data), 8));
+  ASSERT_OK(file_->Tell(&position));
+  ASSERT_EQ(8, position);
+}
+
+// ----------------------------------------------------------------------
+// File input tests
+
+class TestReadableFile : public FileTestFixture {
+ public:
+  void OpenFile() { ASSERT_OK(ReadableFile::Open(path_, &file_)); }
+
+  // Write the 8-byte string "testdata" to path_
+  void MakeTestFile() {
+    std::string data = "testdata";
+    std::ofstream stream;
+    stream.open(path_.c_str());
+    stream << data;
+  }
+
+ protected:
+  std::shared_ptr<ReadableFile> file_;
+};
+
+TEST_F(TestReadableFile, DestructorClosesFile) {
+  MakeTestFile();
+
+  int fd;
+  {
+    std::shared_ptr<ReadableFile> file;
+    ASSERT_OK(ReadableFile::Open(path_, &file));
+    fd = file->file_descriptor();
+  }
+  // The file went out of scope without an explicit Close()
+  ASSERT_TRUE(FileIsClosed(fd));
+}
+
+TEST_F(TestReadableFile, Close) {
+  MakeTestFile();
+  OpenFile();
+
+  int fd = file_->file_descriptor();
+  ASSERT_OK(file_->Close());
+
+  ASSERT_TRUE(FileIsClosed(fd));
+
+  // Idempotent
+  ASSERT_OK(file_->Close());
+}
+
+TEST_F(TestReadableFile, SeekTellSize) {
+  MakeTestFile();
+  OpenFile();
+
+  int64_t pos;
+
+  // A freshly opened file starts at offset 0
+  ASSERT_OK(file_->Tell(&pos));
+  ASSERT_EQ(0, pos);
+
+  ASSERT_OK(file_->Seek(4));
+  ASSERT_OK(file_->Tell(&pos));
+  ASSERT_EQ(4, pos);
+
+  // Seeking past the end clamps to EOF
+  ASSERT_OK(file_->Seek(100));
+  ASSERT_OK(file_->Tell(&pos));
+  ASSERT_EQ(8, pos);
+
+  int64_t file_size;
+  ASSERT_OK(file_->GetSize(&file_size));
+  ASSERT_EQ(8, file_size);
+
+  // does not support zero copy
+  ASSERT_FALSE(file_->supports_zero_copy());
+}
+
+TEST_F(TestReadableFile, Read) {
+  MakeTestFile();
+  OpenFile();
+
+  uint8_t scratch[50];
+  int64_t nread;
+
+  ASSERT_OK(file_->Read(4, &nread, scratch));
+  ASSERT_EQ(4, nread);
+  ASSERT_EQ(0, std::memcmp(scratch, "test", 4));
+
+  // Requesting more than remains yields only the remaining bytes
+  ASSERT_OK(file_->Read(10, &nread, scratch));
+  ASSERT_EQ(4, nread);
+  ASSERT_EQ(0, std::memcmp(scratch, "data", 4));
+}
+
+TEST_F(TestReadableFile, ReadAt) {
+  MakeTestFile();
+  OpenFile();
+
+  const char* expected_bytes = "testdata";
+  uint8_t scratch[50];
+  int64_t nread;
+  int64_t pos;
+
+  ASSERT_OK(file_->ReadAt(0, 4, &nread, scratch));
+  ASSERT_EQ(4, nread);
+  ASSERT_EQ(0, std::memcmp(scratch, "test", 4));
+
+  // position advanced
+  ASSERT_OK(file_->Tell(&pos));
+  ASSERT_EQ(4, pos);
+
+  ASSERT_OK(file_->ReadAt(4, 10, &nread, scratch));
+  ASSERT_EQ(4, nread);
+  ASSERT_EQ(0, std::memcmp(scratch, "data", 4));
+
+  // position advanced to EOF
+  ASSERT_OK(file_->Tell(&pos));
+  ASSERT_EQ(8, pos);
+
+  // Same read through the Buffer-returning overload
+  std::shared_ptr<Buffer> result;
+  ASSERT_OK(file_->ReadAt(0, 4, &result));
+  ASSERT_EQ(4, result->size());
+
+  Buffer expected(reinterpret_cast<const uint8_t*>(expected_bytes), 4);
+  ASSERT_TRUE(result->Equals(expected));
+
+  // position advanced
+  ASSERT_OK(file_->Tell(&pos));
+  ASSERT_EQ(4, pos);
+}
+
+TEST_F(TestReadableFile, NonExistentFile) {
+  ASSERT_RAISES(IOError, ReadableFile::Open("0xDEADBEEF.txt", &file_));
+}
+
+// Test pool backed by malloc/free that counts successful Allocate calls
+class MyMemoryPool : public MemoryPool {
+ public:
+  MyMemoryPool() : num_allocations_(0) {}
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+    *out = reinterpret_cast<uint8_t*>(std::malloc(size));
+    // Report failure instead of handing back a null buffer
+    if (*out == nullptr && size > 0) { return Status::OutOfMemory("malloc failed"); }
+    ++num_allocations_;
+    return Status::OK();
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); }
+
+  int64_t bytes_allocated() const override { return -1; }
+
+  int64_t num_allocations() const { return num_allocations_; }
+
+ private:
+  int64_t num_allocations_;
+};
+
+TEST_F(TestReadableFile, CustomMemoryPool) {
+  MakeTestFile();
+
+  MyMemoryPool pool;
+  ASSERT_OK(ReadableFile::Open(path_, &pool, &file_));
+
+  // Each buffer read should allocate from the supplied pool
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_OK(file_->ReadAt(0, 4, &buffer));
+  ASSERT_OK(file_->ReadAt(4, 8, &buffer));
+
+  ASSERT_EQ(2, pool.num_allocations());
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
index 0b805ab..f256c31 100644
--- a/cpp/src/arrow/io/libhdfs_shim.cc
+++ b/cpp/src/arrow/io/libhdfs_shim.cc
@@ -33,8 +33,8 @@
 #ifndef _WIN32
 #include <dlfcn.h>
 #else
-#include <winsock2.h>
 #include <windows.h>
+#include <winsock2.h>
 
 // TODO(wesm): address when/if we add windows support
 // #include <util/syserr_reporting.hpp>

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 51601a0..6989d73 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -94,7 +94,7 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
   Status WriteInternal(const uint8_t* data, int64_t nbytes);
 
   // Hide the internal details of this class for now
-  class MemoryMappedFileImpl;
+  class ARROW_NO_EXPORT MemoryMappedFileImpl;
   std::unique_ptr<MemoryMappedFileImpl> impl_;
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/io/mman.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/mman.h b/cpp/src/arrow/io/mman.h
new file mode 100644
index 0000000..00d1f93
--- /dev/null
+++ b/cpp/src/arrow/io/mman.h
@@ -0,0 +1,189 @@
+// Copyright https://code.google.com/p/mman-win32/
+//
+// Licensed under the MIT License;
+// You may obtain a copy of the License at
+//
+// https://opensource.org/licenses/MIT
+
+#ifndef _MMAN_WIN32_H
+#define _MMAN_WIN32_H
+
+// Allow use of features specific to Windows XP or later.
+#ifndef _WIN32_WINNT
+// Change this to the appropriate value to target other versions of Windows.
+#define _WIN32_WINNT 0x0501
+
+#endif
+
+#include <errno.h>
+#include <io.h>
+#include <sys/types.h>
+#include <windows.h>
+
+#define PROT_NONE 0
+#define PROT_READ 1
+#define PROT_WRITE 2
+#define PROT_EXEC 4
+
+#define MAP_FILE 0
+#define MAP_SHARED 1
+#define MAP_PRIVATE 2
+#define MAP_TYPE 0xf
+#define MAP_FIXED 0x10
+#define MAP_ANONYMOUS 0x20
+#define MAP_ANON MAP_ANONYMOUS
+
+#define MAP_FAILED ((void*)-1)
+
+/* Flags for msync. */
+#define MS_ASYNC 1
+#define MS_SYNC 2
+#define MS_INVALIDATE 4
+
+#ifndef FILE_MAP_EXECUTE
+#define FILE_MAP_EXECUTE 0x0020
+#endif
+
+static int __map_mman_error(const DWORD err, const int deferr) {
+  if (err == 0) return 0;
+  // TODO: implement
+  return err;
+}
+
+static DWORD __map_mmap_prot_page(const int prot) {
+  DWORD protect = 0;
+
+  if (prot == PROT_NONE) return protect;
+
+  if ((prot & PROT_EXEC) != 0) {
+    protect = ((prot & PROT_WRITE) != 0) ? PAGE_EXECUTE_READWRITE : PAGE_EXECUTE_READ;
+  } else {
+    protect = ((prot & PROT_WRITE) != 0) ? PAGE_READWRITE : PAGE_READONLY;
+  }
+
+  return protect;
+}
+
+static DWORD __map_mmap_prot_file(const int prot) {
+  DWORD desiredAccess = 0;
+
+  if (prot == PROT_NONE) return desiredAccess;
+
+  if ((prot & PROT_READ) != 0) desiredAccess |= FILE_MAP_READ;
+  if ((prot & PROT_WRITE) != 0) desiredAccess |= FILE_MAP_WRITE;
+  if ((prot & PROT_EXEC) != 0) desiredAccess |= FILE_MAP_EXECUTE;
+
+  return desiredAccess;
+}
+
+// POSIX-style mmap emulated with CreateFileMapping/MapViewOfFile. `addr` is
+// ignored and MAP_FIXED is rejected. On error, returns MAP_FAILED and sets
+// errno. Declared static inline because this header defines the function, and
+// a plain definition would clash when the header is included by several
+// translation units.
+static inline void* mmap(
+    void* addr, size_t len, int prot, int flags, int fildes, off_t off) {
+  (void)addr;  // address hints are not supported
+  HANDLE fm, h;
+
+  void* map = MAP_FAILED;
+
+#ifdef _MSC_VER
+#pragma warning(push)
+#pragma warning(disable : 4293)
+#endif
+
+  // Split the 64-bit offset and maximum size into the DWORD halves the Win32
+  // API expects (the high half is 0 when off_t fits in a DWORD)
+  const DWORD dwFileOffsetLow =
+      (sizeof(off_t) <= sizeof(DWORD)) ? (DWORD)off : (DWORD)(off & 0xFFFFFFFFL);
+  const DWORD dwFileOffsetHigh =
+      (sizeof(off_t) <= sizeof(DWORD)) ? (DWORD)0 : (DWORD)((off >> 32) & 0xFFFFFFFFL);
+  const DWORD protect = __map_mmap_prot_page(prot);
+  const DWORD desiredAccess = __map_mmap_prot_file(prot);
+
+  const off_t maxSize = off + (off_t)len;
+
+  const DWORD dwMaxSizeLow =
+      (sizeof(off_t) <= sizeof(DWORD)) ? (DWORD)maxSize : (DWORD)(maxSize & 0xFFFFFFFFL);
+  const DWORD dwMaxSizeHigh = (sizeof(off_t) <= sizeof(DWORD))
+                                  ? (DWORD)0
+                                  : (DWORD)((maxSize >> 32) & 0xFFFFFFFFL);
+
+#ifdef _MSC_VER
+#pragma warning(pop)
+#endif
+
+  errno = 0;
+
+  if (len == 0
+      /* Unsupported flag combinations */
+      || (flags & MAP_FIXED) != 0
+      /* Unsupported protection combinations */
+      || prot == PROT_EXEC) {
+    errno = EINVAL;
+    return MAP_FAILED;
+  }
+
+  h = ((flags & MAP_ANONYMOUS) == 0) ? (HANDLE)_get_osfhandle(fildes)
+                                     : INVALID_HANDLE_VALUE;
+
+  if ((flags & MAP_ANONYMOUS) == 0 && h == INVALID_HANDLE_VALUE) {
+    errno = EBADF;
+    return MAP_FAILED;
+  }
+
+  fm = CreateFileMapping(h, NULL, protect, dwMaxSizeHigh, dwMaxSizeLow, NULL);
+
+  if (fm == NULL) {
+    errno = __map_mman_error(GetLastError(), EPERM);
+    return MAP_FAILED;
+  }
+
+  map = MapViewOfFile(fm, desiredAccess, dwFileOffsetHigh, dwFileOffsetLow, len);
+
+  // The mapped view (if any) stays valid after the mapping handle is closed
+  CloseHandle(fm);
+
+  if (map == NULL) {
+    errno = __map_mman_error(GetLastError(), EPERM);
+    return MAP_FAILED;
+  }
+
+  return map;
+}
+
+// The wrappers below are defined in this header, so they are declared static
+// inline to avoid duplicate definitions across translation units. Each returns
+// 0 on success, or -1 with errno set via __map_mman_error.
+
+// munmap via UnmapViewOfFile; `len` is ignored.
+static inline int munmap(void* addr, size_t len) {
+  if (UnmapViewOfFile(addr)) return 0;
+
+  errno = __map_mman_error(GetLastError(), EPERM);
+
+  return -1;
+}
+
+// mprotect via VirtualProtect with the PROT_* -> page protection mapping.
+static inline int mprotect(void* addr, size_t len, int prot) {
+  DWORD newProtect = __map_mmap_prot_page(prot);
+  DWORD oldProtect = 0;
+
+  if (VirtualProtect(addr, len, newProtect, &oldProtect)) return 0;
+
+  errno = __map_mman_error(GetLastError(), EPERM);
+
+  return -1;
+}
+
+// msync via FlushViewOfFile; `flags` (MS_*) is ignored.
+static inline int msync(void* addr, size_t len, int flags) {
+  if (FlushViewOfFile(addr, len)) return 0;
+
+  errno = __map_mman_error(GetLastError(), EPERM);
+
+  return -1;
+}
+
+// mlock via VirtualLock.
+static inline int mlock(const void* addr, size_t len) {
+  if (VirtualLock((LPVOID)addr, len)) return 0;
+
+  errno = __map_mman_error(GetLastError(), EPERM);
+
+  return -1;
+}
+
+// munlock via VirtualUnlock.
+static inline int munlock(const void* addr, size_t len) {
+  if (VirtualUnlock((LPVOID)addr, len)) return 0;
+
+  errno = __map_mman_error(GetLastError(), EPERM);
+
+  return -1;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 89b7fb9..99974a4 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -23,12 +23,12 @@
 #include <vector>
 
 #include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
 #include "arrow/ipc/Message_generated.h"
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
-#include "arrow/io/interfaces.h"
-#include "arrow/io/memory.h"
 #include "arrow/schema.h"
 #include "arrow/table.h"
 #include "arrow/type.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index 2bf10dd..c68244d 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -22,10 +22,10 @@
 #include <sstream>
 #include <vector>
 
+#include "arrow/io/interfaces.h"
 #include "arrow/ipc/adapter.h"
 #include "arrow/ipc/metadata.h"
 #include "arrow/ipc/util.h"
-#include "arrow/io/interfaces.h"
 #include "arrow/util/buffer.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/status.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/types/primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc
index 87eb0fe..ffebb92 100644
--- a/cpp/src/arrow/types/primitive-test.cc
+++ b/cpp/src/arrow/types/primitive-test.cc
@@ -238,7 +238,8 @@ void TestPrimitiveBuilder<PBoolean>::Check(
 }
 
 typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
-    PInt32, PInt64, PFloat, PDouble> Primitives;
+    PInt32, PInt64, PFloat, PDouble>
+    Primitives;
 
 TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index d320d6a..b22f07d 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -117,10 +117,10 @@ class CerrLog {
 // return so we create a new class to give it a hint.
 class FatalLog : public CerrLog {
  public:
-  FatalLog(int /* severity */)  // NOLINT
-      : CerrLog(ARROW_FATAL) {}
+  explicit FatalLog(int /* severity */)  // NOLINT
+      : CerrLog(ARROW_FATAL){}           // NOLINT
 
-  [[noreturn]] ~FatalLog() {
+            [[noreturn]] ~FatalLog() {
     if (has_logged_) { std::cerr << std::endl; }
     std::exit(1);
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/util/memory-pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/memory-pool.cc b/cpp/src/arrow/util/memory-pool.cc
index fed149b..9f83afe 100644
--- a/cpp/src/arrow/util/memory-pool.cc
+++ b/cpp/src/arrow/util/memory-pool.cc
@@ -17,13 +17,13 @@
 
 #include "arrow/util/memory-pool.h"
 
-#include <stdlib.h>
 #include <cstdlib>
 #include <mutex>
 #include <sstream>
+#include <stdlib.h>
 
-#include "arrow/util/status.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/status.h"
 
 namespace arrow {
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/45d88328/cpp/src/arrow/util/status-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/status-test.cc b/cpp/src/arrow/util/status-test.cc
index 45e0ff3..e0ff20f 100644
--- a/cpp/src/arrow/util/status-test.cc
+++ b/cpp/src/arrow/util/status-test.cc
@@ -17,8 +17,8 @@
 
 #include "gtest/gtest.h"
 
-#include "arrow/util/status.h"
 #include "arrow/test-util.h"
+#include "arrow/util/status.h"
 
 namespace arrow {
 


Mime
View raw message