hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject hadoop git commit: HDFS-11106: libhdfs++: Some refactoring to better organize files. Contributed by James Clampffer.
Date Tue, 29 Nov 2016 23:10:29 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 c252ac225 -> e658f07ad


HDFS-11106: libhdfs++: Some refactoring to better organize files.  Contributed by James Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e658f07a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e658f07a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e658f07a

Branch: refs/heads/HDFS-8707
Commit: e658f07ad1b1a33d13c446dadf592bba700e696a
Parents: c252ac2
Author: James <jhc@apache.org>
Authored: Tue Nov 29 18:09:53 2016 -0500
Committer: James <jhc@apache.org>
Committed: Tue Nov 29 18:09:53 2016 -0500

----------------------------------------------------------------------
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../main/native/libhdfspp/lib/common/base64.cc  |  73 ---
 .../libhdfspp/lib/common/hdfs_ioservice.cc      |  51 ++
 .../libhdfspp/lib/common/hdfs_ioservice.h       |  46 ++
 .../libhdfspp/lib/common/hdfs_public_api.cc     |  47 --
 .../libhdfspp/lib/common/hdfs_public_api.h      |  39 --
 .../main/native/libhdfspp/lib/common/util.cc    |  51 ++
 .../lib/connection/datanodeconnection.h         |   2 +-
 .../main/native/libhdfspp/lib/fs/CMakeLists.txt |   2 +-
 .../main/native/libhdfspp/lib/fs/filehandle.h   |   2 +-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  | 492 +---------------
 .../native/libhdfspp/lib/fs/filesystem_sync.cc  | 555 +++++++++++++++++++
 .../main/native/libhdfspp/tests/CMakeLists.txt  |   2 +-
 .../libhdfspp/tests/sasl_digest_md5_test.cc     |   3 +
 14 files changed, 712 insertions(+), 655 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 501d3b5..bdeb068 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc)
+add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc)
 add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
 target_link_libraries(common ${LIB_DL})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc
deleted file mode 100644
index 76d10e6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/base64.cc
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "util.h"
-
-#include <array>
-#include <functional>
-#include <algorithm>
-
-namespace hdfs {
-
-std::string Base64Encode(const std::string &src) {
-  //encoded size is (sizeof(buf) + 2) / 3 * 4
-  static const std::string base64_chars =
-               "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
-               "abcdefghijklmnopqrstuvwxyz"
-               "0123456789+/";
-  std::string ret;
-  int i = 0;
-  int j = 0;
-  unsigned char char_array_3[3];
-  unsigned char char_array_4[4];
-  unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]);
-  unsigned int in_len = src.size();
-
-  while (in_len--) {
-    char_array_3[i++] = *(bytes_to_encode++);
-    if (i == 3) {
-      char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
-      char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
-      char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
-      char_array_4[3] = char_array_3[2] & 0x3f;
-
-      for(i = 0; (i <4) ; i++)
-        ret += base64_chars[char_array_4[i]];
-      i = 0;
-    }
-  }
-
-  if (i)  {
-    for(j = i; j < 3; j++)
-      char_array_3[j] = '\0';
-
-    char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
-    char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
-    char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
-    char_array_4[3] = char_array_3[2] & 0x3f;
-
-    for (j = 0; (j < i + 1); j++)
-      ret += base64_chars[char_array_4[j]];
-
-    while((i++ < 3))
-      ret += '=';
-  }
-  return ret;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
new file mode 100644
index 0000000..f6876cd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs_ioservice.h"
+
+#include "common/logging.h"
+
+namespace hdfs {
+
+IoService::~IoService() {}
+
+IoService *IoService::New() { return new IoServiceImpl(); }
+
+void IoServiceImpl::Run() {
+  // The IoService executes callbacks provided by library users in the context of worker threads,
+  // there is no way of preventing those callbacks from throwing but we can at least prevent them
+  // from escaping this library and crashing the process.
+
+  // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
+  asio::io_service::work work(io_service_);
+  for(;;)
+  {
+    try
+    {
+      io_service_.run();
+      break;
+    } catch (const std::exception & e) {
+      LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what());
+    } catch (...) {
+      LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread");
+    }
+  }
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
new file mode 100644
index 0000000..73d167e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_IOSERVICE_H_
+#define COMMON_HDFS_IOSERVICE_H_
+
+#include "hdfspp/hdfspp.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+/*
+ *  A thin wrapper over the asio::io_service.
+ *    -In the future this could own the worker threads that execute io tasks which
+ *     makes it easier to share IoServices between FileSystems. See HDFS-10796 for
+ *     rationale.
+ */
+
+class IoServiceImpl : public IoService {
+ public:
+  virtual void Run() override;
+  virtual void Stop() override { io_service_.stop(); }
+  ::asio::io_service &io_service() { return io_service_; }
+ private:
+  ::asio::io_service io_service_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
deleted file mode 100644
index 188071d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "hdfs_public_api.h"
-
-#include "common/logging.h"
-
-namespace hdfs {
-
-IoService::~IoService() {}
-
-IoService *IoService::New() { return new IoServiceImpl(); }
-
-void IoServiceImpl::Run() {
-  // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
-  asio::io_service::work work(io_service_);
-  for(;;)
-  {
-    try
-    {
-      io_service_.run();
-      break;
-    } catch (const std::exception & e) {
-      LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what());
-    } catch (...) {
-      LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread");
-    }
-  }
-}
-
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
deleted file mode 100644
index c8fea5e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef COMMON_HDFS_PUBLIC_API_H_
-#define COMMON_HDFS_PUBLIC_API_H_
-
-#include "hdfspp/hdfspp.h"
-
-#include <asio/io_service.hpp>
-
-namespace hdfs {
-
-class IoServiceImpl : public IoService {
- public:
-  virtual void Run() override;
-  virtual void Stop() override { io_service_.stop(); }
-  ::asio::io_service &io_service() { return io_service_; }
- private:
-  ::asio::io_service io_service_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
index ede6acd..375f951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -20,12 +20,14 @@
 #include "common/util_c.h"
 
 #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
 #include <exception>
 #include <sstream>
 #include <iostream>
 #include <iomanip>
 #include <thread>
 
+
 namespace hdfs {
 
 bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
@@ -73,6 +75,53 @@ std::string GetRandomClientName() {
   return oss.str();
 }
 
+std::string Base64Encode(const std::string &src) {
+  //encoded size is (sizeof(buf) + 2) / 3 * 4
+  static const std::string base64_chars =
+               "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+               "abcdefghijklmnopqrstuvwxyz"
+               "0123456789+/";
+  std::string ret;
+  int i = 0;
+  int j = 0;
+  unsigned char char_array_3[3];
+  unsigned char char_array_4[4];
+  unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]);
+  unsigned int in_len = src.size();
+
+  while (in_len--) {
+    char_array_3[i++] = *(bytes_to_encode++);
+    if (i == 3) {
+      char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
+      char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
+      char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
+      char_array_4[3] = char_array_3[2] & 0x3f;
+
+      for(i = 0; (i <4) ; i++)
+        ret += base64_chars[char_array_4[i]];
+      i = 0;
+    }
+  }
+
+  if (i)  {
+    for(j = i; j < 3; j++)
+      char_array_3[j] = '\0';
+
+    char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
+    char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
+    char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
+    char_array_4[3] = char_array_3[2] & 0x3f;
+
+    for (j = 0; (j < i + 1); j++)
+      ret += base64_chars[char_array_4[j]];
+
+    while((i++ < 3))
+      ret += '=';
+  }
+  return ret;
+}
+
+
 std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
   std::string err;
   if(sock && sock->is_open()) {
@@ -117,3 +166,5 @@ bool IsHighBitSet(uint64_t num) {
 void ShutdownProtobufLibrary_C() {
   google::protobuf::ShutdownProtobufLibrary();
 }
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
index 0ff6fc4..21193b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -18,7 +18,7 @@
 #ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
 #define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
 
-#include "common/hdfs_public_api.h"
+#include "common/hdfs_ioservice.h"
 #include "common/async_stream.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "common/libhdfs_events_impl.h"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
index 0bce70d..624cda5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -16,6 +16,6 @@
 # limitations under the License.
 #
 
-add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
+add_library(fs_obj OBJECT filesystem.cc filesystem_sync.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
 add_dependencies(fs_obj proto)
 add_library(fs $<TARGET_OBJECTS:fs_obj>)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index 7e7c79d..14f7ea8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -18,7 +18,7 @@
 #ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
 #define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
 
-#include "common/hdfs_public_api.h"
+#include "common/hdfs_ioservice.h"
 #include "common/async_stream.h"
 #include "common/cancel_tracker.h"
 #include "common/libhdfs_events_impl.h"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 5d5b9f2..d805716 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -32,8 +32,7 @@
 
 namespace hdfs {
 
-static const char kNamenodeProtocol[] =
-    "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
 static const int kNamenodeProtocolVersion = 1;
 
 using ::asio::ip::tcp;
@@ -203,26 +202,6 @@ void FileSystemImpl::Connect(const std::string &server,
   });
 }
 
-Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
-  LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
-                        << ", server=" << server << ", service=" << service << ") called");
-
-  /* synchronized */
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future = stat->get_future();
-
-  auto callback = [stat](const Status &s, FileSystem *fs) {
-    (void)fs;
-    stat->set_value(s);
-  };
-
-  Connect(server, service, callback);
-
-  /* block until promise is set */
-  auto s = future.get();
-
-  return s;
-}
 
 void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
   std::string scheme = options_.defaultFS.get_scheme();
@@ -248,25 +227,6 @@ void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &,
   Connect(host, port_as_string, handler);
 }
 
-Status FileSystemImpl::ConnectToDefaultFs() {
-  auto stat = std::make_shared<std::promise<Status>>();
-  std::future<Status> future = stat->get_future();
-
-  auto callback = [stat](const Status &s, FileSystem *fs) {
-    (void)fs;
-    stat->set_value(s);
-  };
-
-  ConnectToDefaultFs(callback);
-
-  /* block until promise is set */
-  auto s = future.get();
-
-  return s;
-}
-
-
-
 int FileSystemImpl::AddWorkerThread() {
   LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
                                   << FMT_THIS_ADDR << ") called."
@@ -297,38 +257,6 @@ void FileSystemImpl::Open(
   });
 }
 
-Status FileSystemImpl::Open(const std::string &path,
-                                         FileHandle **handle) {
-  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open("
-                                 << FMT_THIS_ADDR << ", path="
-                                 << path << ") called");
-
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
-  std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
-
-  /* wrap async FileSystem::Open with promise to make it a blocking call */
-  auto h = [callstate](const Status &s, FileHandle *is) {
-    callstate->set_value(std::make_tuple(s, is));
-  };
-
-  Open(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  FileHandle *file_handle = std::get<1>(returnstate);
-
-  if (!stat.ok()) {
-    delete file_handle;
-    return stat;
-  }
-  if (!file_handle) {
-    return stat;
-  }
-
-  *handle = file_handle;
-  return stat;
-}
 
 BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
 {
@@ -411,39 +339,6 @@ void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset
   nn_.GetBlockLocations(path, offset, length, conversion);
 }
 
-Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
-  std::shared_ptr<FileBlockLocation> * fileBlockLocations)
-{
-  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
-                                 << FMT_THIS_ADDR << ", path="
-                                 << path << ") called");
-
-  if (!fileBlockLocations)
-    return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
-
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
-  std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
-
-  /* wrap async call with promise/future to make it blocking */
-  auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
-    callstate->set_value(std::make_tuple(s,blockInfo));
-  };
-
-  GetBlockLocations(path, offset, length, callback);
-
-  /* wait for async to finish */
-  auto returnstate = future.get();
-  auto stat = std::get<0>(returnstate);
-
-  if (!stat.ok()) {
-    return stat;
-  }
-
-  *fileBlockLocations = std::get<1>(returnstate);
-
-  return stat;
-}
-
 void FileSystemImpl::GetPreferredBlockSize(const std::string &path,
     const std::function<void(const Status &, const uint64_t &)> &handler) {
   LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize("
@@ -453,33 +348,6 @@ void FileSystemImpl::GetPreferredBlockSize(const std::string &path,
   nn_.GetPreferredBlockSize(path, handler);
 }
 
-Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) {
-  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize("
-                                 << FMT_THIS_ADDR << ", path="
-                                 << path << ") called");
-
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>();
-  std::future<std::tuple<Status, uint64_t>> future(callstate->get_future());
-
-  /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */
-  auto h = [callstate](const Status &s, const uint64_t & bsize) {
-    callstate->set_value(std::make_tuple(s, bsize));
-  };
-
-  GetPreferredBlockSize(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  uint64_t size = std::get<1>(returnstate);
-
-  if (!stat.ok()) {
-    return stat;
-  }
-
-  block_size = size;
-  return stat;
-}
 
 void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) {
   LOG_DEBUG(kFileSystem,
@@ -499,27 +367,6 @@ void FileSystemImpl::SetReplication(const std::string & path, int16_t replicatio
   nn_.SetReplication(path, replication, handler);
 }
 
-Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
-      ", replication=" << replication << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::SetReplication with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  SetReplication(path, replication, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
     std::function<void(const Status &)> handler) {
@@ -535,27 +382,6 @@ void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t
   nn_.SetTimes(path, mtime, atime, handler);
 }
 
-Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
-      ", mtime=" << mtime << ", atime=" << atime << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::SetTimes with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  SetTimes(path, mtime, atime, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::GetFileInfo(
     const std::string &path,
@@ -567,34 +393,6 @@ void FileSystemImpl::GetFileInfo(
   nn_.GetFileInfo(path, handler);
 }
 
-Status FileSystemImpl::GetFileInfo(const std::string &path,
-                                         StatInfo & stat_info) {
-  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
-                                 << FMT_THIS_ADDR << ", path="
-                                 << path << ") called");
-
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
-  std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
-
-  /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
-  auto h = [callstate](const Status &s, const StatInfo &si) {
-    callstate->set_value(std::make_tuple(s, si));
-  };
-
-  GetFileInfo(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  StatInfo info = std::get<1>(returnstate);
-
-  if (!stat.ok()) {
-    return stat;
-  }
-
-  stat_info = info;
-  return stat;
-}
 
 void FileSystemImpl::GetFsStats(
     const std::function<void(const Status &, const FsInfo &)> &handler) {
@@ -604,32 +402,6 @@ void FileSystemImpl::GetFsStats(
   nn_.GetFsStats(handler);
 }
 
-Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called");
-
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>();
-  std::future<std::tuple<Status, FsInfo>> future(callstate->get_future());
-
-  /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */
-  auto h = [callstate](const Status &s, const FsInfo &si) {
-    callstate->set_value(std::make_tuple(s, si));
-  };
-
-  GetFsStats(h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = std::get<0>(returnstate);
-  FsInfo info = std::get<1>(returnstate);
-
-  if (!stat.ok()) {
-    return stat;
-  }
-
-  fs_info = info;
-  return stat;
-}
 
 /**
  * Helper function for recursive GetListing calls.
@@ -666,43 +438,6 @@ void FileSystemImpl::GetListing(
   nn_.GetListing(path, callback);
 }
 
-Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
-  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing("
-                                 << FMT_THIS_ADDR << ", path="
-                                 << path << ") called");
-
-  if (!stat_infos) {
-    return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
-  }
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::GetListing with promise to make it a blocking call.
-   *
-     Keep requesting more until we get the entire listing, and don't set the promise
-   * until we have the entire listing.
-   */
-  auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
-    if (!si.empty()) {
-      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
-    }
-
-    bool done = !s.ok() || !has_more;
-    if (done) {
-      callstate->set_value(s);
-      return false;
-    }
-    return true;
-  };
-
-  GetListing(path, h);
-
-  /* block until promise is set */
-  Status stat = future.get();
-
-  return stat;
-}
 
 void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
     std::function<void(const Status &)> handler) {
@@ -724,27 +459,6 @@ void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool
   nn_.Mkdirs(path, permissions, createparent, handler);
 }
 
-Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
-      ", permissions=" << permissions << ", createparent=" << createparent << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  Mkdirs(path, permissions, createparent, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::Delete(const std::string &path, bool recursive,
     const std::function<void(const Status &)> &handler) {
@@ -759,26 +473,6 @@ void FileSystemImpl::Delete(const std::string &path, bool recursive,
   nn_.Delete(path, recursive, handler);
 }
 
-Status FileSystemImpl::Delete(const std::string &path, bool recursive) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::Delete with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  Delete(path, recursive, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath,
     const std::function<void(const Status &)> &handler) {
@@ -798,26 +492,6 @@ void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPa
   nn_.Rename(oldPath, newPath, handler);
 }
 
-Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::Rename with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  Rename(oldPath, newPath, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::SetPermission(const std::string & path,
     uint16_t permissions, const std::function<void(const Status &)> &handler) {
@@ -837,25 +511,6 @@ void FileSystemImpl::SetPermission(const std::string & path,
   nn_.SetPermission(path, permissions, handler);
 }
 
-Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::SetPermission with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  SetPermission(path, permissions, h);
-
-  /* block until promise is set */
-  Status stat = future.get();
-
-  return stat;
-}
 
 void FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
     const std::string & groupname, const std::function<void(const Status &)> &handler) {
@@ -870,25 +525,6 @@ void FileSystemImpl::SetOwner(const std::string & path, const std::string & user
   nn_.SetOwner(path, username, groupname, handler);
 }
 
-Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
-                                const std::string & groupname) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::SetOwner with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  SetOwner(path, username, groupname, h);
-
-  /* block until promise is set */
-  Status stat = future.get();
-  return stat;
-}
 
 /**
  * Helper function for recursive Find calls.
@@ -1016,50 +652,6 @@ void FileSystemImpl::Find(
   nn_.GetListing("/", callback);
 }
 
-Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
-  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find("
-                                 << FMT_THIS_ADDR << ", path="
-                                 << path << ", name="
-                                 << name << ") called");
-
-  if (!stat_infos) {
-    return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
-  }
-
-  // In this case, we're going to have the async code populate stat_infos.
-
-  std::promise<void> promise = std::promise<void>();
-  std::future<void> future(promise.get_future());
-  Status status = Status::OK();
-
-  /**
-    * Keep requesting more until we get the entire listing. Set the promise
-    * when we have the entire listing to stop.
-    *
-    * Find guarantees that the handler will only be called once at a time,
-    * so we do not need any locking here
-    */
-  auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
-    if (!si.empty()) {
-      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
-    }
-    if (!s.ok() && status.ok()){
-      //We make sure we set 'status' only on the first error.
-      status = s;
-    }
-    if (!has_more_results) {
-      promise.set_value();
-      return false;
-    }
-    return true;
-  };
-
-  Find(path, name, maxdepth, h);
-
-  /* block until promise is set */
-  future.get();
-  return status;
-}
 
 void FileSystemImpl::CreateSnapshot(const std::string &path,
     const std::string &name,
@@ -1075,27 +667,6 @@ void FileSystemImpl::CreateSnapshot(const std::string &path,
   nn_.CreateSnapshot(path, name, handler);
 }
 
-Status FileSystemImpl::CreateSnapshot(const std::string &path,
-    const std::string &name) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  CreateSnapshot(path, name, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::DeleteSnapshot(const std::string &path,
     const std::string &name,
@@ -1115,27 +686,6 @@ void FileSystemImpl::DeleteSnapshot(const std::string &path,
   nn_.DeleteSnapshot(path, name, handler);
 }
 
-Status FileSystemImpl::DeleteSnapshot(const std::string &path,
-    const std::string &name) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  DeleteSnapshot(path, name, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::AllowSnapshot(const std::string &path,
     const std::function<void(const Status &)> &handler) {
@@ -1150,26 +700,6 @@ void FileSystemImpl::AllowSnapshot(const std::string &path,
   nn_.AllowSnapshot(path, handler);
 }
 
-Status FileSystemImpl::AllowSnapshot(const std::string &path) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  AllowSnapshot(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::DisallowSnapshot(const std::string &path,
     const std::function<void(const Status &)> &handler) {
@@ -1184,26 +714,6 @@ void FileSystemImpl::DisallowSnapshot(const std::string &path,
   nn_.DisallowSnapshot(path, handler);
 }
 
-Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
-  LOG_DEBUG(kFileSystem,
-      << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
-
-  auto callstate = std::make_shared<std::promise<Status>>();
-  std::future<Status> future(callstate->get_future());
-
-  /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
-  auto h = [callstate](const Status &s) {
-    callstate->set_value(s);
-  };
-
-  DisallowSnapshot(path, h);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-  Status stat = returnstate;
-
-  return stat;
-}
 
 void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
   // It is far too easy to destroy the filesystem (and thus the threadpool)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc
new file mode 100644
index 0000000..73be538
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem_sync.cc
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include <future>
+#include <tuple>
+
+#define FMT_THIS_ADDR "this=" << (void*)this
+
+// Note: This file only holds boilerplate async-to-sync shim code;
+//       place actual filesystem logic in filesystem.cc.
+//
+//
+// Shim pattern pseudocode
+//
+// Status MySynchronizedMethod(method_args):
+//  let stat = a promise<Status> wrapped in a shared_ptr
+//
+//  Create a lambda that captures stat and any other variables that need to
+//  be set based on the async operation.  When invoked set variables with the
+//  arguments passed (possibly do some translation), then set stat to indicate
+//  the return status of the async call.
+//
+//  invoke MyAsyncMethod(method_args, handler_lambda)
+//
+//  block until stat value has been set while async work takes place
+//
+//  return stat
+
+namespace hdfs {
+
+Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
+  LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
+                        << ", server=" << server << ", service=" << service << ") called");
+
+  /* synchronized */
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  auto callback = [stat](const Status &s, FileSystem *fs) {
+    (void)fs;
+    stat->set_value(s);
+  };
+
+  Connect(server, service, callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  return s;
+}
+
+
+Status FileSystemImpl::ConnectToDefaultFs() {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future = stat->get_future();
+
+  auto callback = [stat](const Status &s, FileSystem *fs) {
+    (void)fs;
+    stat->set_value(s);
+  };
+
+  ConnectToDefaultFs(callback);
+
+  /* block until promise is set */
+  auto s = future.get();
+
+  return s;
+}
+
+
+Status FileSystemImpl::Open(const std::string &path,
+                                         FileHandle **handle) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
+  std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
+
+  /* wrap async FileSystem::Open with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, FileHandle *is) {
+    callstate->set_value(std::make_tuple(s, is));
+  };
+
+  Open(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  FileHandle *file_handle = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    delete file_handle;
+    return stat;
+  }
+  if (!file_handle) {
+    return stat;
+  }
+
+  *handle = file_handle;
+  return stat;
+}
+
+Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
+  std::shared_ptr<FileBlockLocation> * fileBlockLocations)
+{
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  if (!fileBlockLocations)
+    return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
+  std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
+    callstate->set_value(std::make_tuple(s,blockInfo));
+  };
+
+  GetBlockLocations(path, offset, length, callback);
+
+  /* wait for async to finish */
+  auto returnstate = future.get();
+  auto stat = std::get<0>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  *fileBlockLocations = std::get<1>(returnstate);
+
+  return stat;
+}
+
+Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>();
+  std::future<std::tuple<Status, uint64_t>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const uint64_t & bsize) {
+    callstate->set_value(std::make_tuple(s, bsize));
+  };
+
+  GetPreferredBlockSize(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  uint64_t size = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  block_size = size;
+  return stat;
+}
+
+Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", replication=" << replication << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetReplication with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetReplication(path, replication, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", mtime=" << mtime << ", atime=" << atime << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetTimes with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetTimes(path, mtime, atime, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::GetFileInfo(const std::string &path,
+                                         StatInfo & stat_info) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
+  std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const StatInfo &si) {
+    callstate->set_value(std::make_tuple(s, si));
+  };
+
+  GetFileInfo(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  StatInfo info = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  stat_info = info;
+  return stat;
+}
+
+Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>();
+  std::future<std::tuple<Status, FsInfo>> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetFsStats with promise to make it a blocking call */
+  auto h = [callstate](const Status &s, const FsInfo &si) {
+    callstate->set_value(std::make_tuple(s, si));
+  };
+
+  GetFsStats(h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = std::get<0>(returnstate);
+  FsInfo info = std::get<1>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  fs_info = info;
+  return stat;
+}
+
+Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ") called");
+
+  if (!stat_infos) {
+    return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
+  }
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::GetListing with promise to make it a blocking call.
+   *
+   * Keep requesting more until we get the entire listing, and don't set the promise
+   * until we have the entire listing.
+   */
+  auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
+    if (!si.empty()) {
+      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
+    }
+
+    bool done = !s.ok() || !has_more;
+    if (done) {
+      callstate->set_value(s);
+      return false;
+    }
+    return true;
+  };
+
+  GetListing(path, h);
+
+  /* block until promise is set */
+  Status stat = future.get();
+
+  return stat;
+}
+
+Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
+      ", permissions=" << permissions << ", createparent=" << createparent << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::Mkdirs with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  Mkdirs(path, permissions, createparent, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::Delete(const std::string &path, bool recursive) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::Delete with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  Delete(path, recursive, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::Rename with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  Rename(oldPath, newPath, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetPermission with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetPermission(path, permissions, h);
+
+  /* block until promise is set */
+  Status stat = future.get();
+
+  return stat;
+}
+
+Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
+                                const std::string & groupname) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::SetOwner with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  SetOwner(path, username, groupname, h);
+
+  /* block until promise is set */
+  Status stat = future.get();
+  return stat;
+}
+
+Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
+  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find("
+                                 << FMT_THIS_ADDR << ", path="
+                                 << path << ", name="
+                                 << name << ") called");
+
+  if (!stat_infos) {
+    return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
+  }
+
+  // In this case, we're going to have the async code populate stat_infos.
+
+  std::promise<void> promise = std::promise<void>();
+  std::future<void> future(promise.get_future());
+  Status status = Status::OK();
+
+  /**
+    * Keep requesting more until we get the entire listing. Once we have
+    * the entire listing, set the promise and stop requesting more.
+    *
+    * Find guarantees that the handler will only be called once at a time,
+    * so we do not need any locking here.
+    */
+  auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
+    if (!si.empty()) {
+      stat_infos->insert(stat_infos->end(), si.begin(), si.end());
+    }
+    if (!s.ok() && status.ok()){
+      // Record only the first error in 'status'; later errors are ignored.
+      status = s;
+    }
+    if (!has_more_results) {
+      promise.set_value();
+      return false;
+    }
+    return true;
+  };
+
+  Find(path, name, maxdepth, h);
+
+  /* block until promise is set */
+  future.get();
+  return status;
+}
+
+Status FileSystemImpl::CreateSnapshot(const std::string &path,
+    const std::string &name) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  CreateSnapshot(path, name, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::DeleteSnapshot(const std::string &path,
+    const std::string &name) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  DeleteSnapshot(path, name, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::AllowSnapshot(const std::string &path) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  AllowSnapshot(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
+  LOG_DEBUG(kFileSystem,
+      << "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
+
+  auto callstate = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(callstate->get_future());
+
+  /* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
+  auto h = [callstate](const Status &s) {
+    callstate->set_value(s);
+  };
+
+  DisallowSnapshot(path, h);
+
+  /* block until promise is set */
+  auto returnstate = future.get();
+  Status stat = returnstate;
+
+  return stat;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 032dfc8..3dd4aae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -72,7 +72,7 @@ target_link_libraries(remote_block_reader_test test_common reader proto common c
 add_memcheck_test(remote_block_reader remote_block_reader_test)
 
 add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
-target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(sasl_digest_md5_test common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(sasl_digest_md5 sasl_digest_md5_test)
 
 add_executable(retry_policy_test retry_policy_test.cc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e658f07a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
index 0797853..553ffa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc
@@ -18,6 +18,7 @@
 #include "common/sasl_authenticator.h"
 
 #include <gtest/gtest.h>
+#include <google/protobuf/stubs/common.h>
 
 namespace hdfs {
 
@@ -40,5 +41,7 @@ TEST(DigestMD5AuthenticatorTest, TestResponse) {
   ASSERT_TRUE(status.ok());
   ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") !=
               std::string::npos);
+
+  google::protobuf::ShutdownProtobufLibrary();
 }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message