hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [1/4] hadoop git commit: HDFS-10785: libhdfs++: Implement the rest of the tools. Contributed by Anatoli Schein
Date Tue, 13 Jun 2017 15:28:16 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 5ae34ac0d -> 40e3290b9


http://git-wip-us.apache.org/repos/asf/hadoop/blob/40e3290b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc
new file mode 100644
index 0000000..133bbc9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc
@@ -0,0 +1,115 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include "tools_common.h"
+
+namespace hdfs {
+
+  std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout) {
+    hdfs::Options options;
+    //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
+    hdfs::ConfigurationLoader loader;
+    //Loading default config files core-site.xml and hdfs-site.xml from the config path
+    hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
+    //TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
+    if(config){
+      //Loading options from the config
+      options = config->GetOptions();
+    }
+    if(max_timeout){
+      //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
+      options.rpc_timeout = std::numeric_limits<int>::max();
+    }
+    IoService * io_service = IoService::New();
+    //Wrapping fs into a shared pointer to guarantee deletion
+    std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
+    if (!fs) {
+      std::cerr << "Could not create FileSystem object. " << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    Status status;
+    //Check if the user supplied the host
+    if(!uri.get_host().empty()){
+      //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
+      std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
+      status = fs->Connect(uri.get_host(), port);
+      if (!status.ok()) {
+        std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
+        exit(EXIT_FAILURE);
+      }
+    } else {
+      status = fs->ConnectToDefaultFs();
+      if (!status.ok()) {
+        if(!options.defaultFS.get_host().empty()){
+          std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
+        } else {
+          std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
+        }
+        exit(EXIT_FAILURE);
+      }
+    }
+    return fs;
+  }
+
+  #define BUF_SIZE 1048576 //1 MB
+  static char input_buffer[BUF_SIZE];
+
+  void readFile(std::shared_ptr<hdfs::FileSystem> fs, std::string path, off_t offset, std::FILE* dst_file, bool to_delete) {
+    ssize_t total_bytes_read = 0;
+    size_t last_bytes_read = 0;
+
+    hdfs::FileHandle *file_raw = nullptr;
+    hdfs::Status status = fs->Open(path, &file_raw);
+    if (!status.ok()) {
+      std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    //wrapping file_raw into a unique pointer to guarantee deletion
+    std::unique_ptr<hdfs::FileHandle> file(file_raw);
+
+    do{
+      //Reading file chunks
+      status = file->PositionRead(input_buffer, sizeof(input_buffer), offset, &last_bytes_read);
+      if(status.ok()) {
+        //Writing file chunks to stdout
+        fwrite(input_buffer, last_bytes_read, 1, dst_file);
+        total_bytes_read += last_bytes_read;
+        offset += last_bytes_read;
+      } else {
+        if(status.is_invalid_offset()){
+          //Reached the end of the file
+          if(to_delete) {
+            //Deleting the file (recursive set to false)
+            hdfs::Status status = fs->Delete(path, false);
+            if (!status.ok()) {
+              std::cerr << "Error deleting the source file: " << path
+                << " " << status.ToString() << std::endl;
+              exit(EXIT_FAILURE);
+            }
+          }
+          break;
+        } else {
+          std::cerr << "Error reading the file: " << status.ToString() << std::endl;
+          exit(EXIT_FAILURE);
+        }
+      }
+    } while (last_bytes_read > 0);
+    return;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40e3290b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp
deleted file mode 100644
index af882ce..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-*/
-
-#include "tools_common.h"
-
-namespace hdfs {
-
-  std::shared_ptr<hdfs::Options> getOptions() {
-    std::shared_ptr<hdfs::Options> options = std::make_shared<hdfs::Options>();
-    //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
-    hdfs::ConfigurationLoader loader;
-    //Loading default config files core-site.xml and hdfs-site.xml from the config path
-    hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
-    //TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
-    if(config){
-      //Loading options from the config
-      *options = config->GetOptions();
-    }
-    return options;
-  }
-
-  std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, hdfs::Options & options) {
-    IoService * io_service = IoService::New();
-    //Wrapping fs into a shared pointer to guarantee deletion
-    std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
-    if (!fs) {
-      std::cerr << "Could not create FileSystem object. " << std::endl;
-      exit(EXIT_FAILURE);
-    }
-    Status status;
-    //Check if the user supplied the host
-    if(!uri.get_host().empty()){
-      //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
-      std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
-      status = fs->Connect(uri.get_host(), port);
-      if (!status.ok()) {
-        std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
-        exit(EXIT_FAILURE);
-      }
-    } else {
-      status = fs->ConnectToDefaultFs();
-      if (!status.ok()) {
-        if(!options.defaultFS.get_host().empty()){
-          std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
-        } else {
-          std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
-        }
-        exit(EXIT_FAILURE);
-      }
-    }
-    return fs;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40e3290b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h
index 858fc4b..f7c5e09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h
@@ -23,17 +23,15 @@
 #include "hdfspp/hdfspp.h"
 #include "common/hdfs_configuration.h"
 #include "common/configuration_loader.h"
-
 #include <mutex>
 
 namespace hdfs {
 
-  //Pull configurations and get the Options object
-  std::shared_ptr<hdfs::Options> getOptions();
-
   //Build all necessary objects and perform the connection
-  std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, hdfs::Options & options);
+  std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout);
 
+  //Open HDFS file at offset, read it to destination file, optionally delete source file
+  void readFile(std::shared_ptr<hdfs::FileSystem> fs, std::string path, off_t offset, std::FILE* dst_file, bool to_delete);
 }
 
 #endif


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message