From: mli@apache.org
To: commits@hawq.incubator.apache.org
Date: Fri, 01 Apr 2016 09:36:27 -0000
Subject: [18/45] incubator-hawq git commit: HAWQ-618. Import libhdfs3 library for internal management and LICENSE modified

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStream.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStream.cpp b/depends/libhdfs3/src/client/InputStream.cpp
new file mode 100644
index 0000000..6cbf46d
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStream.cpp
@@ -0,0 +1,107 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FileSystemImpl.h"
+#include "FileSystemInter.h"
+#include "InputStream.h"
+#include "InputStreamImpl.h"
+#include "InputStreamInter.h"
+
+using namespace Hdfs::Internal;
+
+namespace Hdfs {
+
+InputStream::InputStream() {
+    impl = new Internal::InputStreamImpl;
+}
+
+InputStream::~InputStream() {
+    delete impl;
+}
+
+/**
+ * Open a file to read
+ * @param fs hdfs file system.
+ * @param path the file to be read.
+ * @param verifyChecksum verify the checksum.
+ */
+void InputStream::open(FileSystem & fs, const char * path,
+                       bool verifyChecksum) {
+    if (!fs.impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->open(fs.impl->filesystem, path, verifyChecksum);
+}
+
+/**
+ * To read data from hdfs.
+ * @param buf the buffer to be filled.
+ * @param size buffer size.
+ * @return return the number of bytes filled in the buffer, it may be less than size.
+ */
+int32_t InputStream::read(char * buf, int32_t size) {
+    return impl->read(buf, size);
+}
+
+/**
+ * To read data from hdfs, block until the given size of bytes has been read.
+ * @param buf the buffer to be filled.
+ * @param size the number of bytes to be read.
+ */
+void InputStream::readFully(char * buf, int64_t size) {
+    impl->readFully(buf, size);
+}
+
+int64_t InputStream::available() {
+    return impl->available();
+}
+
+/**
+ * To move the file pointer to the given position.
+ * @param pos the given position.
+ */
+void InputStream::seek(int64_t pos) {
+    impl->seek(pos);
+}
+
+/**
+ * To get the current file pointer position.
+ * @return the position of the current file pointer.
+ */
+int64_t InputStream::tell() {
+    return impl->tell();
+}
+
+/**
+ * Close the stream.
+ */
+void InputStream::close() {
+    impl->close();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStream.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStream.h b/depends/libhdfs3/src/client/InputStream.h
new file mode 100644
index 0000000..73f45ca
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStream.h
@@ -0,0 +1,99 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ +#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ + +#include "FileSystem.h" + +namespace Hdfs { +namespace Internal { +class InputStreamInter; +} + +/** + * A input stream used read data from hdfs. + */ +class InputStream { +public: + InputStream(); + + ~InputStream(); + + /** + * Open a file to read + * @param fs hdfs file system. + * @param path the file to be read. + * @param verifyChecksum verify the checksum. + */ + void open(FileSystem & fs, const char * path, bool verifyChecksum = true); + + /** + * To read data from hdfs. + * @param buf the buffer used to filled. + * @param size buffer size. + * @return return the number of bytes filled in the buffer, it may less than size. + */ + int32_t read(char * buf, int32_t size); + + /** + * To read data from hdfs, block until get the given size of bytes. + * @param buf the buffer used to filled. + * @param size the number of bytes to be read. + */ + void readFully(char * buf, int64_t size); + + /** + * Get how many bytes can be read without blocking. + * @return The number of bytes can be read without blocking. + */ + int64_t available(); + + /** + * To move the file point to the given position. + * @param pos the given position. + */ + void seek(int64_t pos); + + /** + * To get the current file point position. + * @return the position of current file point. + */ + int64_t tell(); + + /** + * Close the stream. + */ + void close(); + +private: + Internal::InputStreamInter * impl; +}; + +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStreamImpl.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/InputStreamImpl.cpp b/depends/libhdfs3/src/client/InputStreamImpl.cpp new file mode 100644 index 0000000..6bb3e18 --- /dev/null +++ b/depends/libhdfs3/src/client/InputStreamImpl.cpp @@ -0,0 +1,812 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. 
+ * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemInter.h" +#include "InputStreamImpl.h" +#include "InputStreamInter.h" +#include "LocalBlockReader.h" +#include "Logger.h" +#include "RemoteBlockReader.h" +#include "server/Datanode.h" +#include "Thread.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace Hdfs { +namespace Internal { + +unordered_set BuildLocalAddrSet() { + unordered_set set; + struct ifaddrs * ifAddr = NULL; + struct ifaddrs * pifAddr = NULL; + struct sockaddr * addr; + + if (getifaddrs(&ifAddr)) { + THROW(HdfsNetworkException, + "InputStreamImpl: cannot get local network interface: %s", + GetSystemErrorInfo(errno)); + } + + try { + std::vector host; + const char * pHost; + host.resize(INET6_ADDRSTRLEN + 1); + + for (pifAddr = ifAddr; pifAddr != NULL; pifAddr = pifAddr->ifa_next) { + addr = pifAddr->ifa_addr; + + if (!addr) { + continue; + } + + memset(&host[0], 0, INET6_ADDRSTRLEN + 1); + + if (addr->sa_family == AF_INET) { + pHost = + inet_ntop(addr->sa_family, + &(reinterpret_cast(addr))->sin_addr, + &host[0], INET6_ADDRSTRLEN); + } else if (addr->sa_family == AF_INET6) { + pHost = + inet_ntop(addr->sa_family, + &(reinterpret_cast(addr))->sin6_addr, + &host[0], INET6_ADDRSTRLEN); + } else { + continue; + } + + if (NULL == pHost) { + THROW(HdfsNetworkException, + "InputStreamImpl: cannot get convert network address to textual form: %s", + GetSystemErrorInfo(errno)); + } + + set.insert(pHost); + } + + /* + * add hostname. + */ + long hostlen = sysconf(_SC_HOST_NAME_MAX); + host.resize(hostlen + 1); + + if (gethostname(&host[0], host.size())) { + THROW(HdfsNetworkException, + "InputStreamImpl: cannot get hostname: %s", + GetSystemErrorInfo(errno)); + } + + set.insert(&host[0]); + } catch (...) 
{ + if (ifAddr != NULL) { + freeifaddrs(ifAddr); + } + + throw; + } + + if (ifAddr != NULL) { + freeifaddrs(ifAddr); + } + + return set; +} + +InputStreamImpl::InputStreamImpl() : + closed(true), localRead(true), readFromUnderConstructedBlock(false), verify( + true), maxGetBlockInfoRetry(3), cursor(0), endOfCurBlock(0), lastBlockBeingWrittenLength( + 0), prefetchSize(0), peerCache(NULL) { +#ifdef MOCK + stub = NULL; +#endif +} + +InputStreamImpl::~InputStreamImpl() { +} + +void InputStreamImpl::checkStatus() { + if (closed) { + THROW(HdfsIOException, "InputStreamImpl: stream is not opened."); + } + + if (lastError != exception_ptr()) { + rethrow_exception(lastError); + } +} + + +int64_t InputStreamImpl::readBlockLength(const LocatedBlock & b) { + const std::vector & nodes = b.getLocations(); + int replicaNotFoundCount = nodes.size(); + + for (size_t i = 0; i < nodes.size(); ++i) { + try { + int64_t n = 0; + shared_ptr dn; + RpcAuth a = auth; + a.getUser().addToken(b.getToken()); +#ifdef MOCK + + if (stub) { + dn = stub->getDatanode(); + } else { + dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(), + nodes[i].getIpcPort(), *conf, a)); + } + +#else + dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(), + nodes[i].getIpcPort(), *conf, a)); +#endif + n = dn->getReplicaVisibleLength(b); + + if (n >= 0) { + return n; + } + } catch (const ReplicaNotFoundException & e) { + std::string buffer; + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s", + b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer)); + LOG(INFO, + "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode", + b.toString().c_str(), path.c_str()); + --replicaNotFoundCount; + } catch (const HdfsIOException & e) { + std::string buffer; + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s", + b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer)); + LOG(INFO, + "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode", + b.toString().c_str(), path.c_str()); + } + } + + // Namenode told us about these locations, but none know about the replica + // means that we hit the race between pipeline creation start and end. + // we require all 3 because some other exception could have happened + // on a DN that has it. 
we want to report that error + if (replicaNotFoundCount == 0) { + return 0; + } + + return -1; +} + +/** + * Getting blocks locations'information from namenode + */ +void InputStreamImpl::updateBlockInfos() { + int retry = maxGetBlockInfoRetry; + + for (int i = 0; i < retry; ++i) { + try { + if (!lbs) { + lbs = shared_ptr < LocatedBlocksImpl > (new LocatedBlocksImpl); + } + + filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs); + + if (lbs->isLastBlockComplete()) { + lastBlockBeingWrittenLength = 0; + } else { + shared_ptr last = lbs->getLastBlock(); + + if (!last) { + lastBlockBeingWrittenLength = 0; + } else { + lastBlockBeingWrittenLength = readBlockLength(*last); + + if (lastBlockBeingWrittenLength == -1) { + if (i + 1 >= retry) { + THROW(HdfsIOException, + "InputStreamImpl: failed to get block visible length for Block: %s from all Datanode.", + last->toString().c_str()); + } else { + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block visible length for Block: %s file %s from all Datanode.", + last->toString().c_str(), path.c_str()); + + try { + sleep_for(milliseconds(4000)); + } catch (...) { + } + + continue; + } + } + + last->setNumBytes(lastBlockBeingWrittenLength); + } + } + + return; + } catch (const HdfsRpcException & e) { + std::string buffer; + LOG(LOG_ERROR, + "InputStreamImpl: failed to get block information for file %s, %s", + path.c_str(), GetExceptionDetail(e, buffer)); + + if (i + 1 >= retry) { + throw; + } + } + + LOG(INFO, + "InputStreamImpl: retry to get block information for file: %s, already tried %d time(s).", + path.c_str(), i + 1); + } +} + +int64_t InputStreamImpl::getFileLength() { + int64_t length = lbs->getFileLength(); + + if (!lbs->isLastBlockComplete()) { + length += lastBlockBeingWrittenLength; + } + + return length; +} + +void InputStreamImpl::seekToBlock(const LocatedBlock & lb) { + if (cursor >= lbs->getFileLength()) { + assert(!lbs->isLastBlockComplete()); + readFromUnderConstructedBlock = true; + } else { + readFromUnderConstructedBlock = false; + } + + assert(cursor >= lb.getOffset() + && cursor < lb.getOffset() + lb.getNumBytes()); + curBlock = shared_ptr < LocatedBlock > (new LocatedBlock(lb)); + int64_t blockSize = curBlock->getNumBytes(); + assert(blockSize > 0); + endOfCurBlock = blockSize + curBlock->getOffset(); + failedNodes.clear(); + blockReader.reset(); +} + +bool InputStreamImpl::choseBestNode() { + const std::vector & nodes = curBlock->getLocations(); + + for (size_t i = 0; i < nodes.size(); ++i) { + if (std::binary_search(failedNodes.begin(), failedNodes.end(), + nodes[i])) { + continue; + } + + curNode = nodes[i]; + return true; + } + + return false; +} + +bool InputStreamImpl::isLocalNode() { + static const unordered_set LocalAddrSet = BuildLocalAddrSet(); + bool retval = LocalAddrSet.find(curNode.getIpAddr()) != LocalAddrSet.end(); + return retval; +} + +void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) { + bool lastReadFromLocal = false; + exception_ptr lastException; + + while (true) { + if (!choseBestNode()) { + try { + if (lastException) { + rethrow_exception(lastException); + } + } catch (...) 
{ + NESTED_THROW(HdfsIOException, + "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.", + curBlock->toString().c_str()); + } + + THROW(HdfsIOException, + "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.", + curBlock->toString().c_str()); + } + + try { + int64_t offset, len; + offset = cursor - curBlock->getOffset(); + assert(offset >= 0); + len = curBlock->getNumBytes() - offset; + assert(len > 0); + + if (!temporaryDisableLocalRead && !lastReadFromLocal && + !readFromUnderConstructedBlock && localRead && isLocalNode()) { + lastReadFromLocal = true; + + shared_ptr info; + ReadShortCircuitInfoBuilder builder(curNode, auth, *conf); + + try { + info = builder.fetchOrCreate(*curBlock, curBlock->getToken()); + + if (!info) { + continue; + } + + assert(info->isValid()); + blockReader = shared_ptr( + new LocalBlockReader(info, *curBlock, offset, verify, + *conf, localReaderBuffer)); + } catch (...) { + if (info) { + info->setValid(false); + } + + throw; + } + } else { + const char * clientName = filesystem->getClientName(); + lastReadFromLocal = false; + blockReader = shared_ptr(new RemoteBlockReader( + *curBlock, curNode, *peerCache, offset, len, + curBlock->getToken(), clientName, verify, *conf)); + } + + break; + } catch (const HdfsIOException & e) { + lastException = current_exception(); + std::string buffer; + + if (lastReadFromLocal) { + LOG(LOG_ERROR, + "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\n" + "retry the same node but disable read shortcircuit feature", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer)); + /* + * do not add node into failedNodes since we will retry the same node but + * disable local block reading + */ + } else { + LOG(LOG_ERROR, + "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\nretry another node", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer)); + failedNodes.push_back(curNode); + std::sort(failedNodes.begin(), failedNodes.end()); + } + } + } +} + +void InputStreamImpl::open(shared_ptr fs, const char * path, + bool verifyChecksum) { + if (NULL == path || 0 == strlen(path)) { + THROW(InvalidParameter, "path is invalid."); + } + + try { + openInternal(fs, path, verifyChecksum); + } catch (...) { + close(); + throw; + } +} + +void InputStreamImpl::openInternal(shared_ptr fs, const char * path, + bool verifyChecksum) { + try { + filesystem = fs; + verify = verifyChecksum; + this->path = fs->getStandardPath(path); + LOG(DEBUG2, "%p, open file %s for read, verfyChecksum is %s", this, this->path.c_str(), (verifyChecksum ? 
"true" : "false")); + conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf())); + this->auth = RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getRpcAuthMethod())); + prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize(); + localRead = conf->isReadFromLocal(); + maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry(); + peerCache = &fs->getPeerCache(); + updateBlockInfos(); + closed = false; + } catch (const HdfsCanceled & e) { + throw; + } catch (const FileNotFoundException & e) { + throw; + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.", + this->path.c_str()); + } +} + +int32_t InputStreamImpl::read(char * buf, int32_t size) { + checkStatus(); + + try { + int64_t prvious = cursor; + int32_t done = readInternal(buf, size); + LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64 " done %d, next pos %" PRId64, this, path.c_str(), size, + prvious, done, cursor); + return done; + } catch (const HdfsEndOfStream & e) { + throw; + } catch (...) { + lastError = current_exception(); + throw; + } +} + +int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure) { + bool temporaryDisableLocalRead = false; + std::string buffer; + + while (true) { + try { + /* + * Setup block reader here and handle failure. + */ + if (!blockReader) { + setupBlockReader(temporaryDisableLocalRead); + temporaryDisableLocalRead = false; + } + } catch (const HdfsInvalidBlockToken & e) { + std::string buffer; + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.", + curBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); + return -1; + } catch (const HdfsIOException & e) { + /* + * In setupBlockReader, we have tried all the replicas. + * We now update block informations once, and try again. + */ + if (shouldUpdateMetadataOnFailure) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.", + curBlock->toString().c_str(), path.c_str(), + GetExceptionDetail(e, buffer)); + return -1; + } else { + /* + * We have updated block informations and failed again. + */ + throw; + } + } + + /* + * Block reader has been setup, read from block reader. + */ + try { + int32_t todo = size; + todo = todo < endOfCurBlock - cursor ? + todo : static_cast(endOfCurBlock - cursor); + assert(blockReader); + todo = blockReader->read(buf, todo); + cursor += todo; + /* + * Exit the loop and function from here if success. + */ + return todo; + } catch (const HdfsIOException & e) { + /* + * Failed to read from current block reader, + * add the current datanode to invalid node list and try again. + */ + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, " + "retry read again from another Datanode.", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer)); + + if (conf->doesNotRetryAnotherNode()) { + throw; + } + } catch (const ChecksumException & e) { + LOG(LOG_ERROR, + "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, " + "retry read again from another Datanode.", + curBlock->toString().c_str(), path.c_str(), + curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer)); + } + + /* + * Successfully create the block reader but failed to read. + * Disable the local block reader and try the same node again. 
+ */ + if (!blockReader || dynamic_cast(blockReader.get())) { + temporaryDisableLocalRead = true; + } else { + /* + * Remote block reader failed to read, try another node. + */ + LOG(INFO, "IntputStreamImpl: Add invalid datanode %s to failed datanodes and try another datanode again for file %s.", + curNode.formatAddress().c_str(), path.c_str()); + failedNodes.push_back(curNode); + std::sort(failedNodes.begin(), failedNodes.end()); + } + + blockReader.reset(); + } +} + +/** + * To read data from hdfs. + * @param buf the buffer used to filled. + * @param size buffer size. + * @return return the number of bytes filled in the buffer, it may less than size. + */ +int32_t InputStreamImpl::readInternal(char * buf, int32_t size) { + int updateMetadataOnFailure = conf->getMaxReadBlockRetry(); + + try { + do { + const LocatedBlock * lb = NULL; + + /* + * Check if we have got the block information we need. + */ + if (!lbs || cursor >= getFileLength() + || (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) { + /* + * Get block information from namenode. + * Do RPC failover work in updateBlockInfos. + */ + updateBlockInfos(); + + /* + * We already have the up-to-date block information, + * Check if we reach the end of file. + */ + if (cursor >= getFileLength()) { + THROW(HdfsEndOfStream, + "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %d, from file: %s", + cursor, size, path.c_str()); + } + } + + /* + * If we reach the end of block or the block information has just updated, + * seek to the right block to read. + */ + if (cursor >= endOfCurBlock) { + lb = lbs->findBlock(cursor); + + if (!lb) { + THROW(HdfsIOException, + "InputStreamImpl: cannot find block information at position: %" PRId64 " for file: %s", + cursor, path.c_str()); + } + + /* + * Seek to the right block, setup all needed variable, + * but do not setup block reader, setup it latter. + */ + seekToBlock(*lb); + } + + int32_t retval = readOneBlock(buf, size, updateMetadataOnFailure > 0); + + /* + * Now we have tried all replicas and failed. + * We will update metadata once and try again. + */ + if (retval < 0) { + lbs.reset(); + endOfCurBlock = 0; + --updateMetadataOnFailure; + + try { + sleep_for(seconds(1)); + } catch (...) { + } + + continue; + } + + return retval; + } while (true); + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsEndOfStream & e) { + throw; + } catch (const HdfsException & e) { + /* + * wrap the underlying error and rethrow. + */ + NESTED_THROW(HdfsIOException, + "InputStreamImpl: cannot read file: %s, from position %" PRId64 ", size: %d.", + path.c_str(), cursor, size); + } +} + +/** + * To read data from hdfs, block until get the given size of bytes. + * @param buf the buffer used to filled. + * @param size the number of bytes to be read. + */ +void InputStreamImpl::readFully(char * buf, int64_t size) { + LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64, path.c_str(), size, cursor); + checkStatus(); + + try { + return readFullyInternal(buf, size); + } catch (const HdfsEndOfStream & e) { + throw; + } catch (...) { + lastError = current_exception(); + throw; + } +} + +void InputStreamImpl::readFullyInternal(char * buf, int64_t size) { + int32_t done; + int64_t pos = cursor, todo = size; + + try { + while (todo > 0) { + done = todo < std::numeric_limits::max() ? 
+ static_cast(todo) : + std::numeric_limits::max(); + done = readInternal(buf + (size - todo), done); + todo -= done; + } + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsEndOfStream & e) { + THROW(HdfsEndOfStream, + "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %" PRId64 ", from file: %s", + pos, size, path.c_str()); + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "InputStreamImpl: cannot read fully from file: %s, from position %" PRId64 ", size: %" PRId64 ".", + path.c_str(), pos, size); + } +} + +int64_t InputStreamImpl::available() { + checkStatus(); + + try { + if (blockReader) { + return blockReader->available(); + } + } catch (...) { + lastError = current_exception(); + throw; + } + + return 0; +} + +/** + * To move the file point to the given position. + * @param size the given position. + */ +void InputStreamImpl::seek(int64_t pos) { + LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this, path.c_str(), pos, cursor); + checkStatus(); + + try { + seekInternal(pos); + } catch (...) { + lastError = current_exception(); + throw; + } +} + +void InputStreamImpl::seekInternal(int64_t pos) { + if (cursor == pos) { + return; + } + + if (!lbs || pos > getFileLength()) { + updateBlockInfos(); + + if (pos > getFileLength()) { + THROW(HdfsEndOfStream, + "InputStreamImpl: seek over EOF, current position: %" PRId64 ", seek target: %" PRId64 ", in file: %s", + cursor, pos, path.c_str()); + } + } + + try { + if (blockReader && pos > cursor && pos < endOfCurBlock) { + blockReader->skip(pos - cursor); + cursor = pos; + return; + } + } catch (const HdfsIOException & e) { + std::string buffer; + LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s", + pos - cursor, path.c_str(), GetExceptionDetail(e, buffer)); + LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str()); + } catch (const ChecksumException & e) { + std::string buffer; + LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s", + pos - cursor, path.c_str(), GetExceptionDetail(e, buffer)); + LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str()); + } + + /** + * the seek target exceed the current block or skip failed in current block reader. + * reset current block reader and set the cursor to the target position to seek. + */ + endOfCurBlock = 0; + blockReader.reset(); + cursor = pos; +} + +/** + * To get the current file point position. + * @return the position of current file point. + */ +int64_t InputStreamImpl::tell() { + checkStatus(); + LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), cursor); + return cursor; +} + +/** + * Close the stream. 
+ */
+void InputStreamImpl::close() {
+    LOG(DEBUG2, "%p close file %s for read", this, path.c_str());
+    closed = true;
+    localRead = true;
+    readFromUnderConstructedBlock = false;
+    verify = true;
+    filesystem.reset();
+    cursor = 0;
+    endOfCurBlock = 0;
+    lastBlockBeingWrittenLength = 0;
+    prefetchSize = 0;
+    blockReader.reset();
+    curBlock.reset();
+    lbs.reset();
+    conf.reset();
+    failedNodes.clear();
+    path.clear();
+    localReaderBuffer.resize(0);
+    lastError = exception_ptr();
+}
+
+std::string InputStreamImpl::toString() {
+    if (path.empty()) {
+        return std::string("InputStream (not opened)");
+    } else {
+        return std::string("InputStream for path ") + path;
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStreamImpl.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStreamImpl.h b/depends/libhdfs3/src/client/InputStreamImpl.h
new file mode 100644
index 0000000..c4128f6
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStreamImpl.h
@@ -0,0 +1,160 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
+#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
+
+#include "platform.h"
+
+#include "BlockReader.h"
+#include "ExceptionInternal.h"
+#include "FileSystem.h"
+#include "Hash.h"
+#include "InputStreamInter.h"
+#include "Memory.h"
+#include "PeerCache.h"
+#include "rpc/RpcAuth.h"
+#include "server/Datanode.h"
+#include "server/LocatedBlock.h"
+#include "server/LocatedBlocks.h"
+#include "SessionConfig.h"
+#include "Unordered.h"
+
+#ifdef MOCK
+#include "TestDatanodeStub.h"
+#endif
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * An input stream used to read data from hdfs.
+ */
+class InputStreamImpl: public InputStreamInter {
+public:
+    InputStreamImpl();
+    ~InputStreamImpl();
+
+    /**
+     * Open a file to read
+     * @param fs hdfs file system.
+     * @param path the file to be read.
+     * @param verifyChecksum verify the checksum.
+     */
+    void open(shared_ptr<FileSystemInter> fs, const char * path, bool verifyChecksum);
+
+    /**
+     * To read data from hdfs.
+     * @param buf the buffer to be filled.
+     * @param size buffer size.
+     * @return return the number of bytes filled in the buffer, it may be less than size.
+ */ + int32_t read(char * buf, int32_t size); + + /** + * To read data from hdfs, block until get the given size of bytes. + * @param buf the buffer used to filled. + * @param size the number of bytes to be read. + */ + void readFully(char * buf, int64_t size); + + int64_t available(); + + /** + * To move the file point to the given position. + * @param pos the given position. + */ + void seek(int64_t pos); + + /** + * To get the current file point position. + * @return the position of current file point. + */ + int64_t tell(); + + /** + * Close the stream. + */ + void close(); + + /** + * Convert to a printable string + * + * @return return a printable string + */ + std::string toString(); + +private: + bool choseBestNode(); + bool isLocalNode(); + int32_t readInternal(char * buf, int32_t size); + int32_t readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure); + int64_t getFileLength(); + int64_t readBlockLength(const LocatedBlock & b); + void checkStatus(); + void openInternal(shared_ptr fs, const char * path, + bool verifyChecksum); + void readFullyInternal(char * buf, int64_t size); + void seekInternal(int64_t pos); + void seekToBlock(const LocatedBlock & lb); + void setupBlockReader(bool temporaryDisableLocalRead); + void updateBlockInfos(); + +private: + bool closed; + bool localRead; + bool readFromUnderConstructedBlock; + bool verify; + DatanodeInfo curNode; + exception_ptr lastError; + FileStatus fileInfo; + int maxGetBlockInfoRetry; + int64_t cursor; + int64_t endOfCurBlock; + int64_t lastBlockBeingWrittenLength; + int64_t prefetchSize; + PeerCache *peerCache; + RpcAuth auth; + shared_ptr blockReader; + shared_ptr filesystem; + shared_ptr curBlock; + shared_ptr lbs; + shared_ptr conf; + std::string path; + std::vector failedNodes; + std::vector localReaderBuffer; + +#ifdef MOCK +private: + Hdfs::Mock::TestDatanodeStub * stub; +#endif +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStreamInter.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/InputStreamInter.h b/depends/libhdfs3/src/client/InputStreamInter.h new file mode 100644 index 0000000..470f8d2 --- /dev/null +++ b/depends/libhdfs3/src/client/InputStreamInter.h @@ -0,0 +1,104 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMINTER_H_ +#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMINTER_H_ + +#include + +#include + +namespace Hdfs { +namespace Internal { + +class FileSystemInter; + +/** + * A input stream used read data from hdfs. + */ +class InputStreamInter { +public: + + virtual ~InputStreamInter() { + } + + /** + * Open a file to read + * @param fs hdfs file system. + * @param path the file to be read. + * @param verifyChecksum verify the checksum. + */ + virtual void open(shared_ptr fs, const char * path, + bool verifyChecksum) = 0; + + /** + * To read data from hdfs. + * @param buf the buffer used to filled. + * @param size buffer size. + * @return return the number of bytes filled in the buffer, it may less than size. + */ + virtual int32_t read(char * buf, int32_t size) = 0; + + /** + * To read data from hdfs, block until get the given size of bytes. + * @param buf the buffer used to filled. + * @param size the number of bytes to be read. + */ + virtual void readFully(char * buf, int64_t size) = 0; + + /** + * Get how many bytes can be read without blocking. + * @return The number of bytes can be read without blocking. + */ + virtual int64_t available() = 0; + + /** + * To move the file point to the given position. + * @param pos the given position. + */ + virtual void seek(int64_t pos) = 0; + + /** + * To get the current file point position. + * @return the position of current file point. + */ + virtual int64_t tell() = 0; + + /** + * Close the stream. + */ + virtual void close() = 0; + + /** + * Output a readable string of this input stream. + */ + virtual std::string toString() = 0; +}; + +} +} +#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMINTER_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/KerberosName.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/KerberosName.cpp b/depends/libhdfs3/src/client/KerberosName.cpp new file mode 100644 index 0000000..1890b88 --- /dev/null +++ b/depends/libhdfs3/src/client/KerberosName.cpp @@ -0,0 +1,121 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "KerberosName.h" + +#include "Exception.h" +#include "ExceptionInternal.h" + +#include +#include +#include + +namespace Hdfs { +namespace Internal { + +static void HandleRegError(int rc, regex_t * comp) { + std::vector buffer; + size_t size = regerror(rc, comp, NULL, 0); + buffer.resize(size + 1); + regerror(rc, comp, &buffer[0], buffer.size()); + THROW(HdfsIOException, "KerberosName: Failed to parse Kerberos principal."); +} + +KerberosName::KerberosName() { +} + +KerberosName::KerberosName(const std::string & principal) { + parse(principal); +} + +void KerberosName::parse(const std::string & principal) { + int rc; + static const char * pattern = "([^/@]*)(/([^/@]*))?@([^/@]*)"; + regex_t comp; + regmatch_t pmatch[5]; + + if (principal.empty()) { + return; + } + + memset(&comp, 0, sizeof(regex_t)); + rc = regcomp(&comp, pattern, REG_EXTENDED); + + if (rc) { + HandleRegError(rc, &comp); + } + + try { + memset(pmatch, 0, sizeof(pmatch)); + rc = regexec(&comp, principal.c_str(), + sizeof(pmatch) / sizeof(pmatch[1]), pmatch, 0); + + if (rc && rc != REG_NOMATCH) { + HandleRegError(rc, &comp); + } + + if (rc == REG_NOMATCH) { + if (principal.find('@') != principal.npos) { + THROW(HdfsIOException, + "KerberosName: Malformed Kerberos name: %s", + principal.c_str()); + } else { + name = principal; + } + } else { + if (pmatch[1].rm_so != -1) { + name = principal.substr(pmatch[1].rm_so, + pmatch[1].rm_eo - pmatch[1].rm_so); + } + + if (pmatch[3].rm_so != -1) { + host = principal.substr(pmatch[3].rm_so, + pmatch[3].rm_eo - pmatch[3].rm_so); + } + + if (pmatch[4].rm_so != -1) { + realm = principal.substr(pmatch[4].rm_so, + pmatch[4].rm_eo - pmatch[4].rm_so); + } + } + } catch (...) { + regfree(&comp); + throw; + } + + regfree(&comp); +} + +size_t KerberosName::hash_value() const { + size_t values[] = { StringHasher(name), StringHasher(host), StringHasher( + realm) + }; + return CombineHasher(values, sizeof(values) / sizeof(values[0])); +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/KerberosName.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/KerberosName.h b/depends/libhdfs3/src/client/KerberosName.h new file mode 100644 index 0000000..cbae1eb --- /dev/null +++ b/depends/libhdfs3/src/client/KerberosName.h @@ -0,0 +1,104 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_KERBEROSNAME_H_ +#define _HDFS_LIBHDFS3_CLIENT_KERBEROSNAME_H_ + +#include +#include + +#include "Hash.h" + +namespace Hdfs { +namespace Internal { + +class KerberosName { +public: + KerberosName(); + KerberosName(const std::string & principal); + + std::string getPrincipal() const { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << name; + + if (!host.empty()) { + ss << "/" << host; + } + + if (!realm.empty()) { + ss << '@' << realm; + } + + return ss.str(); + } + + const std::string & getHost() const { + return host; + } + + void setHost(const std::string & host) { + this->host = host; + } + + const std::string & getName() const { + return name; + } + + void setName(const std::string & name) { + this->name = name; + } + + const std::string & getRealm() const { + return realm; + } + + void setRealm(const std::string & realm) { + this->realm = realm; + } + + size_t hash_value() const; + + bool operator ==(const KerberosName & other) const { + return name == other.name && host == other.host && realm == other.realm; + } + +private: + void parse(const std::string & principal); + +private: + std::string name; + std::string host; + std::string realm; +}; + +} +} + +HDFS_HASH_DEFINE(::Hdfs::Internal::KerberosName); + +#endif /* _HDFS_LIBHDFS3_CLIENT_KERBEROSNAME_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LeaseRenewer.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/LeaseRenewer.cpp b/depends/libhdfs3/src/client/LeaseRenewer.cpp new file mode 100644 index 0000000..2d4523c --- /dev/null +++ b/depends/libhdfs3/src/client/LeaseRenewer.cpp @@ -0,0 +1,167 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "DateTime.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "FileSystemInter.h" +#include "LeaseRenewer.h" +#include "Logger.h" + +#define DEFAULT_LEASE_RENEW_INTERVAL (60 * 1000) + +namespace Hdfs { +namespace Internal { + +once_flag LeaseRenewer::once; +shared_ptr LeaseRenewer::renewer; + +LeaseRenewer & LeaseRenewer::GetLeaseRenewer() { + call_once(once, &LeaseRenewer::CreateSinglten); + assert(renewer); + return *renewer; +} + +void LeaseRenewer::CreateSinglten() { + renewer = shared_ptr < LeaseRenewer > (new LeaseRenewerImpl()); +} + +LeaseRenewerImpl::LeaseRenewerImpl() : + stop(true), interval(DEFAULT_LEASE_RENEW_INTERVAL) { +} + +LeaseRenewerImpl::~LeaseRenewerImpl() { + stop = true; + cond.notify_all(); + + if (worker.joinable()) { + worker.join(); + } +} + +int LeaseRenewerImpl::getInterval() const { + return interval; +} + +void LeaseRenewerImpl::setInterval(int interval) { + this->interval = interval; +} + +void LeaseRenewerImpl::StartRenew(shared_ptr filesystem) { + lock_guard lock(mut); + const char * clientName = filesystem->getClientName(); + + if (maps.find(clientName) == maps.end()) { + maps[clientName] = filesystem; + } + + filesystem->registerOpenedOutputStream(); + + if (stop && !maps.empty()) { + if (worker.joinable()) { + worker.join(); + } + + stop = false; + CREATE_THREAD(worker, bind(&LeaseRenewerImpl::renewer, this)); + } +} + +void LeaseRenewerImpl::StopRenew(shared_ptr filesystem) { + lock_guard lock(mut); + const char * clientName = filesystem->getClientName(); + + if (filesystem->unregisterOpenedOutputStream() + && maps.find(clientName) != maps.end()) { + maps.erase(clientName); + } +} + +void LeaseRenewerImpl::renewer() { + assert(stop == false); + + while (!stop) { + try { + unique_lock < mutex > lock(mut); + cond.wait_for(lock, milliseconds(interval)); + + if (stop || maps.empty()) { + break; + } + + std::map >::iterator s, e, d; + e = maps.end(); + + for (s = maps.begin(); s != e;) { + shared_ptr fs = s->second; + + try { + if (!fs->renewLease()) { + d = s++; + maps.erase(d); + } else { + ++s; + } + + continue; + } catch (const HdfsException & e) { + std::string buffer; + LOG(LOG_ERROR, + "Failed to renew lease for filesystem which client name is %s, since:\n%s", + fs->getClientName(), GetExceptionDetail(e, buffer)); + } catch (const std::exception & e) { + LOG(LOG_ERROR, + "Failed to renew lease for filesystem which client name is %s, since:\n%s", + fs->getClientName(), e.what()); + break; + } + + ++s; + } + + if (maps.empty()) { + break; + } + } catch (const std::bad_alloc & e) { + /* + * keep quiet if we run out of memory, since writing log needs memory, + * that may cause the process terminated. + */ + break; + } catch (const std::exception & e) { + LOG(LOG_ERROR, + "Lease renewer will exit since unexpected exception: %s", + e.what()); + break; + } + } + + stop = true; +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LeaseRenewer.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/LeaseRenewer.h b/depends/libhdfs3/src/client/LeaseRenewer.h new file mode 100644 index 0000000..84c02ff --- /dev/null +++ b/depends/libhdfs3/src/client/LeaseRenewer.h @@ -0,0 +1,82 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. 
+ * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_ +#define _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_ + +#include + +#include "Atomic.h" +#include "Memory.h" +#include "Thread.h" + +namespace Hdfs { +namespace Internal { + +class FileSystemInter; + +class LeaseRenewer { +public: + virtual ~LeaseRenewer() { + } + + virtual void StartRenew(shared_ptr filesystem) = 0; + virtual void StopRenew(shared_ptr filesystem) = 0; + +public: + static LeaseRenewer & GetLeaseRenewer(); + static void CreateSinglten(); + +private: + static once_flag once; + static shared_ptr renewer; +}; + +class LeaseRenewerImpl: public LeaseRenewer { +public: + LeaseRenewerImpl(); + ~LeaseRenewerImpl(); + int getInterval() const; + void setInterval(int interval); + void StartRenew(shared_ptr filesystem); + void StopRenew(shared_ptr filesystem); + +private: + void renewer(); + +private: + atomic stop; + condition_variable cond; + int interval; + mutex mut; + std::map > maps; + thread worker; +}; + +} +} +#endif /* _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LocalBlockReader.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/LocalBlockReader.cpp b/depends/libhdfs3/src/client/LocalBlockReader.cpp new file mode 100644 index 0000000..b0cb8a4 --- /dev/null +++ b/depends/libhdfs3/src/client/LocalBlockReader.cpp @@ -0,0 +1,288 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "BigEndian.h" +#include "datatransfer.pb.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "HWCrc32c.h" +#include "LocalBlockReader.h" +#include "SWCrc32c.h" + +#include +#include + +#define BMVERSION 1 +#define BMVERSION_SIZE 2 + +#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE) + +namespace Hdfs { +namespace Internal { + +LocalBlockReader::LocalBlockReader(const shared_ptr& info, + const ExtendedBlock& block, int64_t offset, + bool verify, SessionConfig& conf, + std::vector& buffer) + : verify(verify), + pbuffer(NULL), + pMetaBuffer(NULL), + block(block), + checksumSize(0), + chunkSize(0), + position(0), + size(0), + cursor(0), + length(block.getNumBytes()), + info(info), + buffer(buffer) { + try { + metaFd = info->getMetaFile(); + dataFd = info->getDataFile(); + + std::vector header; + pMetaBuffer = metaFd->read(header, HEADER_SIZE); + int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]); + + if (BMVERSION != version) { + THROW(HdfsIOException, + "LocalBlockReader get an unmatched block, expected block version %d, real version is %d", + BMVERSION, static_cast(version)); + } + + switch (pMetaBuffer[BMVERSION_SIZE]) { + case ChecksumTypeProto::CHECKSUM_NULL: + this->verify = false; + checksumSize = 0; + metaFd.reset(); + break; + + case ChecksumTypeProto::CHECKSUM_CRC32: + THROW(HdfsIOException, + "LocalBlockReader does not support CRC32 checksum."); + break; + + case ChecksumTypeProto::CHECKSUM_CRC32C: + if (HWCrc32c::available()) { + checksum = shared_ptr(new HWCrc32c()); + } else { + checksum = shared_ptr(new SWCrc32c()); + } + + chunkSize = ReadBigEndian32FromArray( + &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]); + checksumSize = sizeof(int32_t); + break; + + default: + THROW(HdfsIOException, + "LocalBlockReader cannot recognize checksum type: %d.", + static_cast(pMetaBuffer[BMVERSION_SIZE])); + } + + if (verify && chunkSize <= 0) { + THROW(HdfsIOException, + "LocalBlockReader get an invalid checksum parameter, bytes per check: %d.", + chunkSize); + } + + localBufferSize = conf.getLocalReadBufferSize(); + + if (verify) { + localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize; + } + + if (offset > 0) { + skip(offset); + } + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + NESTED_THROW(HdfsIOException, + "Failed to construct LocalBlockReader for block: %s.", + block.toString().c_str()); + } +} + +LocalBlockReader::~LocalBlockReader() { +} + +void LocalBlockReader::readAndVerify(int32_t bufferSize) { + assert(true == verify); + assert(cursor % chunkSize == 0); + int chunks = (bufferSize + chunkSize - 1) / chunkSize; + pbuffer = dataFd->read(buffer, bufferSize); + pMetaBuffer = metaFd->read(metaBuffer, chunks * checksumSize); + + for (int i = 0; i < chunks; ++i) { + checksum->reset(); + int chunk = chunkSize; + + if (chunkSize * (i + 1) > bufferSize) { + chunk = bufferSize % chunkSize; + } + + checksum->update(&pbuffer[i * chunkSize], chunk); + uint32_t target = ReadBigEndian32FromArray( + &pMetaBuffer[i * checksumSize]); + + if 
(target != checksum->getValue()) { + THROW(ChecksumException, + "LocalBlockReader checksum not match for block: %s", + block.toString().c_str()); + } + } +} + +int32_t LocalBlockReader::readInternal(char * buf, int32_t len) { + int32_t todo = len; + + /* + * read from buffer. + */ + if (position < size) { + todo = todo < size - position ? todo : size - position; + memcpy(buf, &pbuffer[position], todo); + position += todo; + cursor += todo; + return todo; + } + + /* + * end of block + */ + todo = todo < length - cursor ? todo : length - cursor; + + if (0 == todo) { + return 0; + } + + /* + * bypass the buffer + */ + if (!verify + && (todo > localBufferSize || todo == length - cursor)) { + dataFd->copy(buf, todo); + cursor += todo; + return todo; + } + + /* + * fill buffer. + */ + int bufferSize = localBufferSize; + bufferSize = bufferSize < length - cursor ? bufferSize : length - cursor; + assert(bufferSize > 0); + + if (verify) { + readAndVerify(bufferSize); + } else { + pbuffer = dataFd->read(buffer, bufferSize); + } + + position = 0; + size = bufferSize; + assert(position < size); + return readInternal(buf, todo); +} + +int32_t LocalBlockReader::read(char * buf, int32_t size) { + try { + return readInternal(buf, size); + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + info->setValid(false); + NESTED_THROW(HdfsIOException, + "LocalBlockReader failed to read from position: %" PRId64 ", length: %d, block: %s.", + cursor, size, block.toString().c_str()); + } + + assert(!"cannot reach here"); + return 0; +} + +void LocalBlockReader::skip(int64_t len) { + assert(len < length - cursor); + + try { + int64_t todo = len; + + while (todo > 0) { + /* + * skip the data in buffer. + */ + if (size - position > 0) { + int batch = todo < size - position ? todo : size - position; + position += batch; + todo -= batch; + cursor += batch; + continue; + } + + if (verify) { + int64_t lastChunkSize = (cursor + todo) % chunkSize; + cursor = (cursor + todo) / chunkSize * chunkSize; + int64_t metaCursor = HEADER_SIZE + + checksumSize * (cursor / chunkSize); + metaFd->seek(metaCursor); + todo = lastChunkSize; + } else { + cursor += todo; + todo = 0; + } + + if (cursor > 0) { + dataFd->seek(cursor); + } + + /* + * fill buffer again and verify checksum + */ + if (todo > 0) { + assert(true == verify); + int bufferSize = localBufferSize; + bufferSize = + bufferSize < length - cursor ? + bufferSize : length - cursor; + readAndVerify(bufferSize); + position = 0; + size = bufferSize; + } + } + } catch (const HdfsCanceled & e) { + throw; + } catch (const HdfsException & e) { + info->setValid(false); + NESTED_THROW(HdfsIOException, + "LocalBlockReader failed to skip from position: %" PRId64 ", length: %d, block: %s.", + cursor, size, block.toString().c_str()); + } +} + +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LocalBlockReader.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/LocalBlockReader.h b/depends/libhdfs3/src/client/LocalBlockReader.h new file mode 100644 index 0000000..facdb4c --- /dev/null +++ b/depends/libhdfs3/src/client/LocalBlockReader.h @@ -0,0 +1,105 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. 
+ * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_LOCALBLOCKREADER_H_ +#define _HDFS_LIBHDFS3_CLIENT_LOCALBLOCKREADER_H_ + +#include "BlockReader.h" +#include "Checksum.h" +#include "FileWrapper.h" +#include "Memory.h" +#include "ReadShortCircuitInfo.h" +#include "SessionConfig.h" + +#include <vector> + +namespace Hdfs { +namespace Internal { + +class LocalBlockReader: public BlockReader { +public: + LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info, + const ExtendedBlock & block, int64_t offset, bool verify, + SessionConfig & conf, std::vector<char> & buffer); + + ~LocalBlockReader(); + + /** + * Get how many bytes can be read without blocking. + * @return The number of bytes that can be read without blocking. + */ + virtual int64_t available() { + return length - cursor; + } + + /** + * To read data from the block. + * @param buf the buffer to be filled. + * @param size the number of bytes to be read. + * @return the number of bytes filled in the buffer; + * it may be less than size. Returns 0 at the end of the block. + */ + virtual int32_t read(char * buf, int32_t size); + + /** + * Move the cursor forward len bytes. + * @param len The number of bytes to skip. + */ + virtual void skip(int64_t len); + +private: + /** + * Fill the buffer and verify the checksum. + * @param bufferSize The size of the buffer. + */ + void readAndVerify(int32_t bufferSize); + int32_t readInternal(char * buf, int32_t len); + +private: + bool verify; //verify checksum or not. + const char * pbuffer; + const char * pMetaBuffer; + const ExtendedBlock & block; + int checksumSize; + int chunkSize; + int localBufferSize; + int position; //position in buffer. + int size; //data size in buffer. + int64_t cursor; //position in block. + int64_t length; //data size of block.
+ shared_ptr<Checksum> checksum; + shared_ptr<FileWrapper> dataFd; + shared_ptr<FileWrapper> metaFd; + shared_ptr<ReadShortCircuitInfo> info; + std::vector<char> & buffer; + std::vector<char> metaBuffer; +}; + +} +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_LOCALBLOCKREADER_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStream.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/OutputStream.cpp b/depends/libhdfs3/src/client/OutputStream.cpp new file mode 100644 index 0000000..227ba71 --- /dev/null +++ b/depends/libhdfs3/src/client/OutputStream.cpp @@ -0,0 +1,96 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Atomic.h" +#include "FileSystemImpl.h" +#include "Memory.h" +#include "OutputStream.h" +#include "OutputStreamImpl.h" + +using namespace Hdfs::Internal; + +namespace Hdfs { + +OutputStream::OutputStream() { + impl = new Internal::OutputStreamImpl; +} + +OutputStream::~OutputStream() { + delete impl; +} + +void OutputStream::open(FileSystem & fs, const char * path, int flag, + const Permission permission, bool createParent, int replication, + int64_t blockSize) { + if (!fs.impl) { + THROW(HdfsIOException, "FileSystem: not connected."); + } + + impl->open(fs.impl->filesystem, path, flag, permission, createParent, replication, + blockSize); +} + +/** + * To append data to the file. + * @param buf the data to append. + * @param size the data size. + */ +void OutputStream::append(const char * buf, int64_t size) { + impl->append(buf, size); +} + +/** + * Flush all data in the buffer and wait for acknowledgements. + * Will block until all acks are received. + */ +void OutputStream::flush() { + impl->flush(); +} + +/** + * Return the current file length. + * @return current file length. + */ +int64_t OutputStream::tell() { + return impl->tell(); +} + +/** + * Currently the same as flush(). + */ +void OutputStream::sync() { + impl->sync(); +} + +/** + * Close the stream.
+ */ +void OutputStream::close() { + impl->close(); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStream.h ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/OutputStream.h b/depends/libhdfs3/src/client/OutputStream.h new file mode 100644 index 0000000..14ae6a7 --- /dev/null +++ b/depends/libhdfs3/src/client/OutputStream.h @@ -0,0 +1,132 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_ +#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_ + +#include "FileSystem.h" + +namespace Hdfs { + +/** + * Use the CreateFlag as follows: + *
+ *   1. CREATE - to create a file if it does not exist,
+ *      else throw FileAlreadyExists.
+ *   2. APPEND - to append to a file if it exists,
+ *      else throw FileNotFoundException.
+ *   3. OVERWRITE - to truncate a file if it exists,
+ *      else throw FileNotFoundException.
+ *   4. CREATE|APPEND - to create a file if it does not exist,
+ *      else append to an existing file.
+ *   5. CREATE|OVERWRITE - to create a file if it does not exist,
+ *      else overwrite an existing file.
+ *   6. SyncBlock - to force closed blocks to the disk device.
+ *      In addition, {@link OutputStream::sync()} should be called after each write
+ *      if true synchronous behavior is required.
+ *
+ * The following combinations are not valid and will result in
+ * {@link InvalidParameter}:
+ *   1. APPEND|OVERWRITE
+ *   2. CREATE|APPEND|OVERWRITE
+ */ +enum CreateFlag { + Create = 0x01, Overwrite = 0x02, Append = 0x04, SyncBlock = 0x08 +}; + +namespace Internal { +class OutputStreamInter; +} + +/** + * An output stream used to write data to HDFS. + */ +class OutputStream { +public: + /** + * Construct a new OutputStream. + */ + OutputStream(); + /** + * Destroy an OutputStream instance. + */ + ~OutputStream(); + + /** + * To create a file or append to an existing file. + * @param fs hdfs file system. + * @param path the file path. + * @param flag creation flag, can be Create, Append or Create|Overwrite. + * @param permission create a new file with given permission. + * @param createParent if the parent does not exist, create it. + * @param replication create a file with the given number of replicas. + * @param blockSize create a file with the given block size. + */ + void open(FileSystem & fs, const char * path, int flag = Create, + const Permission permission = Permission(0644), bool createParent = + false, int replication = 0, int64_t blockSize = 0); + + /** + * To append data to the file. + * @param buf the data to append. + * @param size the data size. + */ + void append(const char * buf, int64_t size); + + /** + * Flush all data in the buffer and wait for acknowledgements. + * Will block until all acks are received. + */ + void flush(); + + /** + * Return the current file length. + * @return current file length. + */ + int64_t tell(); + + /** + * Currently the same as flush(). + */ + void sync(); + + /** + * Close the stream. + */ + void close(); + +private: + Internal::OutputStreamInter * impl; + +}; + +} + +#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_ */
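
As a quick orientation to the public write API declared above, here is a minimal usage sketch. It is not part of the patch: the helper name writeGreeting and the target path are hypothetical, and it assumes the caller has already constructed and connected an Hdfs::FileSystem (connection setup lives outside the files in this commit). Only members declared in OutputStream.h are used: open() with CreateFlag bits, append(), flush(), sync() and close().

#include "FileSystem.h"
#include "OutputStream.h"

#include <cstring>

using namespace Hdfs;

// Hypothetical helper for illustration only: write a short message to a file,
// creating it if it does not exist or truncating it if it does.
void writeGreeting(FileSystem & fs) {  // fs is assumed to be connected already
    OutputStream out;

    // Create|Overwrite follows the CreateFlag semantics documented above:
    // create the file if missing, otherwise truncate the existing file.
    out.open(fs, "/tmp/hawq-example.txt", Create | Overwrite);

    const char * data = "hello hawq\n";
    out.append(data, static_cast<int64_t>(std::strlen(data)));

    out.flush();  // blocks until all outstanding acks are received
    out.sync();   // currently equivalent to flush()
    out.close();  // failures surface as Hdfs exceptions such as HdfsIOException
}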