hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [20/45] incubator-hawq git commit: HAWQ-618. Import libhdfs3 library for internal management and LICENSE modified
Date Fri, 01 Apr 2016 09:36:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp b/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp
new file mode 100644
index 0000000..c0d0e10
--- /dev/null
+++ b/depends/libhdfs3/src/client/DataTransferProtocolSender.cpp
@@ -0,0 +1,203 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "client/Token.h"
+#include "datatransfer.pb.h"
+#include "DataTransferProtocolSender.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "hdfs.pb.h"
+#include "Security.pb.h"
+#include "WriteBuffer.h"
+
+using namespace google::protobuf;
+
+namespace Hdfs {
+namespace Internal {
+
+static inline void Send(Socket & sock, DataTransferOp op, Message * msg,
+                        int writeTimeout) {
+    WriteBuffer buffer;
+    buffer.writeBigEndian(static_cast<int16_t>(DATA_TRANSFER_VERSION));
+    buffer.write(static_cast<char>(op));
+    int msgSize = msg->ByteSize();
+    buffer.writeVarint32(msgSize);
+    char * b = buffer.alloc(msgSize);
+
+    if (!msg->SerializeToArray(b, msgSize)) {
+        THROW(HdfsIOException,
+              "DataTransferProtocolSender cannot serialize header to send buffer.");
+    }
+
+    sock.writeFully(buffer.getBuffer(0), buffer.getDataSize(0), writeTimeout);
+}
+
+static inline void BuildBaseHeader(const ExtendedBlock & block,
+                                   const Token & accessToken, BaseHeaderProto * header) {
+    ExtendedBlockProto * eb = header->mutable_block();
+    TokenProto * token = header->mutable_token();
+    eb->set_blockid(block.getBlockId());
+    eb->set_generationstamp(block.getGenerationStamp());
+    eb->set_numbytes(block.getNumBytes());
+    eb->set_poolid(block.getPoolId());
+    token->set_identifier(accessToken.getIdentifier());
+    token->set_password(accessToken.getPassword());
+    token->set_kind(accessToken.getKind());
+    token->set_service(accessToken.getService());
+}
+
+static inline void BuildClientHeader(const ExtendedBlock & block,
+                                     const Token & accessToken, const char * clientName,
+                                     ClientOperationHeaderProto * header) {
+    header->set_clientname(clientName);
+    BuildBaseHeader(block, accessToken, header->mutable_baseheader());
+}
+
+static inline void BuildNodeInfo(const DatanodeInfo & node,
+                                 DatanodeInfoProto * info) {
+    DatanodeIDProto * id = info->mutable_id();
+    id->set_hostname(node.getHostName());
+    id->set_infoport(node.getInfoPort());
+    id->set_ipaddr(node.getIpAddr());
+    id->set_ipcport(node.getIpcPort());
+    id->set_datanodeuuid(node.getDatanodeId());
+    id->set_xferport(node.getXferPort());
+    info->set_location(node.getLocation());
+}
+
+static inline void BuildNodesInfo(const std::vector<DatanodeInfo> & nodes,
+                                  RepeatedPtrField<DatanodeInfoProto> * infos) {
+    for (std::size_t i = 0; i < nodes.size(); ++i) {
+        BuildNodeInfo(nodes[i], infos->Add());
+    }
+}
+
+DataTransferProtocolSender::DataTransferProtocolSender(Socket & sock,
+        int writeTimeout, const std::string & datanodeAddr) :
+    sock(sock), writeTimeout(writeTimeout), datanode(datanodeAddr) {
+}
+
+DataTransferProtocolSender::~DataTransferProtocolSender() {
+}
+
+void DataTransferProtocolSender::readBlock(const ExtendedBlock & blk,
+        const Token & blockToken, const char * clientName,
+        int64_t blockOffset, int64_t length) {
+    try {
+        OpReadBlockProto op;
+        op.set_len(length);
+        op.set_offset(blockOffset);
+        BuildClientHeader(blk, blockToken, clientName, op.mutable_header());
+        Send(sock, READ_BLOCK, &op, writeTimeout);
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "DataTransferProtocolSender cannot send read request to datanode %s.",
+                     datanode.c_str());
+    }
+}
+
+void DataTransferProtocolSender::writeBlock(const ExtendedBlock & blk,
+        const Token & blockToken, const char * clientName,
+        const std::vector<DatanodeInfo> & targets, int stage, int pipelineSize,
+        int64_t minBytesRcvd, int64_t maxBytesRcvd,
+        int64_t latestGenerationStamp, int checksumType, int bytesPerChecksum) {
+    try {
+        OpWriteBlockProto op;
+        op.set_latestgenerationstamp(latestGenerationStamp);
+        op.set_minbytesrcvd(minBytesRcvd);
+        op.set_maxbytesrcvd(maxBytesRcvd);
+        op.set_pipelinesize(targets.size());
+        op.set_stage((OpWriteBlockProto_BlockConstructionStage) stage);
+        BuildClientHeader(blk, blockToken, clientName, op.mutable_header());
+        ChecksumProto * ck = op.mutable_requestedchecksum();
+        ck->set_bytesperchecksum(bytesPerChecksum);
+        ck->set_type((ChecksumTypeProto) checksumType);
+        BuildNodesInfo(targets, op.mutable_targets());
+        Send(sock, WRITE_BLOCK, &op, writeTimeout);
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "DataTransferProtocolSender cannot send write request to datanode %s.",
+                     datanode.c_str());
+    }
+}
+
+void DataTransferProtocolSender::transferBlock(const ExtendedBlock & blk,
+        const Token & blockToken, const char * clientName,
+        const std::vector<DatanodeInfo> & targets) {
+    try {
+        OpTransferBlockProto op;
+        BuildClientHeader(blk, blockToken, clientName, op.mutable_header());
+        BuildNodesInfo(targets, op.mutable_targets());
+        Send(sock, TRANSFER_BLOCK, &op, writeTimeout);
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "DataTransferProtocolSender cannot send transfer request to datanode %s.",
+                     datanode.c_str());
+    }
+}
+
+void DataTransferProtocolSender::blockChecksum(const ExtendedBlock & blk,
+        const Token & blockToken) {
+    try {
+        //TODO
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "DataTransferProtocolSender cannot send checksum request to datanode %s.",
+                     datanode.c_str());
+    }
+}
+
+void DataTransferProtocolSender::requestShortCircuitFds(const ExtendedBlock blk,
+                                                        const Token& blockToken,
+                                                        uint32_t maxVersion) {
+    try {
+        OpRequestShortCircuitAccessProto op;
+        BuildBaseHeader(blk, blockToken, op.mutable_header());
+        op.set_maxversion(maxVersion);
+
+        Send(sock, REQUEST_SHORT_CIRCUIT_FDS, &op, writeTimeout);
+    } catch (const HdfsCanceled& e) {
+        throw;
+    } catch (const HdfsException& e) {
+        NESTED_THROW(HdfsIOException,
+                     "DataTransferProtocolSender cannot send request "
+                     "short-circuit fds request "
+                     "to datanode %s.",
+                     datanode.c_str());
+    }
+}
+}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DataTransferProtocolSender.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/DataTransferProtocolSender.h b/depends/libhdfs3/src/client/DataTransferProtocolSender.h
new file mode 100644
index 0000000..63d1c36
--- /dev/null
+++ b/depends/libhdfs3/src/client/DataTransferProtocolSender.h
@@ -0,0 +1,143 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS_3_CLIENT_DATATRANSFERPROTOCOLSENDER_H_
+#define _HDFS_LIBHDFS_3_CLIENT_DATATRANSFERPROTOCOLSENDER_H_
+
+#include "DataTransferProtocol.h"
+#include "network/Socket.h"
+
+/**
+ * Version 28:
+ *    Declare methods in DataTransferProtocol interface.
+ */
+#define DATA_TRANSFER_VERSION 28
+
+namespace Hdfs {
+namespace Internal {
+
+enum DataTransferOp {
+    WRITE_BLOCK = 80,
+    READ_BLOCK = 81,
+    READ_METADATA = 82,
+    REPLACE_BLOCK = 83,
+    COPY_BLOCK = 84,
+    BLOCK_CHECKSUM = 85,
+    TRANSFER_BLOCK = 86,
+    REQUEST_SHORT_CIRCUIT_FDS = 87,
+    RELEASE_SHORT_CIRCUIT_FDS = 88
+};
+
+/**
+ * Transfer data to/from datanode using a streaming protocol.
+ */
+class DataTransferProtocolSender: public DataTransferProtocol {
+public:
+    DataTransferProtocolSender(Socket & sock, int writeTimeout,
+                               const std::string & datanodeAddr);
+
+    virtual ~DataTransferProtocolSender();
+
+    /**
+     * Read a block.
+     *
+     * @param blk the block being read.
+     * @param blockToken security token for accessing the block.
+     * @param clientName client's name.
+     * @param blockOffset offset of the block.
+     * @param length maximum number of bytes for this read.
+     */
+    virtual void readBlock(const ExtendedBlock & blk, const Token & blockToken,
+                           const char * clientName, int64_t blockOffset, int64_t length);
+
+    /**
+     * Write a block to a datanode pipeline.
+     *
+     * @param blk the block being written.
+     * @param blockToken security token for accessing the block.
+     * @param clientName client's name.
+     * @param targets target datanodes in the pipeline.
+     * @param source source datanode.
+     * @param stage pipeline stage.
+     * @param pipelineSize the size of the pipeline.
+     * @param minBytesRcvd minimum number of bytes received.
+     * @param maxBytesRcvd maximum number of bytes received.
+     * @param latestGenerationStamp the latest generation stamp of the block.
+     */
+    virtual void writeBlock(const ExtendedBlock & blk, const Token & blockToken,
+                            const char * clientName, const std::vector<DatanodeInfo> & targets,
+                            int stage, int pipelineSize, int64_t minBytesRcvd,
+                            int64_t maxBytesRcvd, int64_t latestGenerationStamp,
+                            int checksumType, int bytesPerChecksum);
+
+    /**
+     * Transfer a block to another datanode.
+     * The block stage must be
+     * either {@link BlockConstructionStage#TRANSFER_RBW}
+     * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+     *
+     * @param blk the block being transferred.
+     * @param blockToken security token for accessing the block.
+     * @param clientName client's name.
+     * @param targets target datanodes.
+     */
+    virtual void transferBlock(const ExtendedBlock & blk,
+                               const Token & blockToken, const char * clientName,
+                               const std::vector<DatanodeInfo> & targets);
+
+    /**
+     * Get block checksum (MD5 of CRC32).
+     *
+     * @param blk a block.
+     * @param blockToken security token for accessing the block.
+     * @throw HdfsIOException
+     */
+    virtual void blockChecksum(const ExtendedBlock & blk,
+                               const Token & blockToken);
+
+    /**
+     * Request short circuit access file descriptors from a DataNode.
+     *
+     * @param blk             The block to get file descriptors for.
+     * @param blockToken      Security token for accessing the block.
+     * @param maxVersion      Maximum version of the block data the client
+     *                          can understand.
+     */
+    virtual void requestShortCircuitFds(const ExtendedBlock blk,
+                                        const Token& blockToken,
+                                        uint32_t maxVersion);
+
+private:
+    Socket & sock;
+    int writeTimeout;
+    std::string datanode;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS_3_CLIENT_DATATRANSFERPROTOCOLSENDER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DirectoryIterator.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/DirectoryIterator.cpp b/depends/libhdfs3/src/client/DirectoryIterator.cpp
new file mode 100644
index 0000000..b5b5043
--- /dev/null
+++ b/depends/libhdfs3/src/client/DirectoryIterator.cpp
@@ -0,0 +1,100 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DirectoryIterator.h"
+#include "FileStatus.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemImpl.h"
+
+namespace Hdfs {
+
+DirectoryIterator::DirectoryIterator() :
+    needLocations(false), filesystem(NULL), next(0) {
+}
+
+DirectoryIterator::DirectoryIterator(Hdfs::Internal::FileSystemImpl * const fs,
+                                     std::string path, bool needLocations) :
+    needLocations(needLocations), filesystem(fs), next(0), path(path) {
+}
+
+DirectoryIterator::DirectoryIterator(const DirectoryIterator & it) :
+    needLocations(it.needLocations), filesystem(it.filesystem), next(it.next), path(it.path), startAfter(
+        it.startAfter), lists(it.lists) {
+}
+
+DirectoryIterator & DirectoryIterator::operator =(const DirectoryIterator & it) {
+    if (this == &it) {
+        return *this;
+    }
+
+    needLocations = it.needLocations;
+    filesystem = it.filesystem;
+    next = it.next;
+    path = it.path;
+    startAfter = it.startAfter;
+    lists = it.lists;
+    return *this;
+}
+
+bool DirectoryIterator::getListing() {
+    bool more;
+
+    if (NULL == filesystem) {
+        return false;
+    }
+
+    next = 0;
+    lists.clear();
+    more = filesystem->getListing(path, startAfter, needLocations, lists);
+
+    if (!lists.empty()) {
+        startAfter = lists.back().getPath();
+    }
+
+    return more || !lists.empty();
+}
+
+bool DirectoryIterator::hasNext() {
+    if (next >= lists.size()) {
+        return getListing();
+    }
+
+    return true;
+}
+
+Hdfs::FileStatus DirectoryIterator::getNext() {
+    if (next >= lists.size()) {
+        if (!getListing()) {
+            THROW(HdfsIOException, "End of the dir flow");
+        }
+    }
+
+    return lists[next++];
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/DirectoryIterator.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/DirectoryIterator.h b/depends/libhdfs3/src/client/DirectoryIterator.h
new file mode 100644
index 0000000..cb12ad8
--- /dev/null
+++ b/depends/libhdfs3/src/client/DirectoryIterator.h
@@ -0,0 +1,63 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_
+#define _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_
+
+#include "FileStatus.h"
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+class FileSystemImpl;
+}
+
+class DirectoryIterator {
+public:
+    DirectoryIterator();
+    DirectoryIterator(Hdfs::Internal::FileSystemImpl * const fs,
+                      std::string path, bool needLocations);
+    DirectoryIterator(const DirectoryIterator & it);
+    DirectoryIterator & operator = (const DirectoryIterator & it);
+    bool hasNext();
+    FileStatus getNext();
+
+private:
+    bool getListing();
+
+private:
+    bool needLocations;
+    Hdfs::Internal::FileSystemImpl * filesystem;
+    size_t next;
+    std::string path;
+    std::string startAfter;
+    std::vector<FileStatus> lists;
+};
+
+}
+
+#endif /* _HDFS_LIBHFDS3_CLIENT_DIRECTORY_ITERATOR_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileStatus.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/FileStatus.h b/depends/libhdfs3/src/client/FileStatus.h
new file mode 100644
index 0000000..53b855c
--- /dev/null
+++ b/depends/libhdfs3/src/client/FileStatus.h
@@ -0,0 +1,167 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_FILESTATUS_H_
+#define _HDFS_LIBHDFS3_CLIENT_FILESTATUS_H_
+
+#include "Permission.h"
+
+#include <string>
+
+namespace Hdfs {
+
+class FileStatus {
+public:
+    FileStatus() :
+        isdir(false), atime(0), blocksize(0), length(0), mtime(
+            0), permission(0644), replications(0) {
+    }
+
+    int64_t getAccessTime() const {
+        return atime;
+    }
+
+    void setAccessTime(int64_t accessTime) {
+        atime = accessTime;
+    }
+
+    short getReplication() const {
+        return replications;
+    }
+
+    void setReplication(short blockReplication) {
+        replications = blockReplication;
+    }
+
+    int64_t getBlockSize() const {
+        return blocksize;
+    }
+
+    void setBlocksize(int64_t blocksize) {
+        this->blocksize = blocksize;
+    }
+
+    const char * getGroup() const {
+        return group.c_str();
+    }
+
+    void setGroup(const char * group) {
+        this->group = group;
+    }
+
+    /**
+     * Is this a directory?
+     * @return true if this is a directory
+     */
+    bool isDirectory() const {
+        return isdir;
+    }
+
+    void setIsdir(bool isdir) {
+        this->isdir = isdir;
+    }
+
+    int64_t getLength() const {
+        return length;
+    }
+
+    void setLength(int64_t length) {
+        this->length = length;
+    }
+
+    int64_t getModificationTime() const {
+        return mtime;
+    }
+
+    void setModificationTime(int64_t modificationTime) {
+        mtime = modificationTime;
+    }
+
+    const char * getOwner() const {
+        return owner.c_str();
+    }
+
+    void setOwner(const char * owner) {
+        this->owner = owner;
+    }
+
+    const char * getPath() const {
+        return path.c_str();
+    }
+
+    void setPath(const char * path) {
+        this->path = path;
+    }
+
+    const Permission & getPermission() const {
+        return permission;
+    }
+
+    void setPermission(const Permission & permission) {
+        this->permission = permission;
+    }
+
+    const char * getSymlink() const {
+        return symlink.c_str();
+    }
+
+    void setSymlink(const char * symlink) {
+        this->symlink = symlink;
+    }
+
+    /**
+     * Is this a file?
+     * @return true if this is a file
+     */
+    bool isFile() {
+        return !isdir && !isSymlink();
+    }
+
+    /**
+     * Is this a symbolic link?
+     * @return true if this is a symbolic link
+     */
+    bool isSymlink() {
+        return !symlink.empty();
+    }
+
+private:
+    bool isdir;
+    int64_t atime;
+    int64_t blocksize;
+    int64_t length;
+    int64_t mtime;
+    Permission permission;
+    short replications;
+    std::string group;
+    std::string owner;
+    std::string path;
+    std::string symlink;
+};
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileSystem.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/FileSystem.cpp b/depends/libhdfs3/src/client/FileSystem.cpp
new file mode 100644
index 0000000..0d0ef77
--- /dev/null
+++ b/depends/libhdfs3/src/client/FileSystem.cpp
@@ -0,0 +1,591 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DirectoryIterator.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystem.h"
+#include "FileSystemImpl.h"
+#include "FileSystemKey.h"
+#include "Hash.h"
+#include "SessionConfig.h"
+#include "Thread.h"
+#include "Token.h"
+#include "Unordered.h"
+#include "WritableUtils.h"
+
+#include <algorithm>
+#include <string>
+#include <krb5/krb5.h>
+
+using namespace Hdfs::Internal;
+
+namespace Hdfs {
+
+namespace Internal {
+
+static std::string ExtractPrincipalFromTicketCache(
+    const std::string & cachePath) {
+    krb5_context cxt = NULL;
+    krb5_ccache ccache = NULL;
+    krb5_principal principal = NULL;
+    krb5_error_code ec = 0;
+    std::string errmsg, retval;
+    char * priName = NULL;
+
+    if (!cachePath.empty()) {
+        if (0 != setenv("KRB5CCNAME", cachePath.c_str(), 1)) {
+            THROW(HdfsIOException, "Cannot set env parameter \"KRB5CCNAME\"");
+        }
+    }
+
+    do {
+        if (0 != (ec = krb5_init_context(&cxt))) {
+            break;
+        }
+
+        if (0 != (ec = krb5_cc_default(cxt, &ccache))) {
+            break;
+        }
+
+        if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) {
+            break;
+        }
+
+        if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) {
+            break;
+        }
+    } while (0);
+
+    if (!ec) {
+        retval = priName;
+    } else {
+        if (cxt) {
+            errmsg = krb5_get_error_message(cxt, ec);
+        } else {
+            errmsg = "Cannot initialize kerberos context";
+        }
+    }
+
+    if (priName != NULL) {
+        krb5_free_unparsed_name(cxt, priName);
+    }
+
+    if (principal != NULL) {
+        krb5_free_principal(cxt, principal);
+    }
+
+    if (ccache != NULL) {
+        krb5_cc_close(cxt, ccache);
+    }
+
+    if (cxt != NULL) {
+        krb5_free_context(cxt);
+    }
+
+    if (!errmsg.empty()) {
+        THROW(HdfsIOException,
+              "FileSystem: Failed to extract principal from ticket cache: %s",
+              errmsg.c_str());
+    }
+
+    return retval;
+}
+
+
+static std::string ExtractPrincipalFromToken(const Token & token) {
+    std::string realUser, owner;
+    std::string identifier = token.getIdentifier();
+    WritableUtils cin(&identifier[0], identifier.size());
+    char version;
+
+    try {
+        version = cin.readByte();
+
+        if (version != 0) {
+            THROW(HdfsIOException, "Unknown version of delegation token");
+        }
+
+        owner = cin.ReadText();
+        cin.ReadText();
+        realUser = cin.ReadText();
+        return realUser.empty() ? owner : realUser;
+    } catch (const std::range_error & e) {
+    }
+
+    THROW(HdfsIOException, "Cannot extract principal from token");
+}
+}
+
+FileSystem::FileSystem(const Config & conf) :
+    conf(conf), impl(NULL) {
+}
+
+FileSystem::FileSystem(const FileSystem & other) :
+    conf(other.conf), impl(NULL) {
+    if (other.impl) {
+        impl = new FileSystemWrapper(other.impl->filesystem);
+    }
+}
+
+FileSystem & FileSystem::operator =(const FileSystem & other) {
+    if (this == &other) {
+        return *this;
+    }
+
+    conf = other.conf;
+
+    if (impl) {
+        delete impl;
+        impl = NULL;
+    }
+
+    if (other.impl) {
+        impl = new FileSystemWrapper(other.impl->filesystem);
+    }
+
+    return *this;
+}
+
+FileSystem::~FileSystem() {
+    if (impl) {
+        try {
+            disconnect();
+        } catch (...) {
+        }
+    }
+}
+
+void FileSystem::connect() {
+    Internal::SessionConfig sconf(conf);
+    connect(sconf.getDefaultUri().c_str(), NULL, NULL);
+}
+
+/**
+ * Connect to hdfs
+ * @param uri hdfs connection uri, hdfs://host:port
+ */
+void FileSystem::connect(const char * uri) {
+    connect(uri, NULL, NULL);
+}
+
+static FileSystemWrapper * ConnectInternal(const char * uri,
+        const std::string & principal, const Token * token, Config & conf) {
+    if (NULL == uri || 0 == strlen(uri)) {
+        THROW(InvalidParameter, "Invalid HDFS uri.");
+    }
+
+    FileSystemKey key(uri, principal.c_str());
+
+    if (token) {
+        key.addToken(*token);
+    }
+
+    return new FileSystemWrapper(shared_ptr<FileSystemInter>(new FileSystemImpl(key, conf)));
+}
+
+/**
+ * Connect to hdfs with user or token
+ * 	username and token cannot be set at the same time
+ * @param uri connection uri.
+ * @param username user used to connect to hdfs
+ * @param token token used to connect to hdfs
+ */
+void FileSystem::connect(const char * uri, const char * username, const char * token) {
+    AuthMethod auth;
+    std::string principal;
+
+    if (impl) {
+        THROW(HdfsIOException, "FileSystem: already connected.");
+    }
+
+    try {
+        SessionConfig sconf(conf);
+        auth = RpcAuth::ParseMethod(sconf.getRpcAuthMethod());
+
+        if (token && auth != AuthMethod::SIMPLE) {
+            Token t;
+            t.fromString(token);
+            principal = ExtractPrincipalFromToken(t);
+            impl = ConnectInternal(uri, principal, &t, conf);
+            impl->filesystem->connect();
+            return;
+        } else if (username) {
+            principal = username;
+        }
+
+        if (auth == AuthMethod::KERBEROS) {
+            principal = ExtractPrincipalFromTicketCache(sconf.getKerberosCachePath());
+        }
+
+        impl = ConnectInternal(uri, principal, NULL, conf);
+        impl->filesystem->connect();
+    } catch (...) {
+        delete impl;
+        impl = NULL;
+        throw;
+    }
+}
+
+/**
+ * disconnect from hdfs
+ */
+void FileSystem::disconnect() {
+    delete impl;
+    impl = NULL;
+}
+
+/**
+ * To get default number of replication.
+ * @return the default number of replication.
+ */
+int FileSystem::getDefaultReplication() const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getDefaultReplication();
+}
+
+/**
+ * To get the default number of block size.
+ * @return the default block size.
+ */
+int64_t FileSystem::getDefaultBlockSize() const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getDefaultBlockSize();
+}
+
+/**
+ * To get the home directory.
+ * @return home directory.
+ */
+std::string FileSystem::getHomeDirectory() const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getHomeDirectory();
+}
+
+/**
+ * To delete a file or directory.
+ * @param path the path to be deleted.
+ * @param recursive if path is a directory, delete the contents recursively.
+ * @return return true if success.
+ */
+bool FileSystem::deletePath(const char * path, bool recursive) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->deletePath(path, recursive);
+}
+
+/**
+ * To create a directory which given permission.
+ * @param path the directory path which is to be created.
+ * @param permission directory permission.
+ * @return return true if success.
+ */
+bool FileSystem::mkdir(const char * path, const Permission & permission) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->mkdir(path, permission);
+}
+
+/**
+ * To create a directory which given permission.
+ * If parent path does not exits, create it.
+ * @param path the directory path which is to be created.
+ * @param permission directory permission.
+ * @return return true if success.
+ */
+bool FileSystem::mkdirs(const char * path, const Permission & permission) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->mkdirs(path, permission);
+}
+
+/**
+ * To get path information.
+ * @param path the path which information is to be returned.
+ * @return the path information.
+ */
+FileStatus FileSystem::getFileStatus(const char * path) const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getFileStatus(path);
+}
+
+/**
+ * Return an array containing hostnames, offset and size of
+ * portions of the given file.
+ *
+ * This call is most helpful with DFS, where it returns
+ * hostnames of machines that contain the given file.
+ *
+ * The FileSystem will simply return an elt containing 'localhost'.
+ *
+ * @param path path is used to identify an FS since an FS could have
+ *          another FS that it could be delegating the call to
+ * @param start offset into the given file
+ * @param len length for which to get locations for
+ */
+std::vector<BlockLocation> FileSystem::getFileBlockLocations(const char * path,
+        int64_t start, int64_t len) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getFileBlockLocations(path, start, len);
+}
+
+/**
+ * list the contents of a directory.
+ * @param path the directory path.
+ * @return Return a iterator to visit all elements in this directory.
+ */
+DirectoryIterator FileSystem::listDirectory(const char * path)  {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->listDirectory(path, false);
+}
+
+/**
+ * list all the contents of a directory.
+ * @param path The directory path.
+ * @return Return a vector of file informations in the directory.
+ */
+std::vector<FileStatus> FileSystem::listAllDirectoryItems(const char * path) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->listAllDirectoryItems(path, false);
+}
+
+/**
+ * To set the owner and the group of the path.
+ * username and groupname cannot be empty at the same time.
+ * @param path the path which owner of group is to be changed.
+ * @param username new user name.
+ * @param groupname new group.
+ */
+void FileSystem::setOwner(const char * path, const char * username,
+                          const char * groupname) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->filesystem->setOwner(path, username, groupname);
+}
+
+/**
+ * To set the access time or modification time of a path.
+ * @param path the path which access time or modification time is to be changed.
+ * @param mtime new modification time.
+ * @param atime new access time.
+ */
+void FileSystem::setTimes(const char * path, int64_t mtime, int64_t atime) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->filesystem->setTimes(path, mtime, atime);
+}
+
+/**
+ * To set the permission of a path.
+ * @param path the path which permission is to be changed.
+ * @param permission new permission.
+ */
+void FileSystem::setPermission(const char * path,
+                               const Permission & permission) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->filesystem->setPermission(path, permission);
+}
+
+/**
+ * To set the number of replication.
+ * @param path the path which number of replication is to be changed.
+ * @param replication new number of replication.
+ * @return return true if success.
+ */
+bool FileSystem::setReplication(const char * path, short replication) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->setReplication(path, replication);
+}
+
+/**
+ * To rename a path.
+ * @param src old path.
+ * @param dst new path.
+ * @return return true if success.
+ */
+bool FileSystem::rename(const char * src, const char * dst) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->rename(src, dst);
+}
+
+/**
+ * To set working directory.
+ * @param path new working directory.
+ */
+void FileSystem::setWorkingDirectory(const char * path) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->filesystem->setWorkingDirectory(path);
+}
+
+/**
+ * To get working directory.
+ * @return working directory.
+ */
+std::string FileSystem::getWorkingDirectory() const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getWorkingDirectory();
+}
+
+/**
+ * To test if the path exist.
+ * @param path the path which is to be tested.
+ * @return return true if the path exist.
+ */
+bool FileSystem::exist(const char * path) const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->exist(path);
+}
+
+/**
+ * To get the file system status.
+ * @return the file system status.
+ */
+FileSystemStats FileSystem::getStats() const {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getFsStats();
+}
+
+/**
+ * Truncate the file in the indicated path to the indicated size.
+ * @param src The path to the file to be truncated
+ * @param size The size the file is to be truncated to
+ *
+ * @return true if and client does not need to wait for block recovery,
+ * false if client needs to wait for block recovery.
+ */
+bool FileSystem::truncate(const char * src, int64_t size) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->truncate(src, size);
+}
+
+std::string FileSystem::getDelegationToken(const char * renewer) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getDelegationToken(renewer);
+}
+
+/**
+ * Get a valid Delegation Token using the default user as renewer.
+ *
+ * @return Token
+ * @throws IOException
+ */
+std::string FileSystem::getDelegationToken() {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return impl->filesystem->getDelegationToken();
+}
+
+/**
+ * Renew an existing delegation token.
+ *
+ * @param token delegation token obtained earlier
+ * @return the new expiration time
+ * @throws IOException
+ */
+int64_t FileSystem::renewDelegationToken(const std::string & token) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    return  impl->filesystem->renewDelegationToken(token);
+}
+
+/**
+ * Cancel an existing delegation token.
+ *
+ * @param token delegation token
+ * @throws IOException
+ */
+void FileSystem::cancelDelegationToken(const std::string & token) {
+    if (!impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->filesystem->cancelDelegationToken(token);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileSystem.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/FileSystem.h b/depends/libhdfs3/src/client/FileSystem.h
new file mode 100644
index 0000000..c74a9fd
--- /dev/null
+++ b/depends/libhdfs3/src/client/FileSystem.h
@@ -0,0 +1,294 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_
+#define _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_
+
+#include "BlockLocation.h"
+#include "DirectoryIterator.h"
+#include "FileStatus.h"
+#include "FileSystemStats.h"
+#include "Permission.h"
+#include "XmlConfig.h"
+
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+struct FileSystemWrapper;
+}
+
+class FileSystem {
+public:
+
+    /**
+     * Construct a FileSystem
+     * @param conf hdfs configuration
+     */
+    FileSystem(const Config & conf);
+
+    /**
+     * Copy construct of FileSystem
+     */
+    FileSystem(const FileSystem & other);
+
+    /**
+     * Assign operator of FileSystem
+     */
+    FileSystem & operator = (const FileSystem & other);
+
+    /**
+     * Destroy a HdfsFileSystem instance
+     */
+    ~FileSystem();
+
+    /**
+     * Connect to default hdfs.
+     */
+    void connect();
+
+    /**
+     * Connect to hdfs
+     * @param uri hdfs connection uri, hdfs://host:port
+     */
+    void connect(const char * uri);
+
+    /**
+     * Connect to hdfs with user or token
+     * 	username and token cannot be set at the same time
+     * @param uri connection uri.
+     * @param username user used to connect to hdfs
+     * @param token token used to connect to hdfs
+     */
+    void connect(const char * uri, const char * username, const char * token);
+
+    /**
+     * disconnect from hdfs
+     */
+    void disconnect();
+
+    /**
+     * To get default number of replication.
+     * @return the default number of replication.
+     */
+    int getDefaultReplication() const;
+
+    /**
+     * To get the default number of block size.
+     * @return the default block size.
+     */
+    int64_t getDefaultBlockSize() const;
+
+    /**
+     * To get the home directory.
+     * @return home directory.
+     */
+    std::string getHomeDirectory() const;
+
+    /**
+     * To delete a file or directory.
+     * @param path the path to be deleted.
+     * @param recursive if path is a directory, delete the contents recursively.
+     * @return return true if success.
+     */
+    bool deletePath(const char * path, bool recursive);
+
+    /**
+     * To create a directory which given permission.
+     * @param path the directory path which is to be created.
+     * @param permission directory permission.
+     * @return return true if success.
+     */
+    bool mkdir(const char * path, const Permission & permission);
+
+    /**
+     * To create a directory which given permission.
+     * If parent path does not exits, create it.
+     * @param path the directory path which is to be created.
+     * @param permission directory permission.
+     * @return return true if success.
+     */
+    bool mkdirs(const char * path, const Permission & permission);
+
+    /**
+     * To get path information.
+     * @param path the path which information is to be returned.
+     * @return the path information.
+     */
+    FileStatus getFileStatus(const char * path) const;
+
+    /**
+     * Return an array containing hostnames, offset and size of
+     * portions of the given file.
+     *
+     * This call is most helpful with DFS, where it returns
+     * hostnames of machines that contain the given file.
+     *
+     * The FileSystem will simply return an elt containing 'localhost'.
+     *
+     * @param path path is used to identify an FS since an FS could have
+     *          another FS that it could be delegating the call to
+     * @param start offset into the given file
+     * @param len length for which to get locations for
+     */
+    std::vector<BlockLocation> getFileBlockLocations(const char * path,
+            int64_t start, int64_t len);
+
+    /**
+     * list the contents of a directory.
+     * @param path The directory path.
+     * @return Return a iterator to visit all elements in this directory.
+     */
+    DirectoryIterator listDirectory(const char * path);
+
+    /**
+     * list all the contents of a directory.
+     * @param path The directory path.
+     * @return Return a vector of file informations in the directory.
+     */
+    std::vector<FileStatus> listAllDirectoryItems(const char * path);
+
+    /**
+     * To set the owner and the group of the path.
+     * username and groupname cannot be empty at the same time.
+     * @param path the path which owner of group is to be changed.
+     * @param username new user name.
+     * @param groupname new group.
+     */
+    void setOwner(const char * path, const char * username,
+                  const char * groupname);
+
+    /**
+     * To set the access time or modification time of a path.
+     * @param path the path which access time or modification time is to be changed.
+     * @param mtime new modification time.
+     * @param atime new access time.
+     */
+    void setTimes(const char * path, int64_t mtime, int64_t atime);
+
+    /**
+     * To set the permission of a path.
+     * @param path the path which permission is to be changed.
+     * @param permission new permission.
+     */
+    void setPermission(const char * path, const Permission & permission);
+
+    /**
+     * To set the number of replication.
+     * @param path the path which number of replication is to be changed.
+     * @param replication new number of replication.
+     * @return return true if success.
+     */
+    bool setReplication(const char * path, short replication);
+
+    /**
+     * To rename a path.
+     * @param src old path.
+     * @param dst new path.
+     * @return return true if success.
+     */
+    bool rename(const char * src, const char * dst);
+
+    /**
+     * To set working directory.
+     * @param path new working directory.
+     */
+    void setWorkingDirectory(const char * path);
+
+    /**
+     * To get working directory.
+     * @return working directory.
+     */
+    std::string getWorkingDirectory() const;
+
+    /**
+     * To test if the path exist.
+     * @param path the path which is to be tested.
+     * @return return true if the path exist.
+     */
+    bool exist(const char * path) const;
+
+    /**
+     * To get the file system status.
+     * @return the file system status.
+     */
+    FileSystemStats getStats() const;
+
+    /**
+     * Truncate the file in the indicated path to the indicated size.
+     * @param src The path to the file to be truncated
+     * @param size The size the file is to be truncated to
+     *
+     * @return true if and client does not need to wait for block recovery,
+     * false if client needs to wait for block recovery.
+     */
+    bool truncate(const char * src, int64_t size);
+
+    /**
+     * Get a valid Delegation Token.
+     *
+     * @param renewer the designated renewer for the token
+     * @return Token string
+     * @throws IOException
+     */
+    std::string getDelegationToken(const char * renewer);
+
+    /**
+     * Get a valid Delegation Token using the default user as renewer.
+     *
+     * @return Token string
+     * @throws IOException
+     */
+    std::string getDelegationToken();
+
+    /**
+     * Renew an existing delegation token.
+     *
+     * @param token delegation token obtained earlier
+     * @return the new expiration time
+     * @throws IOException
+     */
+    int64_t renewDelegationToken(const std::string & token);
+
+    /**
+     * Cancel an existing delegation token.
+     *
+     * @param token delegation token
+     * @throws IOException
+     */
+    void cancelDelegationToken(const std::string & token);
+
+private:
+    Config conf;
+    Internal::FileSystemWrapper * impl;
+
+    friend class InputStream;
+    friend class OutputStream;
+};
+
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_FILESYSTEM_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/FileSystemImpl.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/FileSystemImpl.cpp b/depends/libhdfs3/src/client/FileSystemImpl.cpp
new file mode 100644
index 0000000..c9daeb3
--- /dev/null
+++ b/depends/libhdfs3/src/client/FileSystemImpl.cpp
@@ -0,0 +1,785 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Atomic.h"
+#include "BlockLocation.h"
+#include "DirectoryIterator.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileStatus.h"
+#include "FileSystemImpl.h"
+#include "FileSystemStats.h"
+#include "InputStream.h"
+#include "LeaseRenewer.h"
+#include "Logger.h"
+#include "OutputStream.h"
+#include "OutputStreamImpl.h"
+#include "server/LocatedBlocks.h"
+#include "server/NamenodeInfo.h"
+#include "server/NamenodeProxy.h"
+#include "StringUtil.h"
+
+#include <cstring>
+#include <inttypes.h>
+#include <libxml/uri.h>
+#include <strings.h>
+
+namespace Hdfs {
+namespace Internal {
+
+static const std::string GetAbsPath(const std::string & prefix,
+                                    const std::string & path) {
+    if (path.empty()) {
+        return prefix;
+    }
+
+    if ('/' == path[0]) {
+        return path;
+    } else {
+        return prefix + "/" + path;
+    }
+}
+
+/*
+ * Return the canonical absolute name of file NAME.
+ * A canonical name does not contain any `.', `..' components nor any repeated path separators ('/')
+ */
+static const std::string CanonicalizePath(const std::string & path) {
+    int skip = 0;
+    std::string retval;
+    std::vector<std::string> components = StringSplit(path, "/");
+    std::deque<std::string> tmp;
+    std::vector<std::string>::reverse_iterator s = components.rbegin();
+
+    while (s != components.rend()) {
+        if (s->empty() || *s == ".") {
+            ++s;
+        } else if (*s == "..") {
+            ++skip;
+            ++s;
+        } else {
+            if (skip <= 0) {
+                tmp.push_front(*s);
+            } else {
+                --skip;
+            }
+
+            ++s;
+        }
+    }
+
+    for (size_t i = 0; i < tmp.size(); ++i) {
+        retval += "/";
+        retval += tmp[i];
+    }
+
+    return retval.empty() ? "/" : retval;
+}
+
+FileSystemImpl::FileSystemImpl(const FileSystemKey& key, const Config& c)
+    : conf(c),
+      key(key),
+      openedOutputStream(0),
+      nn(NULL),
+      sconf(c),
+      user(key.getUser()) {
+    static atomic<uint32_t> count(0);
+    std::stringstream ss;
+    ss.imbue(std::locale::classic());
+    srand((unsigned int) time(NULL));
+    ss << "libhdfs3_client_random_" << rand() << "_count_" << ++count << "_pid_"
+       << getpid() << "_tid_" << pthread_self();
+    clientName = ss.str();
+    workingDir = std::string("/user/") + user.getEffectiveUser();
+    peerCache = shared_ptr<PeerCache>(new PeerCache(sconf));
+#ifdef MOCK
+    stub = NULL;
+#endif
+    //set log level
+    RootLogger.setLogSeverity(sconf.getLogSeverity());
+}
+
+/**
+ * Destroy a FileSystemBase instance
+ */
+FileSystemImpl::~FileSystemImpl() {
+    try {
+        disconnect();
+    } catch (...) {
+    }
+}
+
+const std::string FileSystemImpl::getStandardPath(const char * path) {
+    std::string base;
+    {
+        lock_guard<mutex> lock(mutWorkingDir);
+        base = workingDir;
+    }
+    return CanonicalizePath(GetAbsPath(base, path));
+}
+
+const char * FileSystemImpl::getClientName() {
+    return clientName.c_str();
+}
+
+void FileSystemImpl::connect() {
+    std::string host, port, uri;
+    std::vector<NamenodeInfo> namenodeInfos;
+
+    if (nn) {
+        THROW(HdfsIOException, "FileSystemImpl: already connected.");
+    }
+
+    host = key.getHost();
+    port = key.getPort();
+    uri += key.getScheme() + "://" + host;
+
+    if (port.empty()) {
+        try {
+            namenodeInfos = NamenodeInfo::GetHANamenodeInfo(key.getHost(), conf);
+        } catch (const HdfsConfigNotFound & e) {
+            NESTED_THROW(InvalidParameter, "Cannot parse URI: %s, missing port or invalid HA configuration", uri.c_str());
+        }
+
+        tokenService = "ha-hdfs:";
+        tokenService += host;
+    } else {
+        std::stringstream ss;
+        ss.imbue(std::locale::classic());
+        ss << host << ":" << port;
+        namenodeInfos.resize(1);
+        namenodeInfos[0].setRpcAddr(ss.str());
+        tokenService = namenodeInfos[0].getRpcAddr();
+    }
+
+#ifdef MOCK
+    nn = stub->getNamenode();
+#else
+    nn = new NamenodeProxy(namenodeInfos, tokenService, sconf, RpcAuth(user, RpcAuth::ParseMethod(sconf.getRpcAuthMethod())));
+#endif
+    /*
+     * To test if the connection is ok
+     */
+    getFsStats();
+}
+
+/**
+ * disconnect from hdfs
+ */
+void FileSystemImpl::disconnect() {
+    if (nn) {
+        nn->close();
+        delete nn;
+    }
+
+    nn = NULL;
+}
+
+/**
+ * To get default number of replication.
+ * @return the default number of replication.
+ */
+int FileSystemImpl::getDefaultReplication() const {
+    return sconf.getDefaultReplica();
+}
+
+/**
+ * To get the default number of block size.
+ * @return the default block size.
+ */
+int64_t FileSystemImpl::getDefaultBlockSize() const {
+    return sconf.getDefaultBlockSize();
+}
+
+/**
+ * To get the home directory.
+ * @return home directory.
+ */
+std::string FileSystemImpl::getHomeDirectory() const {
+    return std::string("/user/") + user.getEffectiveUser();
+}
+
+/**
+ * To delete a file or directory.
+ * @param path the path to be deleted.
+ * @param recursive if path is a directory, delete the contents recursively.
+ * @return return true if success.
+ */
+
+bool FileSystemImpl::deletePath(const char * path, bool recursive) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return nn->deleteFile(getStandardPath(path), recursive);
+}
+
+/**
+ * To create a directory which given permission.
+ * @param path the directory path which is to be created.
+ * @param permission directory permission.
+ * @return return true if success.
+ */
+
+bool FileSystemImpl::mkdir(const char * path, const Permission & permission) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return nn->mkdirs(getStandardPath(path), permission, false);
+}
+
+/**
+ * To create a directory which given permission.
+ * If parent path does not exits, create it.
+ * @param path the directory path which is to be created.
+ * @param permission directory permission.
+ * @return return true if success.
+ */
+
+bool FileSystemImpl::mkdirs(const char * path, const Permission & permission) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return nn->mkdirs(getStandardPath(path), permission, true);
+}
+
+/**
+ * To get path information.
+ * @param path the path which information is to be returned.
+ * @return the path information.
+ */
+FileStatus FileSystemImpl::getFileStatus(const char * path) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return nn->getFileInfo(getStandardPath(path), NULL);
+}
+
+static void Convert(BlockLocation & bl, const LocatedBlock & lb) {
+    const std::vector<DatanodeInfo> & nodes = lb.getLocations();
+    bl.setCorrupt(lb.isCorrupt());
+    bl.setLength(lb.getNumBytes());
+    bl.setOffset(lb.getOffset());
+    std::vector<std::string> hosts(nodes.size());
+    std::vector<std::string> names(nodes.size());
+    std::vector<std::string> topologyPaths(nodes.size());
+
+    for (size_t i = 0 ; i < nodes.size() ; ++i) {
+        hosts[i] = nodes[i].getHostName();
+        names[i] = nodes[i].getXferAddr();
+        topologyPaths[i] = nodes[i].getLocation() + '/' + nodes[i].getXferAddr();
+    }
+
+    bl.setNames(names);
+    bl.setHosts(hosts);
+    bl.setTopologyPaths(topologyPaths);
+}
+
+std::vector<BlockLocation> FileSystemImpl::getFileBlockLocations(
+    const char * path, int64_t start, int64_t len) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    if (start < 0) {
+        THROW(InvalidParameter, "Invalid input: start offset should be positive");
+    }
+
+    if (len < 0) {
+        THROW(InvalidParameter, "Invalid input: length should be positive");
+    }
+
+    LocatedBlocksImpl lbs;
+    nn->getBlockLocations(getStandardPath(path), start, len, lbs);
+    std::vector<LocatedBlock> blocks = lbs.getBlocks();
+    std::vector<BlockLocation> retval(blocks.size());
+
+    for (size_t i = 0; i < blocks.size(); ++i) {
+        Convert(retval[i], blocks[i]);
+    }
+
+    return retval;
+}
+
+/**
+ * list the contents of a directory.
+ * @param path the directory path.
+ * @return return the path informations in the given directory.
+ */
+DirectoryIterator FileSystemImpl::listDirectory(const char * path,
+        bool needLocation) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return DirectoryIterator(this, getStandardPath(path), needLocation);
+}
+
+/**
+ * list all the contents of a directory.
+ * @param path The directory path.
+ * @return Return a vector of file informations in the directory.
+ */
+std::vector<FileStatus> FileSystemImpl::listAllDirectoryItems(const char * path,
+        bool needLocation) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    std::string startAfter;
+    std::string p = getStandardPath(path);
+    std::vector<FileStatus> retval;
+
+    while (getListing(p, startAfter, needLocation, retval)) {
+        startAfter = retval.back().getPath();
+    }
+
+    return retval;
+}
+
+/**
+ * To set the owner and the group of the path.
+ * username and groupname cannot be empty at the same time.
+ * @param path the path which owner of group is to be changed.
+ * @param username new user name.
+ * @param groupname new group.
+ */
+void FileSystemImpl::setOwner(const char * path, const char * username,
+                              const char * groupname) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    if ((NULL == username || !strlen(username))
+            && (NULL == groupname || !strlen(groupname))) {
+        THROW(InvalidParameter,
+              "Invalid input: username and groupname should not be empty");
+    }
+
+    nn->setOwner(getStandardPath(path), username != NULL ? username : "",
+                 groupname != NULL ? groupname : "");
+}
+
+/**
+ * To set the access time or modification time of a path.
+ * @param path the path which access time or modification time is to be changed.
+ * @param mtime new modification time.
+ * @param atime new access time.
+ */
+void FileSystemImpl::setTimes(const char * path, int64_t mtime, int64_t atime) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    nn->setTimes(getStandardPath(path), mtime, atime);
+}
+
+/**
+ * To set the permission of a path.
+ * @param path the path which permission is to be changed.
+ * @param permission new permission.
+ */
+void FileSystemImpl::setPermission(const char * path,
+                                   const Permission & permission) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    nn->setPermission(getStandardPath(path), permission);
+}
+
+/**
+ * To set the number of replication.
+ * @param path the path which number of replication is to be changed.
+ * @param replication new number of replication.
+ * @return return true if success.
+ */
+
+bool FileSystemImpl::setReplication(const char * path, short replication) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    return nn->setReplication(getStandardPath(path), replication);
+}
+
+/**
+ * To rename a path.
+ * @param src old path.
+ * @param dst new path.
+ * @return return true if success.
+ */
+
+bool FileSystemImpl::rename(const char * src, const char * dst) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == src || !strlen(src)) {
+        THROW(InvalidParameter, "Invalid input: src should not be empty");
+    }
+
+    if (NULL == dst || !strlen(dst)) {
+        THROW(InvalidParameter, "Invalid input: dst should not be empty");
+    }
+
+    return nn->rename(getStandardPath(src), getStandardPath(dst));
+}
+
+/**
+ * To set working directory.
+ * @param path new working directory.
+ */
+void FileSystemImpl::setWorkingDirectory(const char * path) {
+    if (NULL == path) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    if (!strlen(path) || '/' != path[0]) {
+        THROW(InvalidParameter,
+              "Invalid input: path should be an absolute path");
+    }
+
+    lock_guard<mutex> lock(mutWorkingDir);
+    workingDir = path;
+}
+
+/**
+ * To get working directory.
+ * @return working directory.
+ */
+std::string FileSystemImpl::getWorkingDirectory() const {
+    return workingDir;
+}
+
+/**
+ * To test if the path exist.
+ * @param path the path which is to be tested.
+ * @return return true if the path exist.
+ */
+
+bool FileSystemImpl::exist(const char * path) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: path should not be empty");
+    }
+
+    try {
+        bool retval = true;
+        nn->getFileInfo(getStandardPath(path), &retval);
+        return retval;
+    } catch (const FileNotFoundException & e) {
+        return false;
+    }
+
+    return true;
+}
+
+/**
+ * To get the file system status.
+ * @return the file system status.
+ */
+FileSystemStats FileSystemImpl::getFsStats() {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    std::vector<int64_t> retval = nn->getFsStats();
+    assert(retval.size() >= 3);
+    return FileSystemStats(retval[0], retval[1], retval[2]);
+}
+
+/**
+ * Truncate the file in the indicated path to the indicated size.
+ * @param path The path to the file to be truncated
+ * @param size The size the file is to be truncated to
+ *
+ * @return true if and client does not need to wait for block recovery,
+ * false if client needs to wait for block recovery.
+ */
+bool FileSystemImpl::truncate(const char * path, int64_t size) {
+    LOG(DEBUG1, "truncate file %s to length %" PRId64, path, size);
+
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == path || !strlen(path)) {
+        THROW(InvalidParameter, "Invalid input: src should not be empty.");
+    }
+
+    std::string absPath = getStandardPath(path);
+
+    return nn->truncate(absPath, size, clientName);
+}
+
+std::string FileSystemImpl::getDelegationToken(const char * renewer) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    if (NULL == renewer || !strlen(renewer)) {
+        THROW(InvalidParameter, "Invalid input: renewer should not be empty.");
+    }
+
+    Token retval = nn->getDelegationToken(renewer);
+    retval.setService(tokenService);
+    return retval.toString();
+}
+
+std::string FileSystemImpl::getDelegationToken() {
+    return getDelegationToken(key.getUser().getPrincipal().c_str());
+}
+
+int64_t FileSystemImpl::renewDelegationToken(const std::string & token) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    Token t;
+    t.fromString(token);
+    return  nn->renewDelegationToken(t);
+}
+
+void FileSystemImpl::cancelDelegationToken(const std::string & token) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    Token t;
+    t.fromString(token);
+    nn->cancelDelegationToken(t);
+}
+
+void FileSystemImpl::getBlockLocations(const std::string & src, int64_t offset,
+                                       int64_t length, LocatedBlocks & lbs) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->getBlockLocations(src, offset, length, lbs);
+}
+
+void FileSystemImpl::create(const std::string & src, const Permission & masked,
+                            int flag, bool createParent, short replication, int64_t blockSize) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->create(src, masked, clientName, flag, createParent, replication,
+               blockSize);
+}
+
+std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
+FileSystemImpl::append(const std::string& src) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->append(src, clientName);
+}
+
+void FileSystemImpl::abandonBlock(const ExtendedBlock & b,
+                                  const std::string & src) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->abandonBlock(b, src, clientName);
+}
+
+shared_ptr<LocatedBlock> FileSystemImpl::addBlock(const std::string & src,
+        const ExtendedBlock * previous,
+        const std::vector<DatanodeInfo> & excludeNodes) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->addBlock(src, clientName, previous, excludeNodes);
+}
+
+shared_ptr<LocatedBlock> FileSystemImpl::getAdditionalDatanode(
+    const std::string & src, const ExtendedBlock & blk,
+    const std::vector<DatanodeInfo> & existings,
+    const std::vector<std::string> & storageIDs,
+    const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->getAdditionalDatanode(src, blk, existings, storageIDs, excludes,
+                                     numAdditionalNodes, clientName);
+}
+
+bool FileSystemImpl::complete(const std::string & src,
+                              const ExtendedBlock * last) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->complete(src, clientName, last);
+}
+
+/*void FileSystemImpl::reportBadBlocks(const std::vector<LocatedBlock> & blocks) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->reportBadBlocks(blocks);
+}*/
+
+void FileSystemImpl::fsync(const std::string & src) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->fsync(src, clientName);
+}
+
+shared_ptr<LocatedBlock> FileSystemImpl::updateBlockForPipeline(
+    const ExtendedBlock & block) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->updateBlockForPipeline(block, clientName);
+}
+
+void FileSystemImpl::updatePipeline(const ExtendedBlock & oldBlock,
+                                    const ExtendedBlock & newBlock,
+                                    const std::vector<DatanodeInfo> & newNodes,
+                                    const std::vector<std::string> & storageIDs) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    nn->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
+}
+
+bool FileSystemImpl::getListing(const std::string & src,
+                                const std::string & startAfter, bool needLocation,
+                                std::vector<FileStatus> & dl) {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    return nn->getListing(src, startAfter, needLocation, dl);
+}
+
+bool FileSystemImpl::renewLease() {
+    if (!nn) {
+        THROW(HdfsIOException, "FileSystemImpl: not connected.");
+    }
+
+    //protected by LeaseRenewer's lock
+    if (0 == openedOutputStream) {
+        return false;
+    }
+
+    try {
+        nn->renewLease(clientName);
+        return true;
+    } catch (const HdfsException & e) {
+        std::string buffer;
+        LOG(LOG_ERROR,
+            "Failed to renew lease for filesystem which client name is %s, since:\n%s",
+            getClientName(), GetExceptionDetail(e, buffer));
+    } catch (const std::exception & e) {
+        LOG(LOG_ERROR,
+            "Failed to renew lease for filesystem which client name is %s, since:\n%s",
+            getClientName(), e.what());
+    }
+
+    return false;
+}
+
+void FileSystemImpl::registerOpenedOutputStream() {
+    //protected by LeaseRenewer's lock
+    ++openedOutputStream;
+}
+
+bool FileSystemImpl::unregisterOpenedOutputStream() {
+    //protected by LeaseRenewer's lock
+    if (openedOutputStream > 0) {
+        --openedOutputStream;
+    }
+
+    return  openedOutputStream == 0;
+}
+
+}
+}


Mime
View raw message