arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [2/2] arrow git commit: ARROW-222: Prototyping an IO interface for Arrow, with initial HDFS target
Date Fri, 24 Jun 2016 23:55:37 GMT
ARROW-222: Prototyping an IO interface for Arrow, with initial HDFS target

- Switch Travis CI back to Ubuntu trusty (old Boost in precise has issues with
  C++11)
- Adapt SFrame libhdfs shim for arrow
- Create C++ public API within arrow:io to libhdfs
- Implement and test many functions in libhdfs
- Start Cython wrapper interface to arrow_io. Begin Python file-like interface,
  unit tests
- Add thirdparty hdfs.h so builds are possible without a local Hadoop distro
  (e.g. in Travis CI).

Change-Id: I4a46e50f6c1c22787baa3749d8a542216341e630


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ef908302
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ef908302
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ef908302

Branch: refs/heads/master
Commit: ef90830290491294d2fccfc5dcb16d3c0f96a70a
Parents: f7ade7b
Author: Wes McKinney <wesm@apache.org>
Authored: Fri Jun 24 16:41:08 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Fri Jun 24 16:41:08 2016 -0700

----------------------------------------------------------------------
 .travis.yml                              |    5 +-
 NOTICE.txt                               |    9 +
 ci/travis_before_script_cpp.sh           |   15 +-
 cpp/CMakeLists.txt                       |   60 +-
 cpp/doc/HDFS.md                          |   39 +
 cpp/src/arrow/io/CMakeLists.txt          |   97 +++
 cpp/src/arrow/io/hdfs-io-test.cc         |  315 ++++++++
 cpp/src/arrow/io/hdfs.cc                 |  458 +++++++++++
 cpp/src/arrow/io/hdfs.h                  |  213 +++++
 cpp/src/arrow/io/interfaces.h            |   71 ++
 cpp/src/arrow/io/libhdfs_shim.cc         |  544 +++++++++++++
 cpp/src/arrow/parquet/parquet-io-test.cc |    4 +-
 cpp/thirdparty/hadoop/include/hdfs.h     | 1024 +++++++++++++++++++++++++
 dev/merge_arrow_pr.py                    |    5 +-
 python/CMakeLists.txt                    |    6 +-
 python/cmake_modules/FindArrow.cmake     |   17 +-
 python/conda.recipe/meta.yaml            |    1 +
 python/pyarrow/error.pxd                 |    4 +-
 python/pyarrow/error.pyx                 |   14 +-
 python/pyarrow/includes/common.pxd       |   18 +
 python/pyarrow/includes/libarrow.pxd     |   19 -
 python/pyarrow/includes/libarrow_io.pxd  |   93 +++
 python/pyarrow/io.pyx                    |  504 ++++++++++++
 python/pyarrow/tests/test_array.py       |   47 +-
 python/pyarrow/tests/test_io.py          |  126 +++
 python/setup.py                          |    9 +-
 26 files changed, 3656 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ac2b0d4..97229b1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,5 @@
 sudo: required
-dist: precise
+dist: trusty
 addons:
   apt:
     sources:
@@ -12,6 +12,9 @@ addons:
     - ccache
     - cmake
     - valgrind
+    - libboost-dev
+    - libboost-filesystem-dev
+    - libboost-system-dev
 
 matrix:
   fast_finish: true

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 0000000..0310c89
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,9 @@
+Apache Arrow
+Copyright 2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This product includes software from the SFrame project (BSD, 3-clause).
+* Copyright (C) 2015 Dato, Inc.
+* Copyright (c) 2009 Carnegie Mellon University.

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 9060cc9..08551f3 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -23,12 +23,21 @@ echo $GTEST_HOME
 
 : ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}
 
-CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
+CMAKE_COMMON_FLAGS="\
+-DARROW_BUILD_BENCHMARKS=ON \
+-DARROW_PARQUET=ON \
+-DARROW_HDFS=on \
+-DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
 
 if [ $TRAVIS_OS_NAME == "linux" ]; then
-  cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+    cmake -DARROW_TEST_MEMCHECK=on \
+          $CMAKE_COMMON_FLAGS \
+          -DCMAKE_CXX_FLAGS="-Werror" \
+          $CPP_DIR
 else
-  cmake $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+    cmake $CMAKE_COMMON_FLAGS \
+          -DCMAKE_CXX_FLAGS="-Werror" \
+          $CPP_DIR
 fi
 
 make -j4

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index bdf7572..18b4759 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -62,6 +62,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
     "Build the Arrow IPC extensions"
     ON)
 
+  option(ARROW_HDFS
+    "Build the Arrow IO extensions for the Hadoop file system"
+    OFF)
+
   option(ARROW_SSE3
     "Build Arrow with SSE3"
     ON)
@@ -454,6 +458,47 @@ if ("$ENV{GBENCHMARK_HOME}" STREQUAL "")
   set(GBENCHMARK_HOME ${THIRDPARTY_DIR}/installed)
 endif()
 
+# ----------------------------------------------------------------------
+# Add Boost dependencies (code adapted from Apache Kudu (incubating))
+
+# find boost headers and libs
+set(Boost_DEBUG TRUE)
+set(Boost_USE_MULTITHREADED ON)
+set(Boost_USE_STATIC_LIBS ON)
+find_package(Boost COMPONENTS system filesystem REQUIRED)
+include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
+set(BOOST_STATIC_LIBS ${Boost_LIBRARIES})
+list(LENGTH BOOST_STATIC_LIBS BOOST_STATIC_LIBS_LEN)
+
+# Find Boost shared libraries.
+set(Boost_USE_STATIC_LIBS OFF)
+find_package(Boost COMPONENTS system filesystem REQUIRED)
+set(BOOST_SHARED_LIBS ${Boost_LIBRARIES})
+list(LENGTH BOOST_SHARED_LIBS BOOST_SHARED_LIBS_LEN)
+list(SORT BOOST_SHARED_LIBS)
+
+message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS})
+message(STATUS "Boost libraries: " ${Boost_LIBRARIES})
+
+math(EXPR LAST_IDX "${BOOST_STATIC_LIBS_LEN} - 1")
+foreach(IDX RANGE ${LAST_IDX})
+  list(GET BOOST_STATIC_LIBS ${IDX} BOOST_STATIC_LIB)
+  list(GET BOOST_SHARED_LIBS ${IDX} BOOST_SHARED_LIB)
+
+  # Remove the prefix/suffix from the library name.
+  #
+  # e.g. libboost_system-mt --> boost_system
+  get_filename_component(LIB_NAME ${BOOST_STATIC_LIB} NAME_WE)
+  string(REGEX REPLACE "lib([^-]*)(-mt)?" "\\1" LIB_NAME_NO_PREFIX_SUFFIX ${LIB_NAME})
+  ADD_THIRDPARTY_LIB(${LIB_NAME_NO_PREFIX_SUFFIX}
+    STATIC_LIB "${BOOST_STATIC_LIB}"
+    SHARED_LIB "${BOOST_SHARED_LIB}")
+  list(APPEND ARROW_BOOST_LIBS ${LIB_NAME_NO_PREFIX_SUFFIX})
+endforeach()
+include_directories(SYSTEM ${Boost_INCLUDE_DIR})
+
+# ----------------------------------------------------------------------
+# Enable / disable tests and benchmarks
 
 if(ARROW_BUILD_TESTS)
   add_custom_target(unittest ctest -L unittest)
@@ -529,12 +574,24 @@ endif (UNIX)
 # "make lint" target
 ############################################################
 if (UNIX)
+
+  file(GLOB_RECURSE LINT_FILES
+	"${CMAKE_CURRENT_SOURCE_DIR}/src/*.h"
+	"${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc"
+	)
+
+  FOREACH(item ${LINT_FILES})
+	IF(NOT (item MATCHES "_generated.h"))
+      LIST(APPEND FILTERED_LINT_FILES ${item})
+	ENDIF()
+  ENDFOREACH(item ${LINT_FILES})
+
   # Full lint
   add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
   --verbose=2
   --linelength=90
   --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references
-    `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'`)
+  ${FILTERED_LINT_FILES})
 endif (UNIX)
 
 
@@ -624,6 +681,7 @@ set_target_properties(arrow
 target_link_libraries(arrow ${LIBARROW_LINK_LIBS})
 
 add_subdirectory(src/arrow)
+add_subdirectory(src/arrow/io)
 add_subdirectory(src/arrow/util)
 add_subdirectory(src/arrow/types)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/doc/HDFS.md
----------------------------------------------------------------------
diff --git a/cpp/doc/HDFS.md b/cpp/doc/HDFS.md
new file mode 100644
index 0000000..e0d5dfd
--- /dev/null
+++ b/cpp/doc/HDFS.md
@@ -0,0 +1,39 @@
+## Using Arrow's HDFS (Apache Hadoop Distributed File System) interface
+
+### Build requirements
+
+To build the integration, pass the following option to CMake
+
+```shell
+-DARROW_HDFS=on
+```
+
+For convenience, we have bundled `hdfs.h` for libhdfs from Apache Hadoop in
+Arrow's thirdparty. If you wish to build against the `hdfs.h` in your installed
+Hadoop distribution, set the `$HADOOP_HOME` environment variable.
+
+### Runtime requirements
+
+By default, the HDFS client C++ class in `libarrow_io` uses the libhdfs JNI
+interface to the Java Hadoop client. This library is loaded **at runtime**
+(rather than at link / library load time, since the library may not be in your
+LD_LIBRARY_PATH), and relies on some environment variables.
+
+* `HADOOP_HOME`: the root of your installed Hadoop distribution. Check in the
+  `lib/native` directory to look for `libhdfs.so` if you have any questions
+  about which directory you're after.
+* `JAVA_HOME`: the location of your Java SDK installation
+* `CLASSPATH`: must contain the Hadoop jars. You can set these using:
+
+```shell
+export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
+```
+
+#### Setting $JAVA_HOME  automatically on OS X
+
+The installed location of Java on OS X can vary, however the following snippet
+will set it automatically for you:
+
+```shell
+export JAVA_HOME=$(/usr/libexec/java_home)
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
new file mode 100644
index 0000000..33b654f
--- /dev/null
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ----------------------------------------------------------------------
+# arrow_io : Arrow IO interfaces
+
+set(ARROW_IO_LINK_LIBS
+  arrow
+)
+
+set(ARROW_IO_PRIVATE_LINK_LIBS
+  boost_system
+  boost_filesystem
+)
+
+set(ARROW_IO_TEST_LINK_LIBS
+  arrow_io
+  ${ARROW_IO_PRIVATE_LINK_LIBS})
+
+set(ARROW_IO_SRCS
+)
+
+if(ARROW_HDFS)
+  if(NOT THIRDPARTY_DIR)
+	message(FATAL_ERROR "THIRDPARTY_DIR not set")
+  endif()
+
+  if (DEFINED ENV{HADOOP_HOME})
+	set(HADOOP_HOME $ENV{HADOOP_HOME})
+  else()
+	set(HADOOP_HOME "${THIRDPARTY_DIR}/hadoop")
+  endif()
+
+  set(HDFS_H_PATH "${HADOOP_HOME}/include/hdfs.h")
+  if (NOT EXISTS ${HDFS_H_PATH})
+	message(FATAL_ERROR "Did not find hdfs.h at ${HDFS_H_PATH}")
+  endif()
+  message(STATUS "Found hdfs.h at: " ${HDFS_H_PATH})
+  message(STATUS "Building libhdfs shim component")
+
+  include_directories(SYSTEM "${HADOOP_HOME}/include")
+
+  set(ARROW_HDFS_SRCS
+	hdfs.cc
+	libhdfs_shim.cc)
+
+  set_property(SOURCE ${ARROW_HDFS_SRCS}
+	APPEND_STRING PROPERTY
+	COMPILE_FLAGS "-DHAS_HADOOP")
+
+  set(ARROW_IO_SRCS
+	${ARROW_HDFS_SRCS}
+	${ARROW_IO_SRCS})
+
+  ADD_ARROW_TEST(hdfs-io-test)
+  ARROW_TEST_LINK_LIBRARIES(hdfs-io-test
+	${ARROW_IO_TEST_LINK_LIBS})
+endif()
+
+add_library(arrow_io SHARED
+  ${ARROW_IO_SRCS}
+)
+target_link_libraries(arrow_io LINK_PUBLIC ${ARROW_IO_LINK_LIBS})
+target_link_libraries(arrow_io LINK_PRIVATE ${ARROW_IO_PRIVATE_LINK_LIBS})
+
+SET_TARGET_PROPERTIES(arrow_io PROPERTIES LINKER_LANGUAGE CXX)
+
+if (APPLE)
+  set_target_properties(arrow_io
+    PROPERTIES
+    BUILD_WITH_INSTALL_RPATH ON
+    INSTALL_NAME_DIR "@rpath")
+endif()
+
+# Headers: top level
+install(FILES
+  hdfs.h
+  interfaces.h
+  DESTINATION include/arrow/io)
+
+install(TARGETS arrow_io
+  LIBRARY DESTINATION lib
+  ARCHIVE DESTINATION lib)

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/hdfs-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc
new file mode 100644
index 0000000..11d67ae
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs-io-test.cc
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include <boost/filesystem.hpp>  // NOLINT
+
+#include "arrow/io/hdfs.h"
+#include "arrow/test-util.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+std::vector<uint8_t> RandomData(int64_t size) {
+  std::vector<uint8_t> buffer(size);
+  test::random_bytes(size, 0, buffer.data());
+  return buffer;
+}
+
+class TestHdfsClient : public ::testing::Test {
+ public:
+  Status MakeScratchDir() {
+    if (client_->Exists(scratch_dir_)) {
+      RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
+    }
+    return client_->CreateDirectory(scratch_dir_);
+  }
+
+  Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
+      bool append = false, int buffer_size = 0, int replication = 0,
+      int default_block_size = 0) {
+    std::shared_ptr<HdfsWriteableFile> file;
+    RETURN_NOT_OK(client_->OpenWriteable(
+        path, append, buffer_size, replication, default_block_size, &file));
+
+    RETURN_NOT_OK(file->Write(buffer, size));
+    RETURN_NOT_OK(file->Close());
+
+    return Status::OK();
+  }
+
+  std::string ScratchPath(const std::string& name) {
+    std::stringstream ss;
+    ss << scratch_dir_ << "/" << name;
+    return ss.str();
+  }
+
+  std::string HdfsAbsPath(const std::string& relpath) {
+    std::stringstream ss;
+    ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
+    return ss.str();
+  }
+
+ protected:
+  // Set up shared state between unit tests
+  static void SetUpTestCase() {
+    if (!ConnectLibHdfs().ok()) {
+      std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl;
+      return;
+    }
+
+    loaded_libhdfs_ = true;
+
+    const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
+    const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
+    const char* user = std::getenv("ARROW_HDFS_TEST_USER");
+
+    ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER";
+
+    conf_.host = host == nullptr ? "localhost" : host;
+    conf_.user = user;
+    conf_.port = port == nullptr ? 20500 : atoi(port);
+
+    ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
+  }
+
+  static void TearDownTestCase() {
+    if (client_) {
+      EXPECT_OK(client_->Delete(scratch_dir_, true));
+      EXPECT_OK(client_->Disconnect());
+    }
+  }
+
+  static bool loaded_libhdfs_;
+
+  // Resources shared amongst unit tests
+  static HdfsConnectionConfig conf_;
+  static std::string scratch_dir_;
+  static std::shared_ptr<HdfsClient> client_;
+};
+
+bool TestHdfsClient::loaded_libhdfs_ = false;
+HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
+
+std::string TestHdfsClient::scratch_dir_ =
+    boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
+
+std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
+
+#define SKIP_IF_NO_LIBHDFS()                          \
+  if (!loaded_libhdfs_) {                             \
+    std::cout << "No libhdfs, skipping" << std::endl; \
+    return;                                           \
+  }
+
+TEST_F(TestHdfsClient, ConnectsAgain) {
+  SKIP_IF_NO_LIBHDFS();
+
+  std::shared_ptr<HdfsClient> client;
+  ASSERT_OK(HdfsClient::Connect(&conf_, &client));
+  ASSERT_OK(client->Disconnect());
+}
+
+TEST_F(TestHdfsClient, CreateDirectory) {
+  SKIP_IF_NO_LIBHDFS();
+
+  std::string path = ScratchPath("create-directory");
+
+  if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
+
+  ASSERT_OK(client_->CreateDirectory(path));
+  ASSERT_TRUE(client_->Exists(path));
+  EXPECT_OK(client_->Delete(path, true));
+  ASSERT_FALSE(client_->Exists(path));
+}
+
+TEST_F(TestHdfsClient, GetCapacityUsed) {
+  SKIP_IF_NO_LIBHDFS();
+
+  // Who knows what is actually in your DFS cluster, but expect it to have
+  // positive used bytes and capacity
+  int64_t nbytes = 0;
+  ASSERT_OK(client_->GetCapacity(&nbytes));
+  ASSERT_LT(0, nbytes);
+
+  ASSERT_OK(client_->GetUsed(&nbytes));
+  ASSERT_LT(0, nbytes);
+}
+
+TEST_F(TestHdfsClient, GetPathInfo) {
+  SKIP_IF_NO_LIBHDFS();
+
+  HdfsPathInfo info;
+
+  ASSERT_OK(MakeScratchDir());
+
+  // Directory info
+  ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
+  ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+  ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
+  ASSERT_EQ(conf_.user, info.owner);
+
+  // TODO(wesm): test group, other attrs
+
+  auto path = ScratchPath("test-file");
+
+  const int size = 100;
+
+  std::vector<uint8_t> buffer = RandomData(size);
+
+  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+  ASSERT_OK(client_->GetPathInfo(path, &info));
+
+  ASSERT_EQ(ObjectType::FILE, info.kind);
+  ASSERT_EQ(HdfsAbsPath(path), info.name);
+  ASSERT_EQ(conf_.user, info.owner);
+  ASSERT_EQ(size, info.size);
+}
+
+TEST_F(TestHdfsClient, AppendToFile) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto path = ScratchPath("test-file");
+  const int size = 100;
+
+  std::vector<uint8_t> buffer = RandomData(size);
+  ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+
+  // now append
+  ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
+
+  HdfsPathInfo info;
+  ASSERT_OK(client_->GetPathInfo(path, &info));
+  ASSERT_EQ(size * 2, info.size);
+}
+
+TEST_F(TestHdfsClient, ListDirectory) {
+  SKIP_IF_NO_LIBHDFS();
+
+  const int size = 100;
+  std::vector<uint8_t> data = RandomData(size);
+
+  auto p1 = ScratchPath("test-file-1");
+  auto p2 = ScratchPath("test-file-2");
+  auto d1 = ScratchPath("test-dir-1");
+
+  ASSERT_OK(MakeScratchDir());
+  ASSERT_OK(WriteDummyFile(p1, data.data(), size));
+  ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
+  ASSERT_OK(client_->CreateDirectory(d1));
+
+  std::vector<HdfsPathInfo> listing;
+  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+  // Do it again, appends!
+  ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+  ASSERT_EQ(6, listing.size());
+
+  // Argh, well, shouldn't expect the listing to be in any particular order
+  for (size_t i = 0; i < listing.size(); ++i) {
+    const HdfsPathInfo& info = listing[i];
+    if (info.name == HdfsAbsPath(p1)) {
+      ASSERT_EQ(ObjectType::FILE, info.kind);
+      ASSERT_EQ(size, info.size);
+    } else if (info.name == HdfsAbsPath(p2)) {
+      ASSERT_EQ(ObjectType::FILE, info.kind);
+      ASSERT_EQ(size / 2, info.size);
+    } else if (info.name == HdfsAbsPath(d1)) {
+      ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+    } else {
+      FAIL() << "Unexpected path: " << info.name;
+    }
+  }
+}
+
+TEST_F(TestHdfsClient, ReadableMethods) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto path = ScratchPath("test-file");
+  const int size = 100;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(WriteDummyFile(path, data.data(), size));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(client_->OpenReadable(path, &file));
+
+  // Test GetSize -- move this into its own unit test if ever needed
+  int64_t file_size;
+  ASSERT_OK(file->GetSize(&file_size));
+  ASSERT_EQ(size, file_size);
+
+  uint8_t buffer[50];
+  int32_t bytes_read = 0;
+
+  ASSERT_OK(file->Read(50, &bytes_read, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
+  ASSERT_EQ(50, bytes_read);
+
+  ASSERT_OK(file->Read(50, &bytes_read, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
+  ASSERT_EQ(50, bytes_read);
+
+  // EOF
+  ASSERT_OK(file->Read(1, &bytes_read, buffer));
+  ASSERT_EQ(0, bytes_read);
+
+  // ReadAt to EOF
+  ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
+  ASSERT_EQ(40, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
+
+  // Seek, Tell
+  ASSERT_OK(file->Seek(60));
+
+  int64_t position;
+  ASSERT_OK(file->Tell(&position));
+  ASSERT_EQ(60, position);
+}
+
+TEST_F(TestHdfsClient, RenameFile) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto src_path = ScratchPath("src-file");
+  auto dst_path = ScratchPath("dst-file");
+  const int size = 100;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
+
+  ASSERT_OK(client_->Rename(src_path, dst_path));
+
+  ASSERT_FALSE(client_->Exists(src_path));
+  ASSERT_TRUE(client_->Exists(dst_path));
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
new file mode 100644
index 0000000..6da6ea4
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <hdfs.h>
+
+#include <cstdint>
+#include <sstream>
+#include <string>
+
+#include "arrow/io/hdfs.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+#define CHECK_FAILURE(RETURN_VALUE, WHAT)  \
+  do {                                     \
+    if (RETURN_VALUE == -1) {              \
+      std::stringstream ss;                \
+      ss << "HDFS: " << WHAT << " failed"; \
+      return Status::IOError(ss.str());    \
+    }                                      \
+  } while (0)
+
+static Status CheckReadResult(int ret) {
+  // Check for error on -1 (possibly errno set)
+
+  // ret == 0 at end of file, which is OK
+  if (ret == -1) {
+    // EOF
+    std::stringstream ss;
+    ss << "HDFS read failed, errno: " << errno;
+    return Status::IOError(ss.str());
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// File reading
+
+class HdfsAnyFileImpl {
+ public:
+  void set_members(const std::string& path, hdfsFS fs, hdfsFile handle) {
+    path_ = path;
+    fs_ = fs;
+    file_ = handle;
+    is_open_ = true;
+  }
+
+  Status Seek(int64_t position) {
+    int ret = hdfsSeek(fs_, file_, position);
+    CHECK_FAILURE(ret, "seek");
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* offset) {
+    int64_t ret = hdfsTell(fs_, file_);
+    CHECK_FAILURE(ret, "tell");
+    *offset = ret;
+    return Status::OK();
+  }
+
+  bool is_open() const { return is_open_; }
+
+ protected:
+  std::string path_;
+
+  // These are pointers in libhdfs, so OK to copy
+  hdfsFS fs_;
+  hdfsFile file_;
+
+  bool is_open_;
+};
+
+// Private implementation for read-only files
+class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
+ public:
+  HdfsReadableFileImpl() {}
+
+  Status Close() {
+    if (is_open_) {
+      int ret = hdfsCloseFile(fs_, file_);
+      CHECK_FAILURE(ret, "CloseFile");
+      is_open_ = false;
+    }
+    return Status::OK();
+  }
+
+  Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+    tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
+        reinterpret_cast<void*>(buffer), nbytes);
+    RETURN_NOT_OK(CheckReadResult(ret));
+    *bytes_read = ret;
+    return Status::OK();
+  }
+
+  Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+    tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
+    RETURN_NOT_OK(CheckReadResult(ret));
+    *bytes_read = ret;
+    return Status::OK();
+  }
+
+  Status GetSize(int64_t* size) {
+    hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str());
+    if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
+
+    *size = entry->mSize;
+    hdfsFreeFileInfo(entry, 1);
+    return Status::OK();
+  }
+};
+
+HdfsReadableFile::HdfsReadableFile() {
+  impl_.reset(new HdfsReadableFileImpl());
+}
+
+HdfsReadableFile::~HdfsReadableFile() {
+  impl_->Close();
+}
+
+Status HdfsReadableFile::Close() {
+  return impl_->Close();
+}
+
+Status HdfsReadableFile::ReadAt(
+    int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+  return impl_->ReadAt(position, nbytes, bytes_read, buffer);
+}
+
+Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+  return impl_->Read(nbytes, bytes_read, buffer);
+}
+
+Status HdfsReadableFile::GetSize(int64_t* size) {
+  return impl_->GetSize(size);
+}
+
+Status HdfsReadableFile::Seek(int64_t position) {
+  return impl_->Seek(position);
+}
+
+Status HdfsReadableFile::Tell(int64_t* position) {
+  return impl_->Tell(position);
+}
+
+// ----------------------------------------------------------------------
+// File writing
+
+// Private implementation for writeable-only files
+class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
+ public:
+  HdfsWriteableFileImpl() {}
+
+  Status Close() {
+    if (is_open_) {
+      int ret = hdfsFlush(fs_, file_);
+      CHECK_FAILURE(ret, "Flush");
+      ret = hdfsCloseFile(fs_, file_);
+      CHECK_FAILURE(ret, "CloseFile");
+      is_open_ = false;
+    }
+    return Status::OK();
+  }
+
+  Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
+    tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
+    CHECK_FAILURE(ret, "Write");
+    *bytes_written = ret;
+    return Status::OK();
+  }
+};
+
+HdfsWriteableFile::HdfsWriteableFile() {
+  impl_.reset(new HdfsWriteableFileImpl());
+}
+
+HdfsWriteableFile::~HdfsWriteableFile() {
+  impl_->Close();
+}
+
+Status HdfsWriteableFile::Close() {
+  return impl_->Close();
+}
+
+Status HdfsWriteableFile::Write(
+    const uint8_t* buffer, int32_t nbytes, int32_t* bytes_read) {
+  return impl_->Write(buffer, nbytes, bytes_read);
+}
+
+Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) {
+  int32_t bytes_written_dummy = 0;
+  return Write(buffer, nbytes, &bytes_written_dummy);
+}
+
+Status HdfsWriteableFile::Tell(int64_t* position) {
+  return impl_->Tell(position);
+}
+
+// ----------------------------------------------------------------------
+// HDFS client
+
+// TODO(wesm): this could throw std::bad_alloc in the course of copying strings
+// into the path info object
+static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
+  out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
+  out->name = std::string(input->mName);
+  out->owner = std::string(input->mOwner);
+  out->group = std::string(input->mGroup);
+
+  out->last_access_time = static_cast<int32_t>(input->mLastAccess);
+  out->last_modified_time = static_cast<int32_t>(input->mLastMod);
+  out->size = static_cast<int64_t>(input->mSize);
+
+  out->replication = input->mReplication;
+  out->block_size = input->mBlockSize;
+
+  out->permissions = input->mPermissions;
+}
+
+// Private implementation
+class HdfsClient::HdfsClientImpl {
+ public:
+  HdfsClientImpl() {}
+
+  Status Connect(const HdfsConnectionConfig* config) {
+    RETURN_NOT_OK(ConnectLibHdfs());
+
+    fs_ = hdfsConnectAsUser(config->host.c_str(), config->port, config->user.c_str());
+
+    if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); }
+    namenode_host_ = config->host;
+    port_ = config->port;
+    user_ = config->user;
+
+    return Status::OK();
+  }
+
+  Status CreateDirectory(const std::string& path) {
+    int ret = hdfsCreateDirectory(fs_, path.c_str());
+    CHECK_FAILURE(ret, "create directory");
+    return Status::OK();
+  }
+
+  Status Delete(const std::string& path, bool recursive) {
+    int ret = hdfsDelete(fs_, path.c_str(), static_cast<int>(recursive));
+    CHECK_FAILURE(ret, "delete");
+    return Status::OK();
+  }
+
+  Status Disconnect() {
+    int ret = hdfsDisconnect(fs_);
+    CHECK_FAILURE(ret, "hdfsFS::Disconnect");
+    return Status::OK();
+  }
+
+  bool Exists(const std::string& path) {
+    // hdfsExists does not distinguish between RPC failure and the file not
+    // existing
+    int ret = hdfsExists(fs_, path.c_str());
+    return ret == 0;
+  }
+
+  Status GetCapacity(int64_t* nbytes) {
+    tOffset ret = hdfsGetCapacity(fs_);
+    CHECK_FAILURE(ret, "GetCapacity");
+    *nbytes = ret;
+    return Status::OK();
+  }
+
+  Status GetUsed(int64_t* nbytes) {
+    tOffset ret = hdfsGetUsed(fs_);
+    CHECK_FAILURE(ret, "GetUsed");
+    *nbytes = ret;
+    return Status::OK();
+  }
+
+  Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
+    hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path.c_str());
+
+    if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
+
+    SetPathInfo(entry, info);
+    hdfsFreeFileInfo(entry, 1);
+
+    return Status::OK();
+  }
+
+  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
+    int num_entries = 0;
+    hdfsFileInfo* entries = hdfsListDirectory(fs_, path.c_str(), &num_entries);
+
+    if (entries == nullptr) {
+      // If the directory is empty, entries is NULL but errno is 0. Non-zero
+      // errno indicates error
+      //
+      // Note: errno is thread-locala
+      if (errno == 0) { num_entries = 0; }
+      { return Status::IOError("HDFS: list directory failed"); }
+    }
+
+    // Allocate additional space for elements
+
+    int vec_offset = listing->size();
+    listing->resize(vec_offset + num_entries);
+
+    for (int i = 0; i < num_entries; ++i) {
+      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
+    }
+
+    // Free libhdfs file info
+    hdfsFreeFileInfo(entries, num_entries);
+
+    return Status::OK();
+  }
+
+  Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
+    hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0);
+
+    if (handle == nullptr) {
+      // TODO(wesm): determine cause of failure
+      std::stringstream ss;
+      ss << "Unable to open file " << path;
+      return Status::IOError(ss.str());
+    }
+
+    // std::make_shared does not work with private ctors
+    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
+    (*file)->impl_->set_members(path, fs_, handle);
+
+    return Status::OK();
+  }
+
+  Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
+      int16_t replication, int64_t default_block_size,
+      std::shared_ptr<HdfsWriteableFile>* file) {
+    int flags = O_WRONLY;
+    if (append) flags |= O_APPEND;
+
+    hdfsFile handle = hdfsOpenFile(
+        fs_, path.c_str(), flags, buffer_size, replication, default_block_size);
+
+    if (handle == nullptr) {
+      // TODO(wesm): determine cause of failure
+      std::stringstream ss;
+      ss << "Unable to open file " << path;
+      return Status::IOError(ss.str());
+    }
+
+    // std::make_shared does not work with private ctors
+    *file = std::shared_ptr<HdfsWriteableFile>(new HdfsWriteableFile());
+    (*file)->impl_->set_members(path, fs_, handle);
+
+    return Status::OK();
+  }
+
+  Status Rename(const std::string& src, const std::string& dst) {
+    int ret = hdfsRename(fs_, src.c_str(), dst.c_str());
+    CHECK_FAILURE(ret, "Rename");
+    return Status::OK();
+  }
+
+ private:
+  std::string namenode_host_;
+  std::string user_;
+  int port_;
+
+  hdfsFS fs_;
+};
+
+// ----------------------------------------------------------------------
+// Public API for HDFSClient
+
+HdfsClient::HdfsClient() {
+  impl_.reset(new HdfsClientImpl());
+}
+
+HdfsClient::~HdfsClient() {}
+
+Status HdfsClient::Connect(
+    const HdfsConnectionConfig* config, std::shared_ptr<HdfsClient>* fs) {
+  // ctor is private, make_shared will not work
+  *fs = std::shared_ptr<HdfsClient>(new HdfsClient());
+
+  RETURN_NOT_OK((*fs)->impl_->Connect(config));
+  return Status::OK();
+}
+
+Status HdfsClient::CreateDirectory(const std::string& path) {
+  return impl_->CreateDirectory(path);
+}
+
+Status HdfsClient::Delete(const std::string& path, bool recursive) {
+  return impl_->Delete(path, recursive);
+}
+
+Status HdfsClient::Disconnect() {
+  return impl_->Disconnect();
+}
+
+bool HdfsClient::Exists(const std::string& path) {
+  return impl_->Exists(path);
+}
+
+Status HdfsClient::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
+  return impl_->GetPathInfo(path, info);
+}
+
+Status HdfsClient::GetCapacity(int64_t* nbytes) {
+  return impl_->GetCapacity(nbytes);
+}
+
+Status HdfsClient::GetUsed(int64_t* nbytes) {
+  return impl_->GetUsed(nbytes);
+}
+
+Status HdfsClient::ListDirectory(
+    const std::string& path, std::vector<HdfsPathInfo>* listing) {
+  return impl_->ListDirectory(path, listing);
+}
+
+Status HdfsClient::OpenReadable(
+    const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
+  return impl_->OpenReadable(path, file);
+}
+
+Status HdfsClient::OpenWriteable(const std::string& path, bool append,
+    int32_t buffer_size, int16_t replication, int64_t default_block_size,
+    std::shared_ptr<HdfsWriteableFile>* file) {
+  return impl_->OpenWriteable(
+      path, append, buffer_size, replication, default_block_size, file);
+}
+
+Status HdfsClient::OpenWriteable(
+    const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file) {
+  return OpenWriteable(path, append, 0, 0, 0, file);
+}
+
+Status HdfsClient::Rename(const std::string& src, const std::string& dst) {
+  return impl_->Rename(src, dst);
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
new file mode 100644
index 0000000..a1972db
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs.h
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_HDFS
+#define ARROW_IO_HDFS
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+class Status;
+
+namespace io {
+
+Status ConnectLibHdfs();
+
+class HdfsClient;
+class HdfsReadableFile;
+class HdfsWriteableFile;
+
+struct HdfsPathInfo {
+  ObjectType::type kind;
+
+  std::string name;
+  std::string owner;
+  std::string group;
+
+  // Access times in UNIX timestamps (seconds)
+  int64_t size;
+  int64_t block_size;
+
+  int32_t last_modified_time;
+  int32_t last_access_time;
+
+  int16_t replication;
+  int16_t permissions;
+};
+
+struct HdfsConnectionConfig {
+  std::string host;
+  int port;
+  std::string user;
+
+  // TODO: Kerberos, etc.
+};
+
+class HdfsClient : public FileSystemClient {
+ public:
+  ~HdfsClient();
+
+  // Connect to an HDFS cluster at indicated host, port, and as user
+  //
+  // @param host (in)
+  // @param port (in)
+  // @param user (in): user to identify as
+  // @param fs (out): the created client
+  // @returns Status
+  static Status Connect(
+      const HdfsConnectionConfig* config, std::shared_ptr<HdfsClient>* fs);
+
+  // Create directory and all parents
+  //
+  // @param path (in): absolute HDFS path
+  // @returns Status
+  Status CreateDirectory(const std::string& path);
+
+  // Delete file or directory
+  // @param path: absolute path to data
+  // @param recursive: if path is a directory, delete contents as well
+  // @returns error status on failure
+  Status Delete(const std::string& path, bool recursive = false);
+
+  // Disconnect from cluster
+  //
+  // @returns Status
+  Status Disconnect();
+
+  // @param path (in): absolute HDFS path
+  // @returns bool, true if the path exists, false if not (or on error)
+  bool Exists(const std::string& path);
+
+  // @param path (in): absolute HDFS path
+  // @param info (out)
+  // @returns Status
+  Status GetPathInfo(const std::string& path, HdfsPathInfo* info);
+
+  // @param nbytes (out): total capacity of the filesystem
+  // @returns Status
+  Status GetCapacity(int64_t* nbytes);
+
+  // @param nbytes (out): total bytes used of the filesystem
+  // @returns Status
+  Status GetUsed(int64_t* nbytes);
+
+  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing);
+
+  // @param path file path to change
+  // @param owner pass nullptr for no change
+  // @param group pass nullptr for no change
+  Status Chown(const std::string& path, const char* owner, const char* group);
+
+  Status Chmod(const std::string& path, int mode);
+
+  // Move file or directory from source path to destination path within the
+  // current filesystem
+  Status Rename(const std::string& src, const std::string& dst);
+
+  // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory
+
+  // Open an HDFS file in READ mode. Returns error
+  // status if the file is not found.
+  //
+  // @param path complete file path
+  Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);
+
+  // FileMode::WRITE options
+  // @param path complete file path
+  // @param buffer_size, 0 for default
+  // @param replication, 0 for default
+  // @param default_block_size, 0 for default
+  Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
+      int16_t replication, int64_t default_block_size,
+      std::shared_ptr<HdfsWriteableFile>* file);
+
+  Status OpenWriteable(
+      const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file);
+
+ private:
+  friend class HdfsReadableFile;
+  friend class HdfsWriteableFile;
+
+  class HdfsClientImpl;
+  std::unique_ptr<HdfsClientImpl> impl_;
+
+  HdfsClient();
+  DISALLOW_COPY_AND_ASSIGN(HdfsClient);
+};
+
+class HdfsReadableFile : public RandomAccessFile {
+ public:
+  ~HdfsReadableFile();
+
+  Status Close() override;
+
+  Status GetSize(int64_t* size) override;
+
+  Status ReadAt(
+      int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+
+  Status Seek(int64_t position) override;
+  Status Tell(int64_t* position) override;
+
+  // NOTE: If you wish to read a particular range of a file in a multithreaded
+  // context, you may prefer to use ReadAt to avoid locking issues
+  Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+
+ private:
+  class HdfsReadableFileImpl;
+  std::unique_ptr<HdfsReadableFileImpl> impl_;
+
+  friend class HdfsClient::HdfsClientImpl;
+
+  HdfsReadableFile();
+  DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
+};
+
+class HdfsWriteableFile : public WriteableFile {
+ public:
+  ~HdfsWriteableFile();
+
+  Status Close() override;
+
+  Status Write(const uint8_t* buffer, int32_t nbytes) override;
+
+  Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written);
+
+  Status Tell(int64_t* position) override;
+
+ private:
+  class HdfsWriteableFileImpl;
+  std::unique_ptr<HdfsWriteableFileImpl> impl_;
+
+  friend class HdfsClient::HdfsClientImpl;
+
+  HdfsWriteableFile();
+
+  DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile);
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_HDFS

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
new file mode 100644
index 0000000..4bd8a8f
--- /dev/null
+++ b/cpp/src/arrow/io/interfaces.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_INTERFACES
+#define ARROW_IO_INTERFACES
+
+#include <cstdint>
+
+namespace arrow {
+
+class Status;
+
+namespace io {
+
+struct FileMode {
+  enum type { READ, WRITE, READWRITE };
+};
+
+struct ObjectType {
+  enum type { FILE, DIRECTORY };
+};
+
+class FileSystemClient {
+ public:
+  virtual ~FileSystemClient() {}
+};
+
+class FileBase {
+  virtual Status Close() = 0;
+
+  virtual Status Tell(int64_t* position) = 0;
+};
+
+class ReadableFile : public FileBase {
+ public:
+  virtual Status ReadAt(
+      int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+
+  virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+
+  virtual Status GetSize(int64_t* size) = 0;
+};
+
+class RandomAccessFile : public ReadableFile {
+ public:
+  virtual Status Seek(int64_t position) = 0;
+};
+
+class WriteableFile : public FileBase {
+ public:
+  virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_INTERFACES

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
new file mode 100644
index 0000000..f752665
--- /dev/null
+++ b/cpp/src/arrow/io/libhdfs_shim.cc
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This shim interface to libhdfs (for runtime shared library loading) has been
+// adapted from the SFrame project, released under the ASF-compatible 3-clause
+// BSD license
+//
+// Using this required having the $JAVA_HOME and $HADOOP_HOME environment
+// variables set, so that libjvm and libhdfs can be located easily
+
+// Copyright (C) 2015 Dato, Inc.
+// All rights reserved.
+//
+// This software may be modified and distributed under the terms
+// of the BSD license. See the LICENSE file for details.
+
+#ifdef HAS_HADOOP
+
+#ifndef _WIN32
+#include <dlfcn.h>
+#else
+#include <winsock2.h>
+#include <windows.h>
+
+// TODO(wesm): address when/if we add windows support
+// #include <util/syserr_reporting.hpp>
+#endif
+
+extern "C" {
+#include <hdfs.h>
+}
+
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <boost/filesystem.hpp>        // NOLINT
+#include <boost/algorithm/string.hpp>  // NOLINT
+
+#include "arrow/util/status.h"
+
+namespace fs = boost::filesystem;
+
+extern "C" {
+
+#ifndef _WIN32
+static void* libhdfs_handle = NULL;
+static void* libjvm_handle = NULL;
+#else
+static HINSTANCE libhdfs_handle = NULL;
+static HINSTANCE libjvm_handle = NULL;
+#endif
+/*
+ * All the shim pointers
+ */
+
+// NOTE(wesm): cpplint does not like use of short and other imprecise C types
+
+static hdfsFS (*ptr_hdfsConnectAsUser)(
+    const char* host, tPort port, const char* user) = NULL;
+static hdfsFS (*ptr_hdfsConnect)(const char* host, tPort port) = NULL;
+static int (*ptr_hdfsDisconnect)(hdfsFS fs) = NULL;
+
+static hdfsFile (*ptr_hdfsOpenFile)(hdfsFS fs, const char* path, int flags,
+    int bufferSize, short replication, tSize blocksize) = NULL;  // NOLINT
+
+static int (*ptr_hdfsCloseFile)(hdfsFS fs, hdfsFile file) = NULL;
+static int (*ptr_hdfsExists)(hdfsFS fs, const char* path) = NULL;
+static int (*ptr_hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos) = NULL;
+static tOffset (*ptr_hdfsTell)(hdfsFS fs, hdfsFile file) = NULL;
+static tSize (*ptr_hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length) = NULL;
+static tSize (*ptr_hdfsPread)(
+    hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) = NULL;
+static tSize (*ptr_hdfsWrite)(
+    hdfsFS fs, hdfsFile file, const void* buffer, tSize length) = NULL;
+static int (*ptr_hdfsFlush)(hdfsFS fs, hdfsFile file) = NULL;
+static int (*ptr_hdfsAvailable)(hdfsFS fs, hdfsFile file) = NULL;
+static int (*ptr_hdfsCopy)(
+    hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
+static int (*ptr_hdfsMove)(
+    hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
+static int (*ptr_hdfsDelete)(hdfsFS fs, const char* path, int recursive) = NULL;
+static int (*ptr_hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath) = NULL;
+static char* (*ptr_hdfsGetWorkingDirectory)(
+    hdfsFS fs, char* buffer, size_t bufferSize) = NULL;
+static int (*ptr_hdfsSetWorkingDirectory)(hdfsFS fs, const char* path) = NULL;
+static int (*ptr_hdfsCreateDirectory)(hdfsFS fs, const char* path) = NULL;
+static int (*ptr_hdfsSetReplication)(
+    hdfsFS fs, const char* path, int16_t replication) = NULL;
+static hdfsFileInfo* (*ptr_hdfsListDirectory)(
+    hdfsFS fs, const char* path, int* numEntries) = NULL;
+static hdfsFileInfo* (*ptr_hdfsGetPathInfo)(hdfsFS fs, const char* path) = NULL;
+static void (*ptr_hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries) = NULL;
+static char*** (*ptr_hdfsGetHosts)(
+    hdfsFS fs, const char* path, tOffset start, tOffset length) = NULL;
+static void (*ptr_hdfsFreeHosts)(char*** blockHosts) = NULL;
+static tOffset (*ptr_hdfsGetDefaultBlockSize)(hdfsFS fs) = NULL;
+static tOffset (*ptr_hdfsGetCapacity)(hdfsFS fs) = NULL;
+static tOffset (*ptr_hdfsGetUsed)(hdfsFS fs) = NULL;
+static int (*ptr_hdfsChown)(
+    hdfsFS fs, const char* path, const char* owner, const char* group) = NULL;
+static int (*ptr_hdfsChmod)(hdfsFS fs, const char* path, short mode) = NULL;  // NOLINT
+static int (*ptr_hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime) = NULL;
+
+// Helper functions for dlopens
+static std::vector<fs::path> get_potential_libjvm_paths();
+static std::vector<fs::path> get_potential_libhdfs_paths();
+static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
+#ifndef _WIN32
+    void*& out_handle);
+#else
+    HINSTANCE& out_handle);
+#endif
+
+#define GET_SYMBOL(SYMBOL_NAME)                                                  \
+  if (!ptr_##SYMBOL_NAME) {                                                      \
+    *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
+  }
+
+static void* get_symbol(const char* symbol) {
+  if (libhdfs_handle == NULL) return NULL;
+#ifndef _WIN32
+  return dlsym(libhdfs_handle, symbol);
+#else
+
+  void* ret = reinterpret_cast<void*>(GetProcAddress(libhdfs_handle, symbol));
+  if (ret == NULL) {
+    // logstream(LOG_INFO) << "GetProcAddress error: "
+    //                     << get_last_err_str(GetLastError()) << std::endl;
+  }
+  return ret;
+#endif
+}
+
+hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char* user) {
+  return ptr_hdfsConnectAsUser(host, port, user);
+}
+
+// Returns NULL on failure
+hdfsFS hdfsConnect(const char* host, tPort port) {
+  if (ptr_hdfsConnect) {
+    return ptr_hdfsConnect(host, port);
+  } else {
+    // TODO: error reporting when shim setup fails
+    return NULL;
+  }
+}
+
+int hdfsDisconnect(hdfsFS fs) {
+  return ptr_hdfsDisconnect(fs);
+}
+
+hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
+    short replication, tSize blocksize) {  // NOLINT
+  return ptr_hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
+}
+
+int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
+  return ptr_hdfsCloseFile(fs, file);
+}
+
+int hdfsExists(hdfsFS fs, const char* path) {
+  return ptr_hdfsExists(fs, path);
+}
+
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
+  return ptr_hdfsSeek(fs, file, desiredPos);
+}
+
+tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
+  return ptr_hdfsTell(fs, file);
+}
+
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
+  return ptr_hdfsRead(fs, file, buffer, length);
+}
+
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) {
+  return ptr_hdfsPread(fs, file, position, buffer, length);
+}
+
+tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
+  return ptr_hdfsWrite(fs, file, buffer, length);
+}
+
+int hdfsFlush(hdfsFS fs, hdfsFile file) {
+  return ptr_hdfsFlush(fs, file);
+}
+
+int hdfsAvailable(hdfsFS fs, hdfsFile file) {
+  GET_SYMBOL(hdfsAvailable);
+  if (ptr_hdfsAvailable)
+    return ptr_hdfsAvailable(fs, file);
+  else
+    return 0;
+}
+
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
+  GET_SYMBOL(hdfsCopy);
+  if (ptr_hdfsCopy)
+    return ptr_hdfsCopy(srcFS, src, dstFS, dst);
+  else
+    return 0;
+}
+
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
+  GET_SYMBOL(hdfsMove);
+  if (ptr_hdfsMove)
+    return ptr_hdfsMove(srcFS, src, dstFS, dst);
+  else
+    return 0;
+}
+
+int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
+  return ptr_hdfsDelete(fs, path, recursive);
+}
+
+int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
+  GET_SYMBOL(hdfsRename);
+  if (ptr_hdfsRename)
+    return ptr_hdfsRename(fs, oldPath, newPath);
+  else
+    return 0;
+}
+
+char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
+  GET_SYMBOL(hdfsGetWorkingDirectory);
+  if (ptr_hdfsGetWorkingDirectory) {
+    return ptr_hdfsGetWorkingDirectory(fs, buffer, bufferSize);
+  } else {
+    return NULL;
+  }
+}
+
+int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
+  GET_SYMBOL(hdfsSetWorkingDirectory);
+  if (ptr_hdfsSetWorkingDirectory) {
+    return ptr_hdfsSetWorkingDirectory(fs, path);
+  } else {
+    return 0;
+  }
+}
+
+int hdfsCreateDirectory(hdfsFS fs, const char* path) {
+  return ptr_hdfsCreateDirectory(fs, path);
+}
+
+int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
+  GET_SYMBOL(hdfsSetReplication);
+  if (ptr_hdfsSetReplication) {
+    return ptr_hdfsSetReplication(fs, path, replication);
+  } else {
+    return 0;
+  }
+}
+
+hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries) {
+  return ptr_hdfsListDirectory(fs, path, numEntries);
+}
+
+hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path) {
+  return ptr_hdfsGetPathInfo(fs, path);
+}
+
+void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
+  ptr_hdfsFreeFileInfo(hdfsFileInfo, numEntries);
+}
+
+char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
+  GET_SYMBOL(hdfsGetHosts);
+  if (ptr_hdfsGetHosts) {
+    return ptr_hdfsGetHosts(fs, path, start, length);
+  } else {
+    return NULL;
+  }
+}
+
+void hdfsFreeHosts(char*** blockHosts) {
+  GET_SYMBOL(hdfsFreeHosts);
+  if (ptr_hdfsFreeHosts) { ptr_hdfsFreeHosts(blockHosts); }
+}
+
+tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
+  GET_SYMBOL(hdfsGetDefaultBlockSize);
+  if (ptr_hdfsGetDefaultBlockSize) {
+    return ptr_hdfsGetDefaultBlockSize(fs);
+  } else {
+    return 0;
+  }
+}
+
+tOffset hdfsGetCapacity(hdfsFS fs) {
+  return ptr_hdfsGetCapacity(fs);
+}
+
+tOffset hdfsGetUsed(hdfsFS fs) {
+  return ptr_hdfsGetUsed(fs);
+}
+
+int hdfsChown(hdfsFS fs, const char* path, const char* owner, const char* group) {
+  GET_SYMBOL(hdfsChown);
+  if (ptr_hdfsChown) {
+    return ptr_hdfsChown(fs, path, owner, group);
+  } else {
+    return 0;
+  }
+}
+
+int hdfsChmod(hdfsFS fs, const char* path, short mode) {  // NOLINT
+  GET_SYMBOL(hdfsChmod);
+  if (ptr_hdfsChmod) {
+    return ptr_hdfsChmod(fs, path, mode);
+  } else {
+    return 0;
+  }
+}
+
+int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
+  GET_SYMBOL(hdfsUtime);
+  if (ptr_hdfsUtime) {
+    return ptr_hdfsUtime(fs, path, mtime, atime);
+  } else {
+    return 0;
+  }
+}
+
+static std::vector<fs::path> get_potential_libhdfs_paths() {
+  std::vector<fs::path> libhdfs_potential_paths = {
+      // find one in the local directory
+      fs::path("./libhdfs.so"), fs::path("./hdfs.dll"),
+      // find a global libhdfs.so
+      fs::path("libhdfs.so"), fs::path("hdfs.dll"),
+  };
+
+  const char* hadoop_home = std::getenv("HADOOP_HOME");
+  if (hadoop_home != nullptr) {
+    auto path = fs::path(hadoop_home) / "lib/native/libhdfs.so";
+    libhdfs_potential_paths.push_back(path);
+  }
+  return libhdfs_potential_paths;
+}
+
+static std::vector<fs::path> get_potential_libjvm_paths() {
+  std::vector<fs::path> libjvm_potential_paths;
+
+  std::vector<fs::path> search_prefixes;
+  std::vector<fs::path> search_suffixes;
+  std::string file_name;
+
+// From heuristics
+#ifdef __WIN32
+  search_prefixes = {""};
+  search_suffixes = {"/jre/bin/server", "/bin/server"};
+  file_name = "jvm.dll";
+#elif __APPLE__
+  search_prefixes = {""};
+  search_suffixes = {""};
+  file_name = "libjvm.dylib";
+
+// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
+// expecting users to set an environment variable
+#else
+  search_prefixes = {
+      "/usr/lib/jvm/default-java",                // ubuntu / debian distros
+      "/usr/lib/jvm/java",                        // rhel6
+      "/usr/lib/jvm",                             // centos6
+      "/usr/lib64/jvm",                           // opensuse 13
+      "/usr/local/lib/jvm/default-java",          // alt ubuntu / debian distros
+      "/usr/local/lib/jvm/java",                  // alt rhel6
+      "/usr/local/lib/jvm",                       // alt centos6
+      "/usr/local/lib64/jvm",                     // alt opensuse 13
+      "/usr/local/lib/jvm/java-7-openjdk-amd64",  // alt ubuntu / debian distros
+      "/usr/lib/jvm/java-7-openjdk-amd64",        // alt ubuntu / debian distros
+      "/usr/local/lib/jvm/java-6-openjdk-amd64",  // alt ubuntu / debian distros
+      "/usr/lib/jvm/java-6-openjdk-amd64",        // alt ubuntu / debian distros
+      "/usr/lib/jvm/java-7-oracle",               // alt ubuntu
+      "/usr/lib/jvm/java-8-oracle",               // alt ubuntu
+      "/usr/lib/jvm/java-6-oracle",               // alt ubuntu
+      "/usr/local/lib/jvm/java-7-oracle",         // alt ubuntu
+      "/usr/local/lib/jvm/java-8-oracle",         // alt ubuntu
+      "/usr/local/lib/jvm/java-6-oracle",         // alt ubuntu
+      "/usr/lib/jvm/default",                     // alt centos
+      "/usr/java/latest",                         // alt centos
+  };
+  search_suffixes = {"/jre/lib/amd64/server"};
+  file_name = "libjvm.so";
+#endif
+  // From direct environment variable
+  char* env_value = NULL;
+  if ((env_value = getenv("JAVA_HOME")) != NULL) {
+    // logstream(LOG_INFO) << "Found environment variable " << env_name << ": " <<
+    // env_value << std::endl;
+    search_prefixes.insert(search_prefixes.begin(), env_value);
+  }
+
+  // Generate cross product between search_prefixes, search_suffixes, and file_name
+  for (auto& prefix : search_prefixes) {
+    for (auto& suffix : search_suffixes) {
+      auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name));
+      libjvm_potential_paths.push_back(path);
+    }
+  }
+
+  return libjvm_potential_paths;
+}
+
+#ifndef _WIN32
+static arrow::Status try_dlopen(
+    std::vector<fs::path> potential_paths, const char* name, void*& out_handle) {
+  std::vector<std::string> error_messages;
+
+  for (auto& i : potential_paths) {
+    i.make_preferred();
+    // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
+    out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL);
+
+    if (out_handle != NULL) {
+      // logstream(LOG_INFO) << "Success!" << std::endl;
+      break;
+    } else {
+      const char* err_msg = dlerror();
+      if (err_msg != NULL) {
+        error_messages.push_back(std::string(err_msg));
+      } else {
+        error_messages.push_back(std::string(" returned NULL"));
+      }
+    }
+  }
+
+  if (out_handle == NULL) {
+    std::stringstream ss;
+    ss << "Unable to load " << name;
+    return arrow::Status::IOError(ss.str());
+  }
+
+  return arrow::Status::OK();
+}
+
+#else
+static arrow::Status try_dlopen(
+    std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) {
+  std::vector<std::string> error_messages;
+
+  for (auto& i : potential_paths) {
+    i.make_preferred();
+    // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
+
+    out_handle = LoadLibrary(i.string().c_str());
+
+    if (out_handle != NULL) {
+      // logstream(LOG_INFO) << "Success!" << std::endl;
+      break;
+    } else {
+      // error_messages.push_back(get_last_err_str(GetLastError()));
+    }
+  }
+
+  if (out_handle == NULL) {
+    std::stringstream ss;
+    ss << "Unable to load " << name;
+    return arrow::Status::IOError(ss.str());
+  }
+
+  return arrow::Status::OK();
+}
+#endif  // _WIN32
+
+}  // extern "C"
+
+#define GET_SYMBOL_REQUIRED(SYMBOL_NAME)                                           \
+  do {                                                                             \
+    if (!ptr_##SYMBOL_NAME) {                                                      \
+      *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
+    }                                                                              \
+    if (!ptr_##SYMBOL_NAME)                                                        \
+      return Status::IOError("Getting symbol " #SYMBOL_NAME "failed");             \
+  } while (0)
+
+namespace arrow {
+namespace io {
+
+Status ConnectLibHdfs() {
+  static std::mutex lock;
+  std::lock_guard<std::mutex> guard(lock);
+
+  static bool shim_attempted = false;
+  if (!shim_attempted) {
+    shim_attempted = true;
+
+    std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths();
+    RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle));
+
+    std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths();
+    RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", libhdfs_handle));
+  } else if (libhdfs_handle == nullptr) {
+    return Status::IOError("Prior attempt to load libhdfs failed");
+  }
+
+  GET_SYMBOL_REQUIRED(hdfsConnect);
+  GET_SYMBOL_REQUIRED(hdfsConnectAsUser);
+  GET_SYMBOL_REQUIRED(hdfsCreateDirectory);
+  GET_SYMBOL_REQUIRED(hdfsDelete);
+  GET_SYMBOL_REQUIRED(hdfsDisconnect);
+  GET_SYMBOL_REQUIRED(hdfsExists);
+  GET_SYMBOL_REQUIRED(hdfsFreeFileInfo);
+  GET_SYMBOL_REQUIRED(hdfsGetCapacity);
+  GET_SYMBOL_REQUIRED(hdfsGetUsed);
+  GET_SYMBOL_REQUIRED(hdfsGetPathInfo);
+  GET_SYMBOL_REQUIRED(hdfsListDirectory);
+
+  // File methods
+  GET_SYMBOL_REQUIRED(hdfsCloseFile);
+  GET_SYMBOL_REQUIRED(hdfsFlush);
+  GET_SYMBOL_REQUIRED(hdfsOpenFile);
+  GET_SYMBOL_REQUIRED(hdfsRead);
+  GET_SYMBOL_REQUIRED(hdfsPread);
+  GET_SYMBOL_REQUIRED(hdfsSeek);
+  GET_SYMBOL_REQUIRED(hdfsTell);
+  GET_SYMBOL_REQUIRED(hdfsWrite);
+
+  return Status::OK();
+}
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // HAS_HADOOP

http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index db779d8..edcac88 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -126,8 +126,8 @@ class TestParquetIO : public ::testing::Test {
     size_t chunk_size = values.size() / num_chunks;
     for (int i = 0; i < num_chunks; i++) {
       auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer = static_cast<ParquetWriter<TestType>*>(
-          row_group_writer->NextColumn());
+      auto column_writer =
+          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
       T* data = values.data() + i * chunk_size;
       column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
       column_writer->Close();


Mime
View raw message