hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhuvnesh2...@apache.org
Subject [19/48] incubator-hawq git commit: HAWQ-618. Import libhdfs3 library for internal management and LICENSE modified
Date Mon, 04 Apr 2016 05:09:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStream.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStream.cpp b/depends/libhdfs3/src/client/InputStream.cpp
new file mode 100644
index 0000000..6cbf46d
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStream.cpp
@@ -0,0 +1,107 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FileSystemImpl.h"
+#include "FileSystemInter.h"
+#include "InputStream.h"
+#include "InputStreamImpl.h"
+#include "InputStreamInter.h"
+
+using namespace Hdfs::Internal;
+
+namespace Hdfs {
+
+/**
+ * Create a stream that is not attached to any file yet.
+ * open() has to be called before the stream can be read.
+ */
+InputStream::InputStream() :
+    impl(new Internal::InputStreamImpl) {
+}
+
+/**
+ * Destroy the stream together with the implementation object it owns.
+ */
+InputStream::~InputStream() {
+    delete impl;
+}
+
+/**
+ * Open a file to read.
+ * @param fs a connected hdfs file system.
+ * @param path the file to be read.
+ * @param verifyChecksum whether checksums are verified while reading.
+ * @throw HdfsIOException if fs is not connected.
+ */
+void InputStream::open(FileSystem & fs, const char * path,
+                       bool verifyChecksum) {
+    // Guard clause: a FileSystem without an implementation was never connected.
+    if (!fs.impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    impl->open(fs.impl->filesystem, path, verifyChecksum);
+}
+
+/**
+ * Read data from hdfs.
+ * @param buf the buffer to be filled.
+ * @param size buffer size.
+ * @return the number of bytes filled in the buffer, which may be less than size.
+ */
+int32_t InputStream::read(char * buf, int32_t size) {
+    return this->impl->read(buf, size);
+}
+
+/**
+ * Read data from hdfs, blocking until the given number of bytes has been read.
+ * @param buf the buffer to be filled.
+ * @param size the number of bytes to be read.
+ */
+void InputStream::readFully(char * buf, int64_t size) {
+    this->impl->readFully(buf, size);
+}
+
+/**
+ * Get how many bytes can be read without blocking.
+ * @return the number of bytes that can be read without blocking.
+ */
+int64_t InputStream::available() {
+    return this->impl->available();
+}
+
+/**
+ * Move the file pointer to the given position.
+ * @param pos the target position.
+ */
+void InputStream::seek(int64_t pos) {
+    this->impl->seek(pos);
+}
+
+/**
+ * Get the current file pointer position.
+ * @return the position of the current file pointer.
+ */
+int64_t InputStream::tell() {
+    return this->impl->tell();
+}
+
+/**
+ * Close the stream.
+ */
+void InputStream::close() {
+    this->impl->close();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStream.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStream.h b/depends/libhdfs3/src/client/InputStream.h
new file mode 100644
index 0000000..73f45ca
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStream.h
@@ -0,0 +1,99 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_
+#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_
+
+#include "FileSystem.h"
+
+namespace Hdfs {
+namespace Internal {
+class InputStreamInter;
+}
+
+/**
+ * An input stream used to read data from hdfs.
+ *
+ * The stream owns its implementation object (the destructor deletes it),
+ * so copying is disabled: two copies would delete the same object.
+ */
+class InputStream {
+public:
+    InputStream();
+
+    ~InputStream();
+
+    /**
+     * Open a file to read.
+     * @param fs hdfs file system.
+     * @param path the file to be read.
+     * @param verifyChecksum verify the checksum.
+     */
+    void open(FileSystem & fs, const char * path, bool verifyChecksum = true);
+
+    /**
+     * To read data from hdfs.
+     * @param buf the buffer to be filled.
+     * @param size buffer size.
+     * @return the number of bytes filled in the buffer, which may be less than size.
+     */
+    int32_t read(char * buf, int32_t size);
+
+    /**
+     * To read data from hdfs, blocking until the given number of bytes has been read.
+     * @param buf the buffer to be filled.
+     * @param size the number of bytes to be read.
+     */
+    void readFully(char * buf, int64_t size);
+
+    /**
+     * Get how many bytes can be read without blocking.
+     * @return The number of bytes can be read without blocking.
+     */
+    int64_t available();
+
+    /**
+     * To move the file point to the given position.
+     * @param pos the given position.
+     */
+    void seek(int64_t pos);
+
+    /**
+     * To get the current file point position.
+     * @return the position of current file point.
+     */
+    int64_t tell();
+
+    /**
+     * Close the stream.
+     */
+    void close();
+
+private:
+    /*
+     * Declared but intentionally not defined (pre-C++11 "= delete"):
+     * a copy would share impl and delete it a second time.
+     */
+    InputStream(const InputStream & other);
+    InputStream & operator=(const InputStream & other);
+
+    Internal::InputStreamInter * impl;
+};
+
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStreamImpl.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStreamImpl.cpp b/depends/libhdfs3/src/client/InputStreamImpl.cpp
new file mode 100644
index 0000000..6bb3e18
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStreamImpl.cpp
@@ -0,0 +1,812 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemInter.h"
+#include "InputStreamImpl.h"
+#include "InputStreamInter.h"
+#include "LocalBlockReader.h"
+#include "Logger.h"
+#include "RemoteBlockReader.h"
+#include "server/Datanode.h"
+#include "Thread.h"
+
+#include <algorithm>
+#include <ifaddrs.h>
+#include <inttypes.h>
+#include <iostream>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Collect the textual IPv4/IPv6 addresses of all local network interfaces
+ * plus the local hostname. isLocalNode() looks a datanode's IP address up
+ * in this set.
+ * @return the set of local addresses and the hostname.
+ * @throw HdfsNetworkException if the interfaces, an address, or the
+ *        hostname cannot be obtained.
+ */
+unordered_set<std::string> BuildLocalAddrSet() {
+    unordered_set<std::string> set;
+    struct ifaddrs * ifAddr = NULL;
+    struct ifaddrs * pifAddr = NULL;
+    struct sockaddr * addr;
+
+    if (getifaddrs(&ifAddr)) {
+        THROW(HdfsNetworkException,
+              "InputStreamImpl: cannot get local network interface: %s",
+              GetSystemErrorInfo(errno));
+    }
+
+    /*
+     * ifAddr must be released with freeifaddrs() on every path,
+     * including the exceptions thrown below.
+     */
+    try {
+        std::vector<char> host;
+        const char * pHost;
+        host.resize(INET6_ADDRSTRLEN + 1);
+
+        for (pifAddr = ifAddr; pifAddr != NULL; pifAddr = pifAddr->ifa_next) {
+            addr = pifAddr->ifa_addr;
+
+            if (!addr) {
+                continue;
+            }
+
+            memset(&host[0], 0, INET6_ADDRSTRLEN + 1);
+
+            if (addr->sa_family == AF_INET) {
+                pHost =
+                    inet_ntop(addr->sa_family,
+                              &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr,
+                              &host[0], INET6_ADDRSTRLEN);
+            } else if (addr->sa_family == AF_INET6) {
+                pHost =
+                    inet_ntop(addr->sa_family,
+                              &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr,
+                              &host[0], INET6_ADDRSTRLEN);
+            } else {
+                // Only IP address families are of interest.
+                continue;
+            }
+
+            if (NULL == pHost) {
+                THROW(HdfsNetworkException,
+                      "InputStreamImpl: cannot get convert network address to textual form: %s",
+                      GetSystemErrorInfo(errno));
+            }
+
+            set.insert(pHost);
+        }
+
+        /*
+         * add hostname.
+         *
+         * sysconf() returns -1 when the limit is indeterminate; sizing the
+         * buffer from that would leave it empty and make &host[0] undefined
+         * behavior, so fall back to 255, the POSIX minimum
+         * (_POSIX_HOST_NAME_MAX).
+         */
+        long hostlen = sysconf(_SC_HOST_NAME_MAX);
+
+        if (hostlen <= 0) {
+            hostlen = 255;
+        }
+
+        /*
+         * gethostname() need not NUL-terminate a truncated name, so zero the
+         * buffer and keep one trailing byte it is never allowed to write.
+         */
+        host.assign(static_cast<size_t>(hostlen) + 2, '\0');
+
+        if (gethostname(&host[0], static_cast<size_t>(hostlen) + 1)) {
+            THROW(HdfsNetworkException,
+                  "InputStreamImpl: cannot get hostname: %s",
+                  GetSystemErrorInfo(errno));
+        }
+
+        set.insert(&host[0]);
+    } catch (...) {
+        if (ifAddr != NULL) {
+            freeifaddrs(ifAddr);
+        }
+
+        throw;
+    }
+
+    if (ifAddr != NULL) {
+        freeifaddrs(ifAddr);
+    }
+
+    return set;
+}
+
+/**
+ * Create a closed stream with default settings; openInternal() replaces
+ * verify, localRead, maxGetBlockInfoRetry, prefetchSize and peerCache with
+ * values taken from the file system's configuration.
+ */
+InputStreamImpl::InputStreamImpl() :
+    closed(true),
+    localRead(true),
+    readFromUnderConstructedBlock(false),
+    verify(true),
+    maxGetBlockInfoRetry(3),
+    cursor(0),
+    endOfCurBlock(0),
+    lastBlockBeingWrittenLength(0),
+    prefetchSize(0),
+    peerCache(NULL) {
+#ifdef MOCK
+    stub = NULL;
+#endif
+}
+
+/**
+ * Destructor. Performs no explicit cleanup; in particular it does not
+ * call close().
+ */
+InputStreamImpl::~InputStreamImpl() {
+    // Intentionally empty.
+}
+
+/**
+ * Verify that the stream may be used.
+ * @throw HdfsIOException if the stream is not open (closed is set).
+ * Otherwise rethrows the error that an earlier failed operation stored in
+ * lastError, if there is one.
+ */
+void InputStreamImpl::checkStatus() {
+    if (closed) {
+        THROW(HdfsIOException, "InputStreamImpl: stream is not opened.");
+    }
+
+    const bool hasStickyError = (lastError != exception_ptr());
+
+    if (hasStickyError) {
+        rethrow_exception(lastError);
+    }
+}
+
+
+/**
+ * Ask the datanodes that hold block b for the replica's visible length.
+ *
+ * updateBlockInfos() calls this for the last block of the file when that
+ * block is not complete. Locations are tried in order.
+ * @param b the block to query.
+ * @return the length reported by the first datanode that returns n >= 0;
+ *         0 if every location threw ReplicaNotFoundException (this also
+ *         covers a block with no locations); -1 otherwise.
+ */
+int64_t InputStreamImpl::readBlockLength(const LocatedBlock & b) {
+    const std::vector<DatanodeInfo> & nodes = b.getLocations();
+    /*
+     * Decremented once per datanode that reports the replica as missing.
+     * The cast makes the size_t -> int conversion explicit.
+     */
+    int replicaNotFoundCount = static_cast<int>(nodes.size());
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        try {
+            int64_t n = 0;
+            shared_ptr<Datanode> dn;
+            /* Per-attempt copy of the credentials, carrying the block token. */
+            RpcAuth a = auth;
+            a.getUser().addToken(b.getToken());
+#ifdef MOCK
+
+            if (stub) {
+                dn = stub->getDatanode();
+            } else {
+                dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(),
+                                              nodes[i].getIpcPort(), *conf, a));
+            }
+
+#else
+            dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(),
+                                          nodes[i].getIpcPort(), *conf, a));
+#endif
+            n = dn->getReplicaVisibleLength(b);
+
+            if (n >= 0) {
+                return n;
+            }
+        } catch (const ReplicaNotFoundException & e) {
+            std::string buffer;
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s",
+                b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer));
+            LOG(INFO,
+                "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode",
+                b.toString().c_str(), path.c_str());
+            --replicaNotFoundCount;
+        } catch (const HdfsIOException & e) {
+            std::string buffer;
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s",
+                b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer));
+            LOG(INFO,
+                "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode",
+                b.toString().c_str(), path.c_str());
+        }
+    }
+
+    // The namenode told us about these locations, but none of them knows
+    // about the replica: we hit the race between pipeline creation start
+    // and end, so the block is treated as empty. Every location has to
+    // report "not found", because some other exception could have happened
+    // on a datanode that does have the replica; that case returns -1 so the
+    // caller reports the failure.
+    if (replicaNotFoundCount == 0) {
+        return 0;
+    }
+
+    return -1;
+}
+
+/**
+ * Fetch block location information for the file from the namenode into
+ * lbs, starting at cursor and covering prefetchSize bytes (presumably the
+ * offset/length arguments of getBlockLocations; confirm against
+ * FileSystemInter).
+ *
+ * If the last block is not complete, its visible length is obtained from
+ * the datanodes via readBlockLength() and stored both in
+ * lastBlockBeingWrittenLength and in the block itself (setNumBytes).
+ *
+ * Up to maxGetBlockInfoRetry attempts are made. HdfsRpcException and an
+ * unknown last-block length (-1) are retried; any other exception
+ * propagates immediately.
+ * @throw HdfsIOException if the last block's length is still unknown on
+ *        the final attempt.
+ * @throw HdfsRpcException rethrown from the final attempt.
+ *
+ * NOTE(review): if maxGetBlockInfoRetry <= 0 the loop body never runs and
+ * lbs is left untouched; presumably the configuration guarantees >= 1.
+ */
+void InputStreamImpl::updateBlockInfos() {
+    int retry = maxGetBlockInfoRetry;
+
+    for (int i = 0; i < retry; ++i) {
+        try {
+            // Allocate lbs on first use, or after readInternal() reset it.
+            if (!lbs) {
+                lbs = shared_ptr < LocatedBlocksImpl > (new LocatedBlocksImpl);
+            }
+
+            filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs);
+
+            if (lbs->isLastBlockComplete()) {
+                lastBlockBeingWrittenLength = 0;
+            } else {
+                shared_ptr<LocatedBlock> last = lbs->getLastBlock();
+
+                if (!last) {
+                    lastBlockBeingWrittenLength = 0;
+                } else {
+                    // The namenode does not know the length of a block under
+                    // construction; ask the datanodes instead.
+                    lastBlockBeingWrittenLength = readBlockLength(*last);
+
+                    if (lastBlockBeingWrittenLength == -1) {
+                        if (i + 1 >= retry) {
+                            THROW(HdfsIOException,
+                                  "InputStreamImpl: failed to get block visible length for Block: %s from all Datanode.",
+                                  last->toString().c_str());
+                        } else {
+                            LOG(LOG_ERROR,
+                                "InputStreamImpl: failed to get block visible length for Block: %s file %s from all Datanode.",
+                                last->toString().c_str(), path.c_str());
+
+                            // Back off before asking again; any error from
+                            // sleep_for is deliberately ignored.
+                            try {
+                                sleep_for(milliseconds(4000));
+                            } catch (...) {
+                            }
+
+                            // Retry immediately; skips the retry log below.
+                            continue;
+                        }
+                    }
+
+                    last->setNumBytes(lastBlockBeingWrittenLength);
+                }
+            }
+
+            return;
+        } catch (const HdfsRpcException & e) {
+            std::string buffer;
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to get block information for file %s, %s",
+                path.c_str(), GetExceptionDetail(e, buffer));
+
+            if (i + 1 >= retry) {
+                throw;
+            }
+        }
+
+        LOG(INFO,
+            "InputStreamImpl: retry to get block information for file: %s, already tried %d time(s).",
+            path.c_str(), i + 1);
+    }
+}
+
+/**
+ * Current length of the file: the length reported by the namenode, plus
+ * the visible length of the last block while that block is incomplete.
+ * Requires lbs to be set.
+ */
+int64_t InputStreamImpl::getFileLength() {
+    const int64_t reported = lbs->getFileLength();
+    const int64_t inProgress =
+        lbs->isLastBlockComplete() ? 0 : lastBlockBeingWrittenLength;
+    return reported + inProgress;
+}
+
+/**
+ * Make lb the current block for the position in cursor.
+ *
+ * Updates readFromUnderConstructedBlock, curBlock and endOfCurBlock, and
+ * forgets failed nodes and the old block reader. The new block reader is
+ * created lazily by readOneBlock().
+ * @param lb the block containing cursor.
+ */
+void InputStreamImpl::seekToBlock(const LocatedBlock & lb) {
+    /*
+     * A position at or past the namenode-reported length can only lie in
+     * an incomplete last block.
+     */
+    readFromUnderConstructedBlock = (cursor >= lbs->getFileLength());
+    assert(!readFromUnderConstructedBlock || !lbs->isLastBlockComplete());
+    assert(cursor >= lb.getOffset()
+           && cursor < lb.getOffset() + lb.getNumBytes());
+
+    curBlock = shared_ptr < LocatedBlock > (new LocatedBlock(lb));
+    const int64_t blockSize = curBlock->getNumBytes();
+    assert(blockSize > 0);
+    endOfCurBlock = curBlock->getOffset() + blockSize;
+
+    failedNodes.clear();
+    blockReader.reset();
+}
+
+/**
+ * Pick the first location of curBlock that is not in failedNodes.
+ * failedNodes is kept sorted by its writers, so it is binary-searched.
+ * @return true and set curNode if such a location exists, false otherwise.
+ */
+bool InputStreamImpl::choseBestNode() {
+    const std::vector<DatanodeInfo> & candidates = curBlock->getLocations();
+    std::vector<DatanodeInfo>::const_iterator it = candidates.begin();
+
+    for (; it != candidates.end(); ++it) {
+        const bool alreadyFailed =
+            std::binary_search(failedNodes.begin(), failedNodes.end(), *it);
+
+        if (!alreadyFailed) {
+            curNode = *it;
+            return true;
+        }
+    }
+
+    return false;
+}
+
+/**
+ * Whether curNode's IP address is one of this host's addresses.
+ * The local address set is built by BuildLocalAddrSet() on first call.
+ */
+bool InputStreamImpl::isLocalNode() {
+    static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet();
+    return LocalAddrSet.count(curNode.getIpAddr()) > 0;
+}
+
+/**
+ * Create blockReader for curBlock, positioned at cursor.
+ *
+ * Nodes are chosen with choseBestNode(), which skips failedNodes. When the
+ * chosen node is local, local reads are enabled, the block is not under
+ * construction and temporaryDisableLocalRead is false, a LocalBlockReader
+ * (short-circuit read) is tried first. If it cannot be set up, the same
+ * node is retried with a RemoteBlockReader. A node whose remote reader
+ * cannot be set up is added to failedNodes and the next node is tried.
+ * @param temporaryDisableLocalRead do not try the local reader.
+ * @throw HdfsIOException when every location has been tried; the last
+ *        setup failure, if any, is rethrown inside a catch so NESTED_THROW
+ *        can attach it.
+ */
+void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) {
+    bool lastReadFromLocal = false;
+    exception_ptr lastException;
+
+    while (true) {
+        if (!choseBestNode()) {
+            // Every location has failed: report the last failure as cause.
+            try {
+                if (lastException) {
+                    rethrow_exception(lastException);
+                }
+            } catch (...) {
+                NESTED_THROW(HdfsIOException,
+                             "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
+                             curBlock->toString().c_str());
+            }
+
+            // No failure was recorded, e.g. no usable location at all.
+            THROW(HdfsIOException,
+                  "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
+                  curBlock->toString().c_str());
+        }
+
+        try {
+            int64_t offset, len;
+            offset = cursor - curBlock->getOffset();
+            assert(offset >= 0);
+            len = curBlock->getNumBytes() - offset;
+            assert(len > 0);
+
+            if (!temporaryDisableLocalRead && !lastReadFromLocal &&
+                !readFromUnderConstructedBlock && localRead && isLocalNode()) {
+                // Set before trying, so that a failure (or missing info)
+                // makes the next iteration use a remote reader instead.
+                lastReadFromLocal = true;
+
+                shared_ptr<ReadShortCircuitInfo> info;
+                ReadShortCircuitInfoBuilder builder(curNode, auth, *conf);
+
+                try {
+                    info = builder.fetchOrCreate(*curBlock, curBlock->getToken());
+
+                    if (!info) {
+                        // No short-circuit info: retry this node remotely.
+                        continue;
+                    }
+
+                    assert(info->isValid());
+                    blockReader = shared_ptr<BlockReader>(
+                        new LocalBlockReader(info, *curBlock, offset, verify,
+                                             *conf, localReaderBuffer));
+                } catch (...) {
+                    // Mark the short-circuit info invalid before propagating.
+                    if (info) {
+                        info->setValid(false);
+                    }
+
+                    throw;
+                }
+            } else {
+                const char * clientName = filesystem->getClientName();
+                lastReadFromLocal = false;
+                blockReader = shared_ptr<BlockReader>(new RemoteBlockReader(
+                    *curBlock, curNode, *peerCache, offset, len,
+                    curBlock->getToken(), clientName, verify, *conf));
+            }
+
+            break;
+        } catch (const HdfsIOException & e) {
+            lastException = current_exception();
+            std::string buffer;
+
+            if (lastReadFromLocal) {
+                LOG(LOG_ERROR,
+                    "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\n"
+                    "retry the same node but disable read shortcircuit feature",
+                    curBlock->toString().c_str(), path.c_str(),
+                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
+                /*
+                 * Do not add the node to failedNodes: the same node is
+                 * retried with local block reading disabled.
+                 */
+            } else {
+                LOG(LOG_ERROR,
+                    "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\nretry another node",
+                    curBlock->toString().c_str(), path.c_str(),
+                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
+                // Keep failedNodes sorted for choseBestNode()'s binary_search.
+                failedNodes.push_back(curNode);
+                std::sort(failedNodes.begin(), failedNodes.end());
+            }
+        }
+    }
+}
+
+/**
+ * Open a file for reading.
+ * @param fs the file system holding the file.
+ * @param path the file to open; must be non-NULL and non-empty.
+ * @param verifyChecksum whether checksums are verified while reading.
+ * @throw InvalidParameter if path is NULL or empty.
+ * If openInternal() fails, close() is called before the exception is
+ * rethrown.
+ */
+void InputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path,
+                           bool verifyChecksum) {
+    const bool invalidPath = (NULL == path) || ('\0' == path[0]);
+
+    if (invalidPath) {
+        THROW(InvalidParameter, "path is invalid.");
+    }
+
+    try {
+        openInternal(fs, path, verifyChecksum);
+    } catch (...) {
+        // Close the half-opened stream before propagating the failure.
+        close();
+        throw;
+    }
+}
+
+/**
+ * Do the work of open(): resolve the path, copy the session configuration,
+ * build the RPC authentication and fetch the initial block locations.
+ * The stream is marked open (closed = false) only after all of that succeeds.
+ * @throw HdfsCanceled rethrown unchanged.
+ * @throw FileNotFoundException rethrown unchanged.
+ * @throw HdfsIOException wrapping any other HdfsException.
+ */
+void InputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path,
+                                   bool verifyChecksum) {
+    try {
+        verify = verifyChecksum;
+        filesystem = fs;
+        this->path = fs->getStandardPath(path);
+        LOG(DEBUG2, "%p, open file %s for read, verfyChecksum is %s", this,
+            this->path.c_str(), (verifyChecksum ? "true" : "false"));
+        conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf()));
+        this->auth = RpcAuth(fs->getUserInfo(),
+                             RpcAuth::ParseMethod(conf->getRpcAuthMethod()));
+        /* Prefetch window expressed as a multiple of the default block size. */
+        prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize();
+        localRead = conf->isReadFromLocal();
+        maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry();
+        peerCache = &fs->getPeerCache();
+        updateBlockInfos();
+        closed = false;
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const FileNotFoundException & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.",
+                     this->path.c_str());
+    }
+}
+
+/**
+ * Read up to size bytes at the current position into buf.
+ * @return the number of bytes read, which may be less than size.
+ * @throw HdfsEndOfStream at end of file; it is not recorded in lastError.
+ * Any other failure is stored in lastError, so every later call rethrows
+ * it through checkStatus().
+ */
+int32_t InputStreamImpl::read(char * buf, int32_t size) {
+    checkStatus();
+
+    try {
+        const int64_t startPos = cursor;
+        const int32_t bytesRead = readInternal(buf, size);
+        LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64 " done %d, next pos %" PRId64, this, path.c_str(), size,
+            startPos, bytesRead, cursor);
+        return bytesRead;
+    } catch (const HdfsEndOfStream & e) {
+        throw;
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+/**
+ * Read up to size bytes of the current block (curBlock) at cursor.
+ *
+ * Sets up blockReader when needed and retries read failures. A failed
+ * local reader is replaced by a remote reader with local reads disabled;
+ * a failed remote reader adds curNode to failedNodes so that another
+ * replica is tried.
+ * @param buf the buffer to be filled.
+ * @param size the maximum number of bytes to read; the read never crosses
+ *        endOfCurBlock.
+ * @param shouldUpdateMetadataOnFailure if true, return -1 instead of
+ *        throwing when no block reader can be set up, so the caller can
+ *        refresh block information and try again.
+ * @return the number of bytes read (cursor is advanced by it), or -1 to
+ *         request a block information refresh.
+ */
+int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure) {
+    bool temporaryDisableLocalRead = false;
+    std::string buffer;
+
+    while (true) {
+        try {
+            /*
+             * Setup block reader here and handle failure.
+             */
+            if (!blockReader) {
+                setupBlockReader(temporaryDisableLocalRead);
+                temporaryDisableLocalRead = false;
+            }
+        } catch (const HdfsInvalidBlockToken & e) {
+            /*
+             * A stale token is fixed by refreshing block information, which
+             * the caller does when we return -1. Once the caller has no
+             * refreshes left, rethrow instead: returning -1 again would make
+             * readInternal() retry forever.
+             */
+            if (!shouldUpdateMetadataOnFailure) {
+                throw;
+            }
+
+            std::string tokenErrorDetail;
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.",
+                curBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, tokenErrorDetail));
+            return -1;
+        } catch (const HdfsIOException & e) {
+            /*
+             * In setupBlockReader, we have tried all the replicas.
+             * We now update block informations once, and try again.
+             */
+            if (shouldUpdateMetadataOnFailure) {
+                LOG(LOG_ERROR,
+                    "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.",
+                    curBlock->toString().c_str(), path.c_str(),
+                    GetExceptionDetail(e, buffer));
+                return -1;
+            } else {
+                /*
+                 * We have updated block informations and failed again.
+                 */
+                throw;
+            }
+        }
+
+        /*
+         * Block reader has been setup, read from block reader.
+         */
+        try {
+            int32_t todo = size;
+            todo = todo < endOfCurBlock - cursor ?
+                   todo : static_cast<int32_t>(endOfCurBlock - cursor);
+            assert(blockReader);
+            todo = blockReader->read(buf, todo);
+            cursor += todo;
+            /*
+             * Exit the loop and function from here if success.
+             */
+            return todo;
+        } catch (const HdfsIOException & e) {
+            /*
+             * Failed to read from current block reader,
+             * add the current datanode to invalid node list and try again.
+             */
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, "
+                "retry read again from another Datanode.",
+                curBlock->toString().c_str(), path.c_str(),
+                curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
+
+            if (conf->doesNotRetryAnotherNode()) {
+                throw;
+            }
+        } catch (const ChecksumException & e) {
+            LOG(LOG_ERROR,
+                "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, "
+                "retry read again from another Datanode.",
+                curBlock->toString().c_str(), path.c_str(),
+                curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
+        }
+
+        /*
+         * The block reader was created successfully but failed to read.
+         * For a local reader, disable local reads and try the same node again.
+         */
+        if (!blockReader || dynamic_cast<LocalBlockReader *>(blockReader.get())) {
+            temporaryDisableLocalRead = true;
+        } else {
+            /*
+             * Remote block reader failed to read, try another node.
+             * failedNodes stays sorted for choseBestNode()'s binary_search.
+             */
+            LOG(INFO, "IntputStreamImpl: Add invalid datanode %s to failed datanodes and try another datanode again for file %s.",
+                curNode.formatAddress().c_str(), path.c_str());
+            failedNodes.push_back(curNode);
+            std::sort(failedNodes.begin(), failedNodes.end());
+        }
+
+        blockReader.reset();
+    }
+}
+
+/**
+ * Read up to size bytes at cursor into buf, from a single block.
+ *
+ * Block information is refreshed from the namenode when none is cached,
+ * when cursor is at or beyond the known file length, or when cursor has
+ * left the current block and the cached list has no block for it. When
+ * readOneBlock() returns -1 (all replicas failed), the cached block
+ * information is discarded and the read is retried; once
+ * conf->getMaxReadBlockRetry() such retries have been used, readOneBlock()
+ * is told not to ask for another refresh.
+ * @param buf the buffer to be filled.
+ * @param size buffer size.
+ * @return the number of bytes filled in the buffer, which may be less than size.
+ * @throw HdfsEndOfStream if cursor is at or beyond the end of the file.
+ * @throw HdfsCanceled rethrown unchanged.
+ * @throw HdfsIOException wrapping any other HdfsException.
+ */
+int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
+    int updateMetadataOnFailure = conf->getMaxReadBlockRetry();
+
+    try {
+        do {
+            const LocatedBlock * lb = NULL;
+
+            /*
+             * Check if we have got the block information we need.
+             * Note: lb may be assigned as a side effect of this condition;
+             * it is looked up again below when it is actually used.
+             */
+            if (!lbs || cursor >= getFileLength()
+                    || (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) {
+                /*
+                 * Get block information from namenode.
+                 * Do RPC failover work in updateBlockInfos.
+                 */
+                updateBlockInfos();
+
+                /*
+                 * We now have up-to-date block information;
+                 * check whether we have reached the end of the file.
+                 */
+                if (cursor >= getFileLength()) {
+                    THROW(HdfsEndOfStream,
+                          "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %d, from file: %s",
+                          cursor, size, path.c_str());
+                }
+            }
+
+            /*
+             * If we reached the end of the block, or the block information
+             * has just been updated, seek to the right block to read.
+             */
+            if (cursor >= endOfCurBlock) {
+                lb = lbs->findBlock(cursor);
+
+                if (!lb) {
+                    THROW(HdfsIOException,
+                          "InputStreamImpl: cannot find block information at position: %" PRId64 " for file: %s",
+                          cursor, path.c_str());
+                }
+
+                /*
+                 * Seek to the right block and set up all needed variables,
+                 * but do not set up the block reader; that happens later.
+                 */
+                seekToBlock(*lb);
+            }
+
+            int32_t retval = readOneBlock(buf, size, updateMetadataOnFailure > 0);
+
+            /*
+             * Now we have tried all replicas and failed.
+             * Drop the cached block information (endOfCurBlock = 0 forces a
+             * new seekToBlock) and try again after a one second pause; any
+             * error from sleep_for is ignored.
+             */
+            if (retval < 0) {
+                lbs.reset();
+                endOfCurBlock = 0;
+                --updateMetadataOnFailure;
+
+                try {
+                    sleep_for(seconds(1));
+                } catch (...) {
+                }
+
+                continue;
+            }
+
+            return retval;
+        } while (true);
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsEndOfStream & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        /*
+         * wrap the underlying error and rethrow.
+         */
+        NESTED_THROW(HdfsIOException,
+                     "InputStreamImpl: cannot read file: %s, from position %" PRId64 ", size: %d.",
+                     path.c_str(), cursor, size);
+    }
+}
+
+/**
+ * Read exactly size bytes into buf, blocking until all of them are read.
+ * @param buf the buffer to be filled.
+ * @param size the number of bytes to be read.
+ * @throw HdfsEndOfStream if the file ends first; it is not recorded in
+ *        lastError. Any other failure is stored in lastError and rethrown
+ *        by later calls through checkStatus().
+ */
+void InputStreamImpl::readFully(char * buf, int64_t size) {
+    LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64, path.c_str(), size, cursor);
+    checkStatus();
+
+    try {
+        readFullyInternal(buf, size);
+    } catch (const HdfsEndOfStream & e) {
+        throw;
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+/**
+ * Call readInternal() repeatedly until size bytes have been copied to buf.
+ * Each call asks for at most INT32_MAX bytes, since readInternal() takes
+ * an int32_t size.
+ * @throw HdfsEndOfStream (reporting the starting position and total size)
+ *        if the file ends before size bytes are read.
+ * @throw HdfsCanceled rethrown unchanged.
+ * @throw HdfsIOException wrapping any other HdfsException.
+ */
+void InputStreamImpl::readFullyInternal(char * buf, int64_t size) {
+    const int64_t startPos = cursor;
+    const int64_t chunkLimit = std::numeric_limits<int32_t>::max();
+    int64_t filled = 0;
+
+    try {
+        while (filled < size) {
+            const int32_t chunk = static_cast<int32_t>(std::min(size - filled, chunkLimit));
+            filled += readInternal(buf + filled, chunk);
+        }
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsEndOfStream & e) {
+        THROW(HdfsEndOfStream,
+              "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %" PRId64 ", from file: %s",
+              startPos, size, path.c_str());
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "InputStreamImpl: cannot read fully from file: %s, from position %" PRId64 ", size: %" PRId64 ".",
+                     path.c_str(), startPos, size);
+    }
+}
+
+/*
+ * Number of bytes the current block reader can hand out without blocking;
+ * 0 when no block reader is set up. Failures are stored in lastError and
+ * rethrown.
+ */
+int64_t InputStreamImpl::available() {
+    checkStatus();
+
+    int64_t readable = 0;
+
+    try {
+        if (blockReader) {
+            readable = blockReader->available();
+        }
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+
+    return readable;
+}
+
+/**
+ * To move the file point to the given position.
+ *
+ * Any failure (including HdfsEndOfStream when `pos` lies beyond the end of
+ * the file) is stored in lastError and rethrown.
+ *
+ * @param pos the target position, as a byte offset from the start of the file.
+ */
+void InputStreamImpl::seek(int64_t pos) {
+    LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this, path.c_str(), pos, cursor);
+    checkStatus();
+
+    try {
+        seekInternal(pos);
+    } catch (...) {
+        lastError = current_exception();
+        throw;
+    }
+}
+
+/*
+ * Reposition the stream to `pos` (no lastError bookkeeping; see seek()).
+ *
+ * Block metadata is (re)loaded when it is missing or when `pos` lies past
+ * the currently known file length; if `pos` is still past the end after the
+ * refresh, HdfsEndOfStream is thrown. A forward seek that stays inside the
+ * current block is served by skipping in the existing block reader. In every
+ * other case, or if that skip fails with HdfsIOException/ChecksumException,
+ * the block reader is dropped and only the cursor is moved (presumably the
+ * next read then opens a fresh reader at the new position).
+ */
+void InputStreamImpl::seekInternal(int64_t pos) {
+    if (pos == cursor) {
+        return;
+    }
+
+    if (!lbs || pos > getFileLength()) {
+        updateBlockInfos();
+
+        if (pos > getFileLength()) {
+            THROW(HdfsEndOfStream,
+                  "InputStreamImpl: seek over EOF, current position: %" PRId64 ", seek target: %" PRId64 ", in file: %s",
+                  cursor, pos, path.c_str());
+        }
+    }
+
+    const bool forwardWithinBlock = blockReader && pos > cursor && pos < endOfCurBlock;
+
+    if (forwardWithinBlock) {
+        try {
+            blockReader->skip(pos - cursor);
+            cursor = pos;
+            return;
+        } catch (const HdfsIOException & e) {
+            std::string detail;
+            LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s",
+                pos - cursor, path.c_str(), GetExceptionDetail(e, detail));
+            LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str());
+        } catch (const ChecksumException & e) {
+            std::string detail;
+            LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s",
+                pos - cursor, path.c_str(), GetExceptionDetail(e, detail));
+            LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str());
+        }
+    }
+
+    // Target is outside the current block, or skipping failed: discard the
+    // current block reader and just move the cursor.
+    endOfCurBlock = 0;
+    blockReader.reset();
+    cursor = pos;
+}
+
+/**
+ * Report the stream's current position.
+ * @return the cursor, i.e. the byte offset within the file at which the
+ *         stream is currently positioned.
+ */
+int64_t InputStreamImpl::tell() {
+    checkStatus();
+    const int64_t position = cursor;
+    LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), position);
+    return position;
+}
+
+/**
+ * Close the stream.
+ *
+ * Marks the stream closed and puts every per-file member back to a fixed
+ * value: the option flags are reset (localRead and verify to true,
+ * readFromUnderConstructedBlock to false), positions and sizes to 0, the
+ * shared helpers (filesystem, block reader, block metadata, configuration)
+ * are released, the failed-node list and path are cleared, and any recorded
+ * error is discarded. curNode, fileInfo, peerCache and auth are not touched.
+ */
+void InputStreamImpl::close() {
+    LOG(DEBUG2, "%p close file %s for read", this, path.c_str());
+    closed = true;
+    localRead = true;
+    readFromUnderConstructedBlock = false;
+    verify = true;
+    // NOTE(review): filesystem is released before blockReader. If the block
+    // reader's teardown relies on anything owned by the filesystem, this
+    // order matters — confirm before reordering these resets.
+    filesystem.reset();
+    cursor = 0;
+    endOfCurBlock = 0;
+    lastBlockBeingWrittenLength = 0;
+    prefetchSize = 0;
+    blockReader.reset();
+    curBlock.reset();
+    lbs.reset();
+    conf.reset();
+    failedNodes.clear();
+    path.clear();
+    // resize(0) empties the buffer but keeps its allocated capacity.
+    localReaderBuffer.resize(0);
+    lastError = exception_ptr();
+}
+
+/*
+ * Describe the stream for logging. An empty path means the stream has not
+ * been opened (close() also clears the path); otherwise the path is shown.
+ */
+std::string InputStreamImpl::toString() {
+    if (path.empty()) {
+        return std::string("InputStream (not opened)");
+    }
+
+    return std::string("InputStream for path ") + path;
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStreamImpl.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStreamImpl.h b/depends/libhdfs3/src/client/InputStreamImpl.h
new file mode 100644
index 0000000..c4128f6
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStreamImpl.h
@@ -0,0 +1,160 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
+#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_
+
+#include "platform.h"
+
+#include "BlockReader.h"
+#include "ExceptionInternal.h"
+#include "FileSystem.h"
+#include "Hash.h"
+#include "InputStreamInter.h"
+#include "Memory.h"
+#include "PeerCache.h"
+#include "rpc/RpcAuth.h"
+#include "server/Datanode.h"
+#include "server/LocatedBlock.h"
+#include "server/LocatedBlocks.h"
+#include "SessionConfig.h"
+#include "Unordered.h"
+
+#ifdef MOCK
+#include "TestDatanodeStub.h"
+#endif
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * An input stream used to read data from HDFS.
+ *
+ * Public methods record any failure in lastError before rethrowing it (see
+ * the .cpp); NOTE(review): presumably checkStatus() reports that error or a
+ * closed stream on later calls — confirm.
+ */
+class InputStreamImpl: public InputStreamInter {
+public:
+    InputStreamImpl();
+    ~InputStreamImpl();
+
+    /**
+     * Open a file to read
+     * @param fs hdfs file system.
+     * @param path the file to be read.
+     * @param verifyChecksum verify the checksum.
+     */
+    void open(shared_ptr<FileSystemInter> fs, const char * path, bool verifyChecksum);
+
+    /**
+     * To read data from hdfs.
+     * @param buf the buffer to be filled.
+     * @param size buffer size.
+     * @return the number of bytes copied into the buffer; it may be less than size.
+     */
+    int32_t read(char * buf, int32_t size);
+
+    /**
+     * To read data from hdfs, blocking until the given number of bytes has been read.
+     * @param buf the buffer to be filled.
+     * @param size the number of bytes to be read.
+     */
+    void readFully(char * buf, int64_t size);
+
+    /**
+     * @return the number of bytes the current block reader can return
+     *         without blocking, or 0 if no block reader is set up.
+     */
+    int64_t available();
+
+    /**
+     * To move the file point to the given position.
+     * @param pos the given position.
+     */
+    void seek(int64_t pos);
+
+    /**
+     * To get the current file point position.
+     * @return the position of current file point.
+     */
+    int64_t tell();
+
+    /**
+     * Close the stream.
+     */
+    void close();
+
+    /**
+     * Convert to a printable string
+     *
+     * @return return a printable string
+     */
+    std::string toString();
+
+private:
+    bool choseBestNode();
+    bool isLocalNode();
+    int32_t readInternal(char * buf, int32_t size);
+    int32_t readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure);
+    int64_t getFileLength();
+    int64_t readBlockLength(const LocatedBlock & b);
+    void checkStatus();
+    void openInternal(shared_ptr<FileSystemInter> fs, const char * path,
+                      bool verifyChecksum);
+    void readFullyInternal(char * buf, int64_t size);
+    void seekInternal(int64_t pos);
+    void seekToBlock(const LocatedBlock & lb);
+    void setupBlockReader(bool temporaryDisableLocalRead);
+    void updateBlockInfos();
+
+private:
+    // NOTE: declaration order below fixes construction order; keep it in
+    // sync with the constructor's initializer list.
+    bool closed;  // set by close()
+    bool localRead;
+    bool readFromUnderConstructedBlock;
+    bool verify;  // presumably mirrors open()'s verifyChecksum — confirm
+    DatanodeInfo curNode;
+    exception_ptr lastError;  // failure recorded by the public methods
+    FileStatus fileInfo;
+    int maxGetBlockInfoRetry;
+    int64_t cursor;  // current read position, as returned by tell()
+    int64_t endOfCurBlock;  // bound used by seekInternal() to decide whether a forward seek stays in the current block
+    int64_t lastBlockBeingWrittenLength;
+    int64_t prefetchSize;
+    PeerCache *peerCache;  // raw pointer; ownership not visible here — verify
+    RpcAuth auth;
+    shared_ptr<BlockReader> blockReader;
+    shared_ptr<FileSystemInter> filesystem;
+    shared_ptr<LocatedBlock> curBlock;
+    shared_ptr<LocatedBlocks> lbs;  // block metadata; (re)loaded by updateBlockInfos()
+    shared_ptr<SessionConfig> conf;
+    std::string path;  // empty when the stream is not open
+    std::vector<DatanodeInfo> failedNodes;
+    std::vector<char> localReaderBuffer;
+
+#ifdef MOCK
+private:
+    Hdfs::Mock::TestDatanodeStub * stub;
+#endif
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMIMPL_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/InputStreamInter.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/InputStreamInter.h b/depends/libhdfs3/src/client/InputStreamInter.h
new file mode 100644
index 0000000..470f8d2
--- /dev/null
+++ b/depends/libhdfs3/src/client/InputStreamInter.h
@@ -0,0 +1,104 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMINTER_H_
+#define _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMINTER_H_
+
+#include <Memory.h>
+
+#include <string>
+
+namespace Hdfs {
+namespace Internal {
+
+class FileSystemInter;
+
+/**
+ * An input stream used to read data from HDFS.
+ *
+ * Abstract interface; the declaration order of the pure virtual functions
+ * below defines the vtable layout, so do not reorder them.
+ */
+class InputStreamInter {
+public:
+
+    virtual ~InputStreamInter() {
+    }
+
+    /**
+     * Open a file to read
+     * @param fs hdfs file system.
+     * @param path the file to be read.
+     * @param verifyChecksum verify the checksum.
+     */
+    virtual void open(shared_ptr<FileSystemInter> fs, const char * path,
+                      bool verifyChecksum) = 0;
+
+    /**
+     * To read data from hdfs.
+     * @param buf the buffer to be filled.
+     * @param size buffer size.
+     * @return the number of bytes copied into the buffer; it may be less than size.
+     */
+    virtual int32_t read(char * buf, int32_t size) = 0;
+
+    /**
+     * To read data from hdfs, blocking until the given number of bytes has been read.
+     * @param buf the buffer to be filled.
+     * @param size the number of bytes to be read.
+     */
+    virtual void readFully(char * buf, int64_t size) = 0;
+
+    /**
+     * Get how many bytes can be read without blocking.
+     * @return The number of bytes that can be read without blocking.
+     */
+    virtual int64_t available() = 0;
+
+    /**
+     * To move the file point to the given position.
+     * @param pos the given position.
+     */
+    virtual void seek(int64_t pos) = 0;
+
+    /**
+     * To get the current file point position.
+     * @return the position of current file point.
+     */
+    virtual int64_t tell() = 0;
+
+    /**
+     * Close the stream.
+     */
+    virtual void close() = 0;
+
+    /**
+     * Output a readable string of this input stream.
+     */
+    virtual std::string toString() = 0;
+};
+
+}
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_INPUTSTREAMINTER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/KerberosName.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/KerberosName.cpp b/depends/libhdfs3/src/client/KerberosName.cpp
new file mode 100644
index 0000000..1890b88
--- /dev/null
+++ b/depends/libhdfs3/src/client/KerberosName.cpp
@@ -0,0 +1,121 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "KerberosName.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+#include <regex.h>
+#include <string.h>
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * Turn a POSIX regex error code into an HdfsIOException whose message
+ * includes regerror()'s description of the failure (it was previously
+ * computed and then discarded).
+ * @param rc the non-zero code returned by regcomp() or regexec().
+ * @param comp the regex object the failing call operated on.
+ */
+static void HandleRegError(int rc, regex_t * comp) {
+    std::vector<char> buffer;
+    size_t size = regerror(rc, comp, NULL, 0);
+    buffer.resize(size + 1);
+    regerror(rc, comp, &buffer[0], buffer.size());
+    THROW(HdfsIOException, "KerberosName: Failed to parse Kerberos principal: %s", &buffer[0]);
+}
+
+/*
+ * Construct an empty name: name, host and realm are all empty strings.
+ */
+KerberosName::KerberosName() : name(), host(), realm() {
+}
+
+/*
+ * Construct a name by splitting `principal` into its components; see
+ * parse() for the accepted forms and the exceptions it may throw.
+ */
+KerberosName::KerberosName(const std::string & principal) : name(), host(), realm() {
+    parse(principal);
+}
+
+/*
+ * Split a Kerberos principal of the form name[/host]@realm into its
+ * components and store them in name, host and realm.
+ *
+ * - An empty principal leaves all components untouched.
+ * - A principal without '@' is stored verbatim as the bare name.
+ * - Any other principal that does not match the whole pattern throws
+ *   HdfsIOException ("Malformed Kerberos name").
+ * - regcomp()/regexec() failures throw HdfsIOException via HandleRegError().
+ *
+ * The pattern is anchored with ^...$ so the entire principal must match,
+ * as in Hadoop's KerberosName (Matcher.matches()). An unanchored regexec()
+ * searches for a substring, so e.g. "a/b/c@R" would be accepted as
+ * name "b", host "c" instead of being rejected as malformed.
+ */
+void KerberosName::parse(const std::string & principal) {
+    int rc;
+    static const char * pattern = "^([^/@]*)(/([^/@]*))?@([^/@]*)$";
+    regex_t comp;
+    regmatch_t pmatch[5];
+
+    if (principal.empty()) {
+        return;
+    }
+
+    memset(&comp, 0, sizeof(regex_t));
+    rc = regcomp(&comp, pattern, REG_EXTENDED);
+
+    if (rc) {
+        // Compilation failed: there is nothing to regfree().
+        HandleRegError(rc, &comp);
+    }
+
+    try {
+        memset(pmatch, 0, sizeof(pmatch));
+        rc = regexec(&comp, principal.c_str(),
+                     sizeof(pmatch) / sizeof(pmatch[0]), pmatch, 0);
+
+        if (rc && rc != REG_NOMATCH) {
+            HandleRegError(rc, &comp);
+        }
+
+        if (rc == REG_NOMATCH) {
+            if (principal.find('@') != principal.npos) {
+                THROW(HdfsIOException,
+                      "KerberosName: Malformed Kerberos name: %s",
+                      principal.c_str());
+            } else {
+                name = principal;
+            }
+        } else {
+            // Group 1: name, group 3: host (optional), group 4: realm.
+            if (pmatch[1].rm_so != -1) {
+                name = principal.substr(pmatch[1].rm_so,
+                                        pmatch[1].rm_eo - pmatch[1].rm_so);
+            }
+
+            if (pmatch[3].rm_so != -1) {
+                host = principal.substr(pmatch[3].rm_so,
+                                        pmatch[3].rm_eo - pmatch[3].rm_so);
+            }
+
+            if (pmatch[4].rm_so != -1) {
+                realm = principal.substr(pmatch[4].rm_so,
+                                         pmatch[4].rm_eo - pmatch[4].rm_so);
+            }
+        }
+    } catch (...) {
+        regfree(&comp);
+        throw;
+    }
+
+    regfree(&comp);
+}
+
+/*
+ * Hash of the principal: the string hashes of name, host and realm, in that
+ * order, combined with CombineHasher().
+ */
+size_t KerberosName::hash_value() const {
+    size_t parts[3];
+    parts[0] = StringHasher(name);
+    parts[1] = StringHasher(host);
+    parts[2] = StringHasher(realm);
+    return CombineHasher(parts, sizeof(parts) / sizeof(parts[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/KerberosName.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/KerberosName.h b/depends/libhdfs3/src/client/KerberosName.h
new file mode 100644
index 0000000..cbae1eb
--- /dev/null
+++ b/depends/libhdfs3/src/client/KerberosName.h
@@ -0,0 +1,104 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_KERBEROSNAME_H_
+#define _HDFS_LIBHDFS3_CLIENT_KERBEROSNAME_H_
+
+#include <string>
+#include <sstream>
+
+#include "Hash.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * A Kerberos principal held as its three components: name[/host][@realm].
+ */
+class KerberosName {
+public:
+    KerberosName();
+    KerberosName(const std::string & principal);
+
+    /**
+     * Reassemble the principal string; the "/host" and "@realm" parts are
+     * appended only when the corresponding component is non-empty.
+     */
+    std::string getPrincipal() const {
+        std::string principal(name);
+
+        if (!host.empty()) {
+            principal += '/';
+            principal += host;
+        }
+
+        if (!realm.empty()) {
+            principal += '@';
+            principal += realm;
+        }
+
+        return principal;
+    }
+
+    const std::string & getHost() const {
+        return host;
+    }
+
+    void setHost(const std::string & host) {
+        this->host = host;
+    }
+
+    const std::string & getName() const {
+        return name;
+    }
+
+    void setName(const std::string & name) {
+        this->name = name;
+    }
+
+    const std::string & getRealm() const {
+        return realm;
+    }
+
+    void setRealm(const std::string & realm) {
+        this->realm = realm;
+    }
+
+    size_t hash_value() const;
+
+    // Two names are equal when all three components are equal.
+    bool operator ==(const KerberosName & other) const {
+        return name == other.name && host == other.host && realm == other.realm;
+    }
+
+private:
+    void parse(const std::string & principal);
+
+private:
+    std::string name;
+    std::string host;
+    std::string realm;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::Hdfs::Internal::KerberosName);
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_KERBEROSNAME_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LeaseRenewer.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/LeaseRenewer.cpp b/depends/libhdfs3/src/client/LeaseRenewer.cpp
new file mode 100644
index 0000000..2d4523c
--- /dev/null
+++ b/depends/libhdfs3/src/client/LeaseRenewer.cpp
@@ -0,0 +1,167 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemInter.h"
+#include "LeaseRenewer.h"
+#include "Logger.h"
+
+#define DEFAULT_LEASE_RENEW_INTERVAL (60 * 1000)
+
+namespace Hdfs {
+namespace Internal {
+
+once_flag LeaseRenewer::once;
+shared_ptr<LeaseRenewer> LeaseRenewer::renewer;
+
+/*
+ * Return the process-wide lease renewer, creating it on first use; the
+ * call_once() guard makes CreateSinglten() run a single time.
+ */
+LeaseRenewer & LeaseRenewer::GetLeaseRenewer() {
+    call_once(once, &LeaseRenewer::CreateSinglten);
+    assert(renewer);
+    return *renewer;
+}
+
+/*
+ * Create the singleton instance (a LeaseRenewerImpl).
+ */
+void LeaseRenewer::CreateSinglten() {
+    renewer.reset(new LeaseRenewerImpl());
+}
+
+/*
+ * A new renewer has no worker thread yet (stop == true). Renewal rounds are
+ * spaced DEFAULT_LEASE_RENEW_INTERVAL milliseconds apart until setInterval()
+ * changes it.
+ */
+LeaseRenewerImpl::LeaseRenewerImpl()
+    : stop(true),
+      interval(DEFAULT_LEASE_RENEW_INTERVAL) {
+}
+
+/*
+ * Stop the background worker (if any) and wait for it to exit.
+ *
+ * `stop` is set while holding `mut`. The worker re-checks `stop` under the
+ * same mutex immediately before it waits, so this notification cannot fall
+ * into the gap between that check and wait_for(). Otherwise the wake-up
+ * could be lost and the join below would block for up to a full interval.
+ */
+LeaseRenewerImpl::~LeaseRenewerImpl() {
+    {
+        lock_guard<mutex> lock(mut);
+        stop = true;
+    }
+
+    cond.notify_all();
+
+    if (worker.joinable()) {
+        worker.join();
+    }
+}
+
+/*
+ * Interval between renewal rounds, in milliseconds.
+ */
+int LeaseRenewerImpl::getInterval() const {
+    return interval;
+}
+
+void LeaseRenewerImpl::setInterval(int interval) {
+    this->interval = interval;
+}
+
+/*
+ * Register `filesystem` for lease renewal (keyed by its client name), count
+ * one more open output stream on it, and start the worker thread if none is
+ * running.
+ */
+void LeaseRenewerImpl::StartRenew(shared_ptr<FileSystemInter> filesystem) {
+    lock_guard<mutex> lock(mut);
+    const char * clientName = filesystem->getClientName();
+
+    if (maps.find(clientName) == maps.end()) {
+        maps[clientName] = filesystem;
+    }
+
+    filesystem->registerOpenedOutputStream();
+
+    if (stop && !maps.empty()) {
+        /*
+         * Once `stop` is true the worker no longer takes `mut`, so reaping
+         * it while holding the lock cannot deadlock.
+         */
+        if (worker.joinable()) {
+            worker.join();
+        }
+
+        stop = false;
+        CREATE_THREAD(worker, bind(&LeaseRenewerImpl::renewer, this));
+    }
+}
+
+/*
+ * Record that one output stream of `filesystem` was closed. The filesystem
+ * is dropped from the renewal set when unregisterOpenedOutputStream()
+ * returns true.
+ */
+void LeaseRenewerImpl::StopRenew(shared_ptr<FileSystemInter> filesystem) {
+    lock_guard<mutex> lock(mut);
+    const char * clientName = filesystem->getClientName();
+
+    if (filesystem->unregisterOpenedOutputStream()
+            && maps.find(clientName) != maps.end()) {
+        maps.erase(clientName);
+    }
+}
+
+/*
+ * Body of the worker thread. Every `interval` milliseconds it calls
+ * renewLease() on each registered filesystem, dropping those for which
+ * renewLease() returns false. It exits when asked to stop, when no
+ * filesystem is left, or on an unexpected exception.
+ */
+void LeaseRenewerImpl::renewer() {
+    assert(stop == false);
+
+    while (!stop) {
+        try {
+            unique_lock < mutex > lock(mut);
+
+            /*
+             * The destructor sets `stop` while holding `mut`. Checking it
+             * here, then letting wait_for() release the lock atomically,
+             * guarantees its notify_all() is not missed.
+             */
+            if (stop) {
+                break;
+            }
+
+            cond.wait_for(lock, milliseconds(interval));
+
+            if (stop || maps.empty()) {
+                /*
+                 * Publish `stop` before `mut` is released, so a concurrent
+                 * StartRenew() starts a new worker instead of relying on
+                 * this one, which is about to exit.
+                 */
+                stop = true;
+                break;
+            }
+
+            std::map<std::string, shared_ptr<FileSystemInter> >::iterator it = maps.begin();
+
+            while (it != maps.end()) {
+                shared_ptr<FileSystemInter> fs = it->second;
+
+                try {
+                    if (!fs->renewLease()) {
+                        // renewLease() returned false: stop tracking this filesystem.
+                        maps.erase(it++);
+                    } else {
+                        ++it;
+                    }
+
+                    continue;
+                } catch (const HdfsException & e) {
+                    std::string buffer;
+                    LOG(LOG_ERROR,
+                        "Failed to renew lease for filesystem which client name is %s, since:\n%s",
+                        fs->getClientName(), GetExceptionDetail(e, buffer));
+                } catch (const std::exception & e) {
+                    LOG(LOG_ERROR,
+                        "Failed to renew lease for filesystem which client name is %s, since:\n%s",
+                        fs->getClientName(), e.what());
+                    // Abandon the rest of this round.
+                    break;
+                }
+
+                ++it;
+            }
+
+            if (maps.empty()) {
+                // As above: publish `stop` while `mut` is still held.
+                stop = true;
+                break;
+            }
+        } catch (const std::bad_alloc & e) {
+            /*
+             * keep quiet if we run out of memory, since writing log needs memory,
+             * that may cause the process terminated.
+             */
+            break;
+        } catch (const std::exception & e) {
+            LOG(LOG_ERROR,
+                "Lease renewer will exit since unexpected exception: %s",
+                e.what());
+            break;
+        }
+    }
+
+    /*
+     * NOTE: on the exceptional exits above `stop` is only set here, after
+     * `mut` has been released; the lock is intentionally not re-acquired,
+     * because StartRenew() may be holding it while joining this thread.
+     */
+    stop = true;
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LeaseRenewer.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/LeaseRenewer.h b/depends/libhdfs3/src/client/LeaseRenewer.h
new file mode 100644
index 0000000..84c02ff
--- /dev/null
+++ b/depends/libhdfs3/src/client/LeaseRenewer.h
@@ -0,0 +1,82 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_
+#define _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_
+
+#include <map>
+
+#include "Atomic.h"
+#include "Memory.h"
+#include "Thread.h"
+
+namespace Hdfs {
+namespace Internal {
+
+class FileSystemInter;
+
+/**
+ * Interface of the process-wide service that renews leases for filesystems
+ * with open output streams. Obtain the shared instance with
+ * GetLeaseRenewer(). Do not reorder the virtual functions (vtable layout).
+ */
+class LeaseRenewer {
+public:
+    virtual ~LeaseRenewer() {
+    }
+
+    // Register `filesystem` (one more open output stream) for lease renewal.
+    virtual void StartRenew(shared_ptr<FileSystemInter> filesystem) = 0;
+    // Counterpart of StartRenew(): one output stream of `filesystem` closed.
+    virtual void StopRenew(shared_ptr<FileSystemInter> filesystem) = 0;
+
+public:
+    // Return the singleton instance, creating it on first call.
+    static LeaseRenewer & GetLeaseRenewer();
+    // Create the singleton; run once via call_once() from GetLeaseRenewer().
+    // NOTE(review): "Singlten" is a misspelling, but renaming it would
+    // change the public API.
+    static void CreateSinglten();
+
+private:
+    static once_flag once;
+    static shared_ptr<LeaseRenewer> renewer;
+};
+
+/**
+ * Default LeaseRenewer: a single background thread (worker) periodically
+ * calls renewLease() on every registered filesystem.
+ */
+class LeaseRenewerImpl: public LeaseRenewer {
+public:
+    LeaseRenewerImpl();
+    ~LeaseRenewerImpl();
+    // Interval between renewal rounds, in milliseconds.
+    int getInterval() const;
+    void setInterval(int interval);
+    void StartRenew(shared_ptr<FileSystemInter> filesystem);
+    void StopRenew(shared_ptr<FileSystemInter> filesystem);
+
+private:
+    // Worker thread body.
+    void renewer();
+
+private:
+    // Initially true; set to false when a worker is started and back to true
+    // by the destructor and by renewer() when the worker exits.
+    atomic<bool> stop;
+    condition_variable cond;  // wakes the worker early (notified by the destructor)
+    int interval;  // milliseconds between renewal rounds
+    mutex mut;  // guards maps and worker start-up
+    std::map<std::string, shared_ptr<FileSystemInter> > maps;  // filesystems to renew, keyed by client name
+    thread worker;  // runs renewer()
+};
+
+}
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_LEASE_RENEW_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LocalBlockReader.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/LocalBlockReader.cpp b/depends/libhdfs3/src/client/LocalBlockReader.cpp
new file mode 100644
index 0000000..b0cb8a4
--- /dev/null
+++ b/depends/libhdfs3/src/client/LocalBlockReader.cpp
@@ -0,0 +1,288 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "BigEndian.h"
+#include "datatransfer.pb.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "HWCrc32c.h"
+#include "LocalBlockReader.h"
+#include "SWCrc32c.h"
+
+#include <inttypes.h>
+#include <limits>
+
+/* Block meta file version this reader accepts; any other version is rejected. */
+#define BMVERSION 1
+/* Size in bytes of the (big-endian 16-bit) version field at the start of the meta file. */
+#define BMVERSION_SIZE 2
+
+/* Meta file header size: version, checksum type and bytes-per-checksum fields. */
+#define HEADER_SIZE (BMVERSION_SIZE + CHECKSUM_TYPE_SIZE + CHECKSUM_BYTES_PER_CHECKSUM_SIZE)
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * Open the short-circuit data and meta files of a local block replica,
+ * validate the meta file header and prepare checksum verification.
+ *
+ * @param info   short-circuit handles for the replica (data and meta files).
+ * @param block  the block being read; stored by reference in this reader.
+ * @param offset initial offset within the block; skipped to when positive.
+ * @param verify whether to verify checksums; turned off when the meta file
+ *               declares CHECKSUM_NULL.
+ * @param conf   session configuration, used for the local read buffer size.
+ * @param buffer caller-provided storage that data reads are made into.
+ * @throws HdfsCanceled rethrown unchanged.
+ * @throws HdfsIOException wrapping any other HdfsException raised here.
+ */
+LocalBlockReader::LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info,
+                                   const ExtendedBlock& block, int64_t offset,
+                                   bool verify, SessionConfig& conf,
+                                   std::vector<char>& buffer)
+    : verify(verify),
+      pbuffer(NULL),
+      pMetaBuffer(NULL),
+      block(block),
+      checksumSize(0),
+      chunkSize(0),
+      position(0),
+      size(0),
+      cursor(0),
+      length(block.getNumBytes()),
+      info(info),
+      buffer(buffer) {
+    try {
+        metaFd = info->getMetaFile();
+        dataFd = info->getDataFile();
+
+        /*
+         * Meta file header: big-endian 16-bit version, then the checksum type
+         * (taken from its first byte), then the big-endian 32-bit
+         * bytes-per-checksum value.
+         */
+        std::vector<char> header;
+        pMetaBuffer = metaFd->read(header, HEADER_SIZE);
+        int16_t version = ReadBigEndian16FromArray(&pMetaBuffer[0]);
+
+        if (BMVERSION != version) {
+            THROW(HdfsIOException,
+                  "LocalBlockReader get an unmatched block, expected block version %d, real version is %d",
+                  BMVERSION, static_cast<int>(version));
+        }
+
+        switch (pMetaBuffer[BMVERSION_SIZE]) {
+        case ChecksumTypeProto::CHECKSUM_NULL:
+            /* No checksums stored: verification is impossible, drop the meta file. */
+            this->verify = false;
+            checksumSize = 0;
+            metaFd.reset();
+            break;
+
+        case ChecksumTypeProto::CHECKSUM_CRC32:
+            THROW(HdfsIOException,
+                  "LocalBlockReader does not support CRC32 checksum.");
+            break;
+
+        case ChecksumTypeProto::CHECKSUM_CRC32C:
+            /* Prefer the hardware implementation when the CPU supports it. */
+            if (HWCrc32c::available()) {
+                checksum = shared_ptr<Checksum>(new HWCrc32c());
+            } else {
+                checksum = shared_ptr<Checksum>(new SWCrc32c());
+            }
+
+            chunkSize = ReadBigEndian32FromArray(
+                            &pMetaBuffer[BMVERSION_SIZE + CHECKSUM_TYPE_SIZE]);
+            checksumSize = sizeof(int32_t);
+            break;
+
+        default:
+            THROW(HdfsIOException,
+                  "LocalBlockReader cannot recognize checksum type: %d.",
+                  static_cast<int>(pMetaBuffer[BMVERSION_SIZE]));
+        }
+
+        /*
+         * Test the member, not the constructor parameter: the CHECKSUM_NULL
+         * branch above clears this->verify and leaves chunkSize at 0, so
+         * testing the parameter would reject such blocks here (and the
+         * rounding below would divide by zero).
+         */
+        if (this->verify && chunkSize <= 0) {
+            THROW(HdfsIOException,
+                  "LocalBlockReader get an invalid checksum parameter, bytes per check: %d.",
+                  chunkSize);
+        }
+
+        localBufferSize = conf.getLocalReadBufferSize();
+
+        if (this->verify) {
+            /* Round up to whole chunks so every buffer fill can be verified. */
+            localBufferSize = (localBufferSize + chunkSize - 1) / chunkSize * chunkSize;
+        }
+
+        if (offset > 0) {
+            skip(offset);
+        }
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        NESTED_THROW(HdfsIOException,
+                     "Failed to construct LocalBlockReader for block: %s.",
+                     block.toString().c_str());
+    }
+}
+
+/*
+ * Nothing to release explicitly: file handles, checksum and meta buffer are
+ * members that clean up after themselves, and pbuffer/pMetaBuffer are views
+ * that this destructor does not free.
+ */
+LocalBlockReader::~LocalBlockReader() {}
+
+/*
+ * Read bufferSize bytes of block data plus the matching checksums from the
+ * meta file, then check every chunk (the last one may be partial).
+ * Requires verification to be on and cursor to sit on a chunk boundary
+ * (both asserted).
+ * @throws ChecksumException when a chunk does not match its stored checksum.
+ */
+void LocalBlockReader::readAndVerify(int32_t bufferSize) {
+    assert(true == verify);
+    assert(cursor % chunkSize == 0);
+    /* One checksum entry per (possibly partial) chunk of data. */
+    int chunks = (bufferSize + chunkSize - 1) / chunkSize;
+    pbuffer = dataFd->read(buffer, bufferSize);
+    pMetaBuffer = metaFd->read(metaBuffer, chunks * checksumSize);
+
+    int dataOffset = 0;
+    int sumOffset = 0;
+
+    for (int n = 0; n < chunks; ++n) {
+        int left = bufferSize - dataOffset;
+        int chunk = left < chunkSize ? left : chunkSize;
+
+        checksum->reset();
+        checksum->update(&pbuffer[dataOffset], chunk);
+        uint32_t expected = ReadBigEndian32FromArray(&pMetaBuffer[sumOffset]);
+
+        if (expected != checksum->getValue()) {
+            THROW(ChecksumException,
+                  "LocalBlockReader checksum not match for block: %s",
+                  block.toString().c_str());
+        }
+
+        dataOffset += chunkSize;
+        sumOffset += checksumSize;
+    }
+}
+
+/*
+ * Copy at most len bytes of block data into buf and advance cursor.
+ *
+ * Unread bytes already in the local buffer are served first. When the buffer
+ * is drained, the request is clamped to the rest of the block; unverified
+ * reads that exceed the local buffer or finish the block go straight to buf,
+ * anything else refills the local buffer (checking checksums when
+ * verification is on) and is then served from it.
+ *
+ * @return bytes copied into buf; 0 at the end of the block.
+ */
+int32_t LocalBlockReader::readInternal(char * buf, int32_t len) {
+    int32_t want = len;
+
+    if (position >= size) {
+        int64_t left = length - cursor;
+
+        if (left < want) {
+            want = left;
+        }
+
+        if (want == 0) {
+            return 0;
+        }
+
+        if (!verify && (want > localBufferSize || want == left)) {
+            dataFd->copy(buf, want);
+            cursor += want;
+            return want;
+        }
+
+        int bufferSize = left < localBufferSize ? left : localBufferSize;
+        assert(bufferSize > 0);
+
+        if (verify) {
+            readAndVerify(bufferSize);
+        } else {
+            pbuffer = dataFd->read(buffer, bufferSize);
+        }
+
+        position = 0;
+        size = bufferSize;
+        assert(position < size);
+    }
+
+    int32_t buffered = size - position;
+
+    if (buffered < want) {
+        want = buffered;
+    }
+
+    memcpy(buf, &pbuffer[position], want);
+    position += want;
+    cursor += want;
+    return want;
+}
+
+/*
+ * Public read entry point. Delegates to readInternal; on any failure other
+ * than cancellation the short-circuit info is marked invalid and the error is
+ * rethrown as HdfsIOException. Note that `size` here is the parameter (the
+ * requested length), which shadows the member of the same name.
+ */
+int32_t LocalBlockReader::read(char * buf, int32_t size) {
+    int32_t copied = 0;
+
+    try {
+        copied = readInternal(buf, size);
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        info->setValid(false);
+        NESTED_THROW(HdfsIOException,
+                     "LocalBlockReader failed to read from position: %" PRId64 ", length: %d, block: %s.",
+                     cursor, size, block.toString().c_str());
+    }
+
+    return copied;
+}
+
+/*
+ * Advance the read position by len bytes without returning the data.
+ *
+ * Bytes still in the local buffer are consumed first. Past that, with
+ * verification on, the position is rounded down to the start of the chunk
+ * containing the target, that chunk is re-read and verified, and the
+ * leftover bytes are skipped inside the refilled buffer.
+ *
+ * @param len bytes to skip; asserted to be less than the bytes left in the block.
+ * @throws HdfsCanceled rethrown unchanged.
+ * @throws HdfsIOException on any other failure, after marking the
+ *         short-circuit info invalid.
+ */
+void LocalBlockReader::skip(int64_t len) {
+    assert(len < length - cursor);
+    /* Where this skip began; cursor may already have moved if we fail. */
+    int64_t start = cursor;
+
+    try {
+        int64_t todo = len;
+
+        while (todo > 0) {
+            /*
+             * skip the data in buffer.
+             */
+            if (size - position > 0) {
+                int batch = todo < size - position ? todo : size - position;
+                position += batch;
+                todo -= batch;
+                cursor += batch;
+                continue;
+            }
+
+            if (verify) {
+                /*
+                 * Go back to the start of the chunk holding the target and
+                 * point the meta file at that chunk's checksum entry.
+                 */
+                int64_t lastChunkSize = (cursor + todo) % chunkSize;
+                cursor = (cursor + todo) / chunkSize * chunkSize;
+                int64_t metaCursor = HEADER_SIZE
+                                     + checksumSize * (cursor / chunkSize);
+                metaFd->seek(metaCursor);
+                todo = lastChunkSize;
+            } else {
+                cursor += todo;
+                todo = 0;
+            }
+
+            /* NOTE(review): no seek when cursor is 0 — presumably the data
+             * file is still at its start then; confirm. */
+            if (cursor > 0) {
+                dataFd->seek(cursor);
+            }
+
+            /*
+             * fill buffer again and verify checksum
+             */
+            if (todo > 0) {
+                assert(true == verify);
+                int bufferSize = localBufferSize;
+                bufferSize =
+                    bufferSize < length - cursor ?
+                    bufferSize : length - cursor;
+                readAndVerify(bufferSize);
+                position = 0;
+                size = bufferSize;
+            }
+        }
+    } catch (const HdfsCanceled & e) {
+        throw;
+    } catch (const HdfsException & e) {
+        info->setValid(false);
+        /* Report the requested skip length (int64), not the buffer fill size. */
+        NESTED_THROW(HdfsIOException,
+                     "LocalBlockReader failed to skip from position: %" PRId64 ", length: %" PRId64 ", block: %s.",
+                     start, len, block.toString().c_str());
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/LocalBlockReader.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/LocalBlockReader.h b/depends/libhdfs3/src/client/LocalBlockReader.h
new file mode 100644
index 0000000..facdb4c
--- /dev/null
+++ b/depends/libhdfs3/src/client/LocalBlockReader.h
@@ -0,0 +1,105 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_LOCALBLOCKREADER_H_
+#define _HDFS_LIBHDFS3_CLIENT_LOCALBLOCKREADER_H_
+
+#include "BlockReader.h"
+#include "Checksum.h"
+#include "FileWrapper.h"
+#include "Memory.h"
+#include "ReadShortCircuitInfo.h"
+#include "SessionConfig.h"
+
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * BlockReader that reads a block replica directly from local data and meta
+ * files (short-circuit read), optionally verifying CRC32C checksums.
+ */
+class LocalBlockReader: public BlockReader {
+public:
+    /**
+     * Open a local replica for reading.
+     * @param info short-circuit handles for the replica's data and meta files.
+     * @param block the block to read; kept by reference, so it must outlive this reader.
+     * @param offset initial offset within the block.
+     * @param verify whether to verify checksums.
+     * @param conf session configuration (local read buffer size).
+     * @param buffer caller-owned buffer; kept by reference, so it must outlive this reader.
+     */
+    LocalBlockReader(const shared_ptr<ReadShortCircuitInfo>& info,
+                     const ExtendedBlock & block, int64_t offset, bool verify,
+                     SessionConfig & conf, std::vector<char> & buffer);
+
+    ~LocalBlockReader();
+
+    /**
+     * Get how many bytes remain to be read in this block.
+     * @return The block length minus the current position in the block.
+     */
+    virtual int64_t available() {
+        return length - cursor;
+    }
+
+    /**
+     * To read data from block.
+     * @param buf the buffer to be filled.
+     * @param size the number of bytes to be read.
+     * @return the number of bytes filled in the buffer, which may be less
+     *  than size. Returns 0 at the end of the block.
+     */
+    virtual int32_t read(char * buf, int32_t size);
+
+    /**
+     * Move the cursor forward len bytes.
+     * @param len The number of bytes to skip; must be less than available().
+     */
+    virtual void skip(int64_t len);
+
+private:
+    /**
+     * Fill buffer and verify checksum.
+     * @param bufferSize The number of data bytes to read into the buffer.
+     */
+    void readAndVerify(int32_t bufferSize);
+    /** Read implementation without the error wrapping done by read(). */
+    int32_t readInternal(char * buf, int32_t len);
+
+private:
+    // Members are initialized in declaration order; the constructor's
+    // initializer list relies on it, so do not reorder.
+    bool verify; //verify checksum or not (forced off for CHECKSUM_NULL blocks).
+    const char * pbuffer; //data returned by the last buffer fill.
+    const char * pMetaBuffer; //checksum bytes returned by the last meta read.
+    const ExtendedBlock & block; //reference: caller must keep the block alive.
+    int checksumSize; //bytes per stored checksum (0 when checksums are absent).
+    int chunkSize; //data bytes covered by one checksum.
+    int localBufferSize; //bytes per buffer fill; whole chunks when verifying.
+    int position; //point in buffer.
+    int size;  //data size in buffer.
+    int64_t cursor; //point in block.
+    int64_t length; //data size of block.
+    shared_ptr<Checksum> checksum;
+    shared_ptr<FileWrapper> dataFd;
+    shared_ptr<FileWrapper> metaFd;
+    shared_ptr<ReadShortCircuitInfo> info;
+    std::vector<char> & buffer; //reference: caller-owned data buffer.
+    std::vector<char> metaBuffer;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_LOCALBLOCKREADER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStream.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStream.cpp b/depends/libhdfs3/src/client/OutputStream.cpp
new file mode 100644
index 0000000..227ba71
--- /dev/null
+++ b/depends/libhdfs3/src/client/OutputStream.cpp
@@ -0,0 +1,96 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Atomic.h"
+#include "FileSystemImpl.h"
+#include "Memory.h"
+#include "OutputStream.h"
+#include "OutputStreamImpl.h"
+
+using namespace Hdfs::Internal;
+
+namespace Hdfs {
+
+/* The public stream is a handle; the work is delegated to OutputStreamImpl. */
+OutputStream::OutputStream()
+    : impl(new Internal::OutputStreamImpl) {
+}
+
+/* The handle owns its implementation object and releases it here. */
+OutputStream::~OutputStream() {
+    delete impl;
+}
+
+/*
+ * Create or append to `path` on the file system `fs`.
+ * Throws HdfsIOException when `fs` has not been connected.
+ */
+void OutputStream::open(FileSystem & fs, const char * path, int flag,
+                        const Permission permission, bool createParent, int replication,
+                        int64_t blockSize) {
+    if (!fs.impl) {
+        THROW(HdfsIOException, "FileSystem: not connected.");
+    }
+
+    this->impl->open(fs.impl->filesystem, path, flag, permission,
+                     createParent, replication, blockSize);
+}
+
+/*
+ * Append `size` bytes starting at `buf` to the file.
+ */
+void OutputStream::append(const char * buf, int64_t size) {
+    this->impl->append(buf, size);
+}
+
+/*
+ * Flush buffered data and wait for the acknowledgements.
+ */
+void OutputStream::flush() {
+    this->impl->flush();
+}
+
+/*
+ * Current file length, as reported by the implementation.
+ */
+int64_t OutputStream::tell() {
+    return this->impl->tell();
+}
+
+/*
+ * Sync the stream through the implementation.
+ */
+void OutputStream::sync() {
+    this->impl->sync();
+}
+
+/*
+ * Close the stream through the implementation.
+ */
+void OutputStream::close() {
+    this->impl->close();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStream.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStream.h b/depends/libhdfs3/src/client/OutputStream.h
new file mode 100644
index 0000000..14ae6a7
--- /dev/null
+++ b/depends/libhdfs3/src/client/OutputStream.h
@@ -0,0 +1,132 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_
+#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_
+
+#include "FileSystem.h"
+
+namespace Hdfs {
+
+/**
+ * Use the CreateFlag as follows:
+ * <ol>
+ * <li> CREATE - to create a file if it does not exist,
+ * else throw FileAlreadyExists.</li>
+ * <li> APPEND - to append to a file if it exists,
+ * else throw FileNotFoundException.</li>
+ * <li> OVERWRITE - to truncate a file if it exists,
+ * else throw FileNotFoundException.</li>
+ * <li> CREATE|APPEND - to create a file if it does not exist,
+ * else append to an existing file.</li>
+ * <li> CREATE|OVERWRITE - to create a file if it does not exist,
+ * else overwrite an existing file.</li>
+ * <li> SyncBlock - to force closed blocks to the disk device.
+ * In addition {@link OutputStream::sync()} should be called after each write,
+ * if true synchronous behavior is required.</li>
+ * </ol>
+ *
+ * Following combination is not valid and will result in
+ * {@link InvalidParameter}:
+ * <ol>
+ * <li> APPEND|OVERWRITE</li>
+ * <li> CREATE|APPEND|OVERWRITE</li>
+ * </ol>
+ */
+enum CreateFlag {
+    Create = 0x01, Overwrite = 0x02, Append = 0x04, SyncBlock = 0x08
+};
+
+namespace Internal {
+class OutputStreamInter;
+}
+
+/**
+ * An output stream used to write data to hdfs.
+ *
+ * The stream owns its implementation object and is not copyable.
+ */
+class OutputStream {
+public:
+    /**
+     * Construct a new OutputStream.
+     */
+    OutputStream();
+    /**
+     * Destroy an OutputStream instance.
+     */
+    ~OutputStream();
+
+    /**
+     * To create or append a file.
+     * @param fs hdfs file system.
+     * @param path the file path.
+     * @param flag creation flag, can be Create, Append or Create|Overwrite.
+     * @param permission create a new file with given permission.
+     * @param createParent if the parent does not exist, create it.
+     * @param replication create a file with given number of replication.
+     * @param blockSize  create a file with given block size.
+     */
+    void open(FileSystem & fs, const char * path, int flag = Create,
+              const Permission permission = Permission(0644), bool createParent =
+                  false, int replication = 0, int64_t blockSize = 0);
+
+    /**
+     * To append data to file.
+     * @param buf the data used to append.
+     * @param size the data size.
+     */
+    void append(const char * buf, int64_t size);
+
+    /**
+     * Flush all data in buffer and waiting for ack.
+     * Will block until get all acks.
+     */
+    void flush();
+
+    /**
+     * return the current file length.
+     * @return current file length.
+     */
+    int64_t tell();
+
+    /**
+     * the same as flush right now.
+     */
+    void sync();
+
+    /**
+     * close the stream.
+     */
+    void close();
+
+private:
+    /*
+     * Non-copyable: the destructor deletes `impl`, so a shallow copy would
+     * delete the same implementation twice. Declared but intentionally not
+     * defined (pre-C++11 idiom).
+     */
+    OutputStream(const OutputStream & other);
+    OutputStream & operator=(const OutputStream & other);
+
+private:
+    Internal::OutputStreamInter * impl;
+
+};
+
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAM_H_ */


Mime
View raw message