From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Sat, 11 Feb 2017 04:37:49 -0000
Message-Id: <8ef2aa90b06c49b183e0dc80c506dd39@git.apache.org>
In-Reply-To: <6ff16b5f1fb543c0ba6e0dcf8e19c412@git.apache.org>
References: <6ff16b5f1fb543c0ba6e0dcf8e19c412@git.apache.org>
Subject: [2/2] incubator-quickstep git commit: Added HDFS Support For TextScanWorkOrder.

Added HDFS Support For TextScanWorkOrder.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ad8a0467
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ad8a0467
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ad8a0467

Branch: refs/heads/hdfs_text_scan
Commit: ad8a0467ce3e4c5b02746420d8efb1e10297d3ff
Parents: ab46d78
Author: Zuyu Zhang
Authored: Mon Feb 6 14:42:42 2017 -0800
Committer: Zuyu Zhang
Committed: Fri Feb 10 20:37:39 2017 -0800

----------------------------------------------------------------------
 cli/distributed/Executor.cpp                    |   2 +-
 query_execution/CMakeLists.txt                  |   1 +
 query_execution/Shiftboss.cpp                   |   3 +-
 query_execution/Shiftboss.hpp                   |  14 +++
 .../DistributedExecutionGeneratorTestRunner.cpp |   3 +-
 relational_operators/CMakeLists.txt             |   5 +
 relational_operators/TextScanOperator.cpp       | 107 ++++++++++++++++---
 relational_operators/TextScanOperator.hpp       |  10 +-
 relational_operators/WorkOrderFactory.cpp       |   6 +-
 relational_operators/WorkOrderFactory.hpp       |   4 +-
 storage/FileManagerHdfs.hpp                     |   9 ++
 storage/StorageManager.cpp                      |   9 ++
 storage/StorageManager.hpp                      |   8 +-
 13 files changed, 160 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/cli/distributed/Executor.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp
index 1d03579..3485298 100644
--- a/cli/distributed/Executor.cpp
+++ b/cli/distributed/Executor.cpp
@@ -76,7 +76,7 @@ void Executor::init() {
   data_exchanger_.start();

   shiftboss_ =
-      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get());
+      make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get(), storage_manager_->hdfs());
   shiftboss_->start();

   for (const auto &worker : workers_) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 5ad6999..3a69f77 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -292,6 +292,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_WorkerMessage
                         quickstep_relationaloperators_RebuildWorkOrder
                         quickstep_relationaloperators_WorkOrderFactory
+                        quickstep_storage_Flags
                         quickstep_storage_InsertDestination
                         quickstep_storage_StorageBlock
                         quickstep_storage_StorageManager

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 2ed42d0..bae5205 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -104,7 +104,8 @@ void Shiftboss::run() {
                                                                        query_contexts_[query_id].get(),
                                                                        storage_manager_,
                                                                        shiftboss_client_id_,
-                                                                       bus_);
+                                                                       bus_,
+                                                                       hdfs_);

           unique_ptr<WorkerMessage> worker_message(
               WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
index 6538d48..e0b4312 100644
--- a/query_execution/Shiftboss.hpp
+++ b/query_execution/Shiftboss.hpp
@@ -30,6 +30,8 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/WorkerDirectory.hpp"
+#include "storage/Flags.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
 #include "threading/Thread.hpp"
 #include "utility/Macros.hpp"

@@ -64,6 +66,7 @@ class Shiftboss : public Thread {
    * @param bus A pointer to the TMB.
    * @param storage_manager The StorageManager to use.
    * @param workers A pointer to the WorkerDirectory.
+   * @param hdfs The HDFS connector via libhdfs3.
    * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
    *
    * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
@@ -72,10 +75,12 @@ class Shiftboss : public Thread {
   Shiftboss(tmb::MessageBus *bus,
             StorageManager *storage_manager,
             WorkerDirectory *workers,
+            void *hdfs,
             const int cpu_id = -1)
       : bus_(DCHECK_NOTNULL(bus)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         workers_(DCHECK_NOTNULL(workers)),
+        hdfs_(hdfs),
         cpu_id_(cpu_id),
         shiftboss_client_id_(tmb::kClientIdNone),
         foreman_client_id_(tmb::kClientIdNone),
@@ -84,6 +89,12 @@ class Shiftboss : public Thread {
     // Check to have at least one Worker.
     DCHECK_GT(workers->getNumWorkers(), 0u);

+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    if (FLAGS_use_hdfs) {
+      CHECK(hdfs_);
+    }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
     shiftboss_client_id_ = bus_->Connect();
     LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
     DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
@@ -228,6 +239,9 @@ class Shiftboss : public Thread {
   StorageManager *storage_manager_;
   WorkerDirectory *workers_;

+  // Not owned.
+  void *hdfs_;
+
   // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
   const int cpu_id_;


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 2e18467..c9f5a10 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -128,7 +128,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
     data_exchangers_[i].set_storage_manager(storage_manager.get());

     shiftbosses_.push_back(
-        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get()));
+        make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get(),
+                               storage_manager->hdfs()));

     storage_managers_.push_back(move(storage_manager));
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 457d58a..1693ec2 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -491,6 +491,7 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_Flags
                       quickstep_storage_InsertDestination
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -500,6 +501,10 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_utility_Glob
                       quickstep_utility_Macros
                       tmb)
+if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
+  target_link_libraries(quickstep_relationaloperators_TextScanOperator
+                        ${LIBHDFS3_LIBRARIES})
+endif(QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
 target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       glog
                       quickstep_catalog_CatalogRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 0a83a85..6333813 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -41,7 +41,14 @@
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
+#include "storage/Flags.hpp"
 #include "storage/InsertDestination.hpp"
+#include "storage/StorageConfig.h"  // For QUICKSTEP_HAVE_FILE_MANAGER_HDFS.
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+#include
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -205,14 +212,56 @@ void TextScanWorkOrder::execute() {

   std::vector vector_tuple_returned;
   constexpr std::size_t kSmallBufferSize = 0x4000;
-  char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
-
-  // Read text segment into buffer.
-  FILE *file = std::fopen(filename_.c_str(), "rb");
-  std::fseek(file, text_offset_, SEEK_SET);
-  std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
-  if (bytes_read != text_segment_size_) {
-    throw TextScanReadError(filename_);
+  const size_t buffer_size = std::max(text_segment_size_, kSmallBufferSize);
+  char *buffer = reinterpret_cast<char *>(malloc(buffer_size));
+
+  bool use_hdfs = false;
+  std::size_t bytes_read;
+
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  hdfsFS hdfs = nullptr;
+  hdfsFile file_handle = nullptr;
+
+  if (FLAGS_use_hdfs) {
+    use_hdfs = true;
+    hdfs = static_cast<hdfsFS>(hdfs_);
+
+    file_handle = hdfsOpenFile(hdfs, filename_.c_str(), O_RDONLY, buffer_size,
+                               0 /* default replication */, 0 /* default block size */);
+    if (file_handle == nullptr) {
+      LOG(ERROR) << "Failed to open file " << filename_ << " with error: " << strerror(errno);
+      return;
+    }
+
+    if (hdfsSeek(hdfs, file_handle, text_offset_)) {
+      LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+      hdfsCloseFile(hdfs, file_handle);
+      return;
+    }
+
+    bytes_read = hdfsRead(hdfs, file_handle, buffer, text_segment_size_);
+    if (bytes_read != text_segment_size_) {
+      hdfsCloseFile(hdfs, file_handle);
+      throw TextScanReadError(filename_);
+    }
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+
+  FILE *file = nullptr;
+  if (!use_hdfs) {
+    // Avoid unused-private-field warning.
+    (void) hdfs_;
+
+    // Read text segment into buffer.
+    file = std::fopen(filename_.c_str(), "rb");
+    std::fseek(file, text_offset_, SEEK_SET);
+    bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+
+    if (bytes_read != text_segment_size_) {
+      std::fclose(file);
+      throw TextScanReadError(filename_);
+    }
   }

   // Locate the first newline character.
@@ -266,10 +315,36 @@ void TextScanWorkOrder::execute() {
     // that the last tuple is very small / very large.
     std::size_t dynamic_read_size = 1024;
     std::string row_string;
-    std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+
+    const size_t dynamic_read_offset = text_offset_ + (end_ptr - buffer);
+    if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+      if (hdfsSeek(hdfs, file_handle, dynamic_read_offset)) {
+        LOG(ERROR) << "Failed to seek in file " << filename_ << " with error: " << strerror(errno);
+
+        hdfsCloseFile(hdfs, file_handle);
+        return;
+      }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    } else {
+      std::fseek(file, dynamic_read_offset, SEEK_SET);
+    }
+
     bool has_reached_end = false;
     do {
-      bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+      if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+        bytes_read = hdfsRead(hdfs, file_handle, buffer, dynamic_read_size);
+
+        // Read again when acrossing the HDFS block boundary.
+        if (bytes_read != dynamic_read_size) {
+          bytes_read += hdfsRead(hdfs, file_handle, buffer + bytes_read, dynamic_read_size - bytes_read);
+        }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+      } else {
+        bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+      }
+
       std::size_t bytes_to_copy = bytes_read;

       for (std::size_t i = 0; i < bytes_read; ++i) {
@@ -303,7 +378,14 @@ void TextScanWorkOrder::execute() {
       }
     }

-  std::fclose(file);
+  if (use_hdfs) {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+    hdfsCloseFile(hdfs, file_handle);
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  } else {
+    std::fclose(file);
+  }
+
   free(buffer);

   // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
@@ -334,7 +416,8 @@ void TextScanWorkOrder::execute() {
 }

 std::vector TextScanWorkOrder::parseRow(const char **row_ptr,
-    const CatalogRelationSchema &relation, bool *is_faulty) const {
+                                        const CatalogRelationSchema &relation,
+                                        bool *is_faulty) const {
   std::vector attribute_values;
   // Always assume current row is not faulty initially.
   *is_faulty = false;


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index eada190..59821fc 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -189,6 +189,7 @@ class TextScanWorkOrder : public WorkOrder {
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
    * @param output_destination The InsertDestination to insert tuples.
+   * @param hdfs The HDFS connector via libhdfs3.
    **/
   TextScanWorkOrder(
       const std::size_t query_id,
@@ -197,14 +198,16 @@ class TextScanWorkOrder : public WorkOrder {
       const std::size_t text_segment_size,
       const char field_terminator,
       const bool process_escape_sequences,
-      InsertDestination *output_destination)
+      InsertDestination *output_destination,
+      void *hdfs = nullptr)
       : WorkOrder(query_id),
         filename_(filename),
         text_offset_(text_offset),
         text_segment_size_(text_segment_size),
         field_terminator_(field_terminator),
         process_escape_sequences_(process_escape_sequences),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        hdfs_(hdfs) {}

   ~TextScanWorkOrder() override {}

@@ -332,6 +335,9 @@ class TextScanWorkOrder : public WorkOrder {

   InsertDestination *output_destination_;

+  // Not owned.
+  void *hdfs_;
+
   DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
 };


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index d2c8251..cf0ee74 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -75,7 +75,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                                                   QueryContext *query_context,
                                                   StorageManager *storage_manager,
                                                   const tmb::client_id shiftboss_client_id,
-                                                  tmb::MessageBus *bus) {
+                                                  tmb::MessageBus *bus,
+                                                  void *hdfs) {
   DCHECK(query_context != nullptr);
   DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
       << "Attempted to create WorkOrder from an invalid proto description:\n"
@@ -473,7 +474,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
           proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+          hdfs);
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder in Shiftboss " << shiftboss_index;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index acf3855..ece687b 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -59,6 +59,7 @@ class WorkOrderFactory {
    * @param storage_manager The StorageManager to use.
    * @param shiftboss_client_id The TMB client id of Shiftboss.
    * @param bus A pointer to the TMB.
+   * @param hdfs The HDFS connector via libhdfs3.
    *
    * @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
    **/
@@ -68,7 +69,8 @@ class WorkOrderFactory {
                                          QueryContext *query_context,
                                          StorageManager *storage_manager,
                                          const tmb::client_id shiftboss_client_id,
-                                         tmb::MessageBus *bus);
+                                         tmb::MessageBus *bus,
+                                         void *hdfs);

   /**
    * @brief Check whether a serialization::WorkOrder is fully-formed and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/storage/FileManagerHdfs.hpp
----------------------------------------------------------------------
diff --git a/storage/FileManagerHdfs.hpp b/storage/FileManagerHdfs.hpp
index f47e4a8..a8feb50 100644
--- a/storage/FileManagerHdfs.hpp
+++ b/storage/FileManagerHdfs.hpp
@@ -55,6 +55,15 @@ class FileManagerHdfs : public FileManager {

   block_id_counter getMaxUsedBlockCounter(const block_id_domain block_domain) const override;

+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector.
+   **/
+  void* hdfs() {
+    return static_cast<void*>(hdfs_);
+  }
+
  private:
   // libhdfs3 has an API to release this pointer.
   hdfsFS hdfs_;


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 6f7d38b..872e8cc 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -570,6 +570,15 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
   return true;
 }

+void* StorageManager::hdfs() {
+#ifdef QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  if (FLAGS_use_hdfs) {
+    return static_cast<FileManagerHdfs*>(file_manager_.get())->hdfs();
+  }
+#endif  // QUICKSTEP_HAVE_FILE_MANAGER_HDFS
+  return nullptr;
+}
+
 vector StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
   serialization::BlockMessage proto;
   proto.set_block_id(block);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ad8a0467/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 42176ee..dc4b7e8 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -41,7 +41,6 @@
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageConfig.h"
 #include "storage/StorageConstants.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "utility/Macros.hpp"
@@ -395,6 +394,13 @@ class StorageManager {
   void pullBlockOrBlob(const block_id block, PullResponse *response) const;
 #endif

+  /**
+   * @brief Get the HDFS connector via libhdfs3.
+   *
+   * @return The HDFS connector.
+   **/
+  void* hdfs();
+
  private:
   struct BlockHandle {
     void *block_memory;
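
Note on the short-read handling in TextScanOperator.cpp: the diff above issues one follow-up hdfsRead() when the first call returns fewer bytes than requested. A more general pattern, sketched below, loops until the requested byte count is gathered, the input ends, or an error occurs. This is only an illustration and not part of the commit. ReadFn, ReadFully, and MakeChunkedSource are hypothetical names, and the reader convention (bytes copied, 0 at end of input, negative on error) is an assumption modeled on how hdfsRead() is generally documented to behave. The sketch does not link against libhdfs3.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <string>
#include <utility>

// Hypothetical reader signature (an assumption for this sketch): copies up to
// `length` bytes into `dest` and returns the number of bytes copied, 0 at end
// of input, or a negative value on error.
using ReadFn = std::function<long(char *dest, std::size_t length)>;

// Calls `read` repeatedly until `length` bytes are gathered, the input ends,
// or an error occurs. Returns the number of bytes gathered, or -1 on error.
long ReadFully(const ReadFn &read, char *buffer, std::size_t length) {
  assert(buffer != nullptr);
  std::size_t total = 0;
  while (total < length) {
    const long n = read(buffer + total, length - total);
    if (n < 0) {
      return -1;  // Propagate the reader's error.
    }
    if (n == 0) {
      break;  // End of input: return what was gathered so far.
    }
    total += static_cast<std::size_t>(n);
  }
  return static_cast<long>(total);
}

// Hypothetical in-memory source that hands out at most `chunk` bytes per
// call, imitating short reads at block boundaries.
ReadFn MakeChunkedSource(std::string data, std::size_t chunk) {
  std::size_t pos = 0;
  return [data = std::move(data), chunk, pos](char *dest,
                                              std::size_t length) mutable -> long {
    const std::size_t n = std::min({length, chunk, data.size() - pos});
    std::memcpy(dest, data.data() + pos, n);
    pos += n;
    return static_cast<long>(n);
  };
}
```

With a real HDFS handle, the callable passed to ReadFully would wrap hdfsRead() on the open file. The caller would then compare the return value against the requested size, as the existing code already does for text_segment_size_.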