hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhuvnesh2...@apache.org
Subject [08/48] incubator-hawq git commit: HAWQ-618. Import libhdfs3 library for internal management and LICENSE modified
Date Mon, 04 Apr 2016 05:09:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/server/NamenodeProxy.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/server/NamenodeProxy.cpp b/depends/libhdfs3/src/server/NamenodeProxy.cpp
new file mode 100644
index 0000000..d66bbd9
--- /dev/null
+++ b/depends/libhdfs3/src/server/NamenodeProxy.cpp
@@ -0,0 +1,534 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Logger.h"
+#include "NamenodeImpl.h"
+#include "NamenodeProxy.h"
+#include "StringUtil.h"
+
+#include <string>
+
+#include <sys/fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+
+namespace Hdfs {
+namespace Internal {
+
+static uint32_t GetInitNamenodeIndex(const std::string id) {
+    std::string path = "/tmp/";
+    path += id;
+    int fd;
+    uint32_t index = 0;
+    /*
+     * try create the file
+     */
+    fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666);
+
+    if (fd < 0) {
+        if (errno == EEXIST) {
+            /*
+             * the file already exist, try to open it
+             */
+            fd = open(path.c_str(), O_RDONLY);
+        } else {
+            /*
+             * failed to create, do not care why
+             */
+            return 0;
+        }
+    } else {
+        if (0 != flock(fd, LOCK_EX)) {
+            /*
+             * failed to lock
+             */
+            close(fd);
+            return index;
+        }
+
+        /*
+         * created file, initialize it with 0
+         */
+        if (write(fd, &index, sizeof(index)) < 0) {
+          LOG(WARNING,
+              "NamenodeProxy: Failed to write current Namenode index into "
+              "cache file.");
+            /*
+             * ignore the failure.
+             */
+        }
+
+        flock(fd, LOCK_UN);
+        close(fd);
+        return index;
+    }
+
+    /*
+     * the file exist, read it.
+     */
+    if (fd >= 0) {
+        if (0 != flock(fd, LOCK_SH)) {
+            /*
+             * failed to lock
+             */
+            close(fd);
+            return index;
+        }
+
+        if (sizeof(index) != read(fd, &index, sizeof(index))) {
+            /*
+             * failed to read, do not care why
+             */
+            index = 0;
+        }
+
+        flock(fd, LOCK_UN);
+        close(fd);
+    }
+
+    return index;
+}
+
+static void SetInitNamenodeIndex(const std::string & id, uint32_t index) {
+    std::string path = "/tmp/";
+    path += id;
+    int fd;
+    /*
+     * try open the file for write
+     */
+    fd = open(path.c_str(), O_WRONLY);
+
+    if (fd > 0) {
+        if (0 != flock(fd, LOCK_EX)) {
+            /*
+             * failed to lock
+             */
+            close(fd);
+            return;
+        }
+
+        if (write(fd, &index, sizeof(index)) < 0) {
+            LOG(WARNING,
+                "NamenodeProxy: Failed to write current Namenode index into "
+                "cache file.");
+            /*
+             * ignore the failure.
+             */
+        }
+        flock(fd, LOCK_UN);
+        close(fd);
+    }
+}
+
+NamenodeProxy::NamenodeProxy(const std::vector<NamenodeInfo> & namenodeInfos, const
std::string & tokenService,
+                             const SessionConfig & c, const RpcAuth & a) :
+    clusterid(tokenService), currentNamenode(0) {
+    if (namenodeInfos.size() == 1) {
+        enableNamenodeHA = false;
+        maxNamenodeHARetry = 0;
+    } else {
+        enableNamenodeHA = true;
+        maxNamenodeHARetry = c.getRpcMaxHaRetry();
+    }
+
+    for (size_t i = 0; i < namenodeInfos.size(); ++i) {
+        std::vector<std::string> nninfo = StringSplit(namenodeInfos[i].getRpcAddr(),
":");
+
+        if (nninfo.size() != 2) {
+            THROW(InvalidParameter, "Cannot create namenode proxy, %s does not contain host
or port",
+                  namenodeInfos[i].getRpcAddr().c_str());
+        }
+
+        namenodes.push_back(
+            shared_ptr<Namenode>(
+                new NamenodeImpl(nninfo[0].c_str(), nninfo[1].c_str(), clusterid, c, a)));
+    }
+
+    if (enableNamenodeHA) {
+        currentNamenode = GetInitNamenodeIndex(clusterid) % namenodeInfos.size();
+    }
+}
+
+NamenodeProxy::~NamenodeProxy() {
+}
+
+shared_ptr<Namenode> NamenodeProxy::getActiveNamenode(uint32_t & oldValue) {
+    lock_guard<mutex> lock(mut);
+
+    if (namenodes.empty()) {
+        THROW(HdfsFileSystemClosed, "NamenodeProxy is closed.");
+    }
+
+    oldValue = currentNamenode;
+    return namenodes[currentNamenode % namenodes.size()];
+}
+
+void NamenodeProxy::failoverToNextNamenode(uint32_t oldValue) {
+    lock_guard<mutex> lock(mut);
+
+    if (oldValue != currentNamenode) {
+        //already failover in another thread.
+        return;
+    }
+
+    ++currentNamenode;
+    currentNamenode = currentNamenode % namenodes.size();
+    SetInitNamenodeIndex(clusterid, currentNamenode);
+}
+
+static void HandleHdfsFailoverException(const HdfsFailoverException & e) {
+    try {
+        Hdfs::rethrow_if_nested(e);
+    } catch (...) {
+        NESTED_THROW(Hdfs::HdfsRpcException, "%s", e.what());
+    }
+
+    //should not reach here
+    abort();
+}
+
+#define NAMENODE_HA_RETRY_BEGIN() \
+    do { \
+        int __count = 0; \
+        do { \
+            uint32_t __oldValue = 0; \
+            shared_ptr<Namenode> namenode =  getActiveNamenode(__oldValue); \
+            try { \
+                (void)0
+
+#define NAMENODE_HA_RETRY_END() \
+    break; \
+    } catch (const NameNodeStandbyException & e) { \
+        if (!enableNamenodeHA || __count++ > maxNamenodeHARetry) { \
+            throw; \
+        } \
+    } catch (const HdfsFailoverException & e) { \
+        if (!enableNamenodeHA || __count++ > maxNamenodeHARetry) { \
+            HandleHdfsFailoverException(e); \
+        } \
+    } \
+    failoverToNextNamenode(__oldValue); \
+    LOG(WARNING, "NamenodeProxy: Failover to another Namenode."); \
+    } while (true); \
+    } while (0)
+
+void NamenodeProxy::getBlockLocations(const std::string & src, int64_t offset,
+                                      int64_t length, LocatedBlocks & lbs) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->getBlockLocations(src, offset, length, lbs);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::create(const std::string & src, const Permission & masked,
+                           const std::string & clientName, int flag, bool createParent,
+                           short replication, int64_t blockSize) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->create(src, masked, clientName, flag, createParent, replication, blockSize);
+    NAMENODE_HA_RETRY_END();
+}
+
+std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
+NamenodeProxy::append(const std::string& src, const std::string& clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->append(src, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >();
+}
+
+bool NamenodeProxy::setReplication(const std::string & src, short replication) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->setReplication(src, replication);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+void NamenodeProxy::setPermission(const std::string & src,
+                                  const Permission & permission) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setPermission(src, permission);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::setOwner(const std::string & src,
+                             const std::string & username, const std::string & groupname)
{
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setOwner(src, username, groupname);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::abandonBlock(const ExtendedBlock & b,
+                                 const std::string & src, const std::string & holder)
{
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->abandonBlock(b, src, holder);
+    NAMENODE_HA_RETRY_END();
+}
+
+shared_ptr<LocatedBlock> NamenodeProxy::addBlock(const std::string & src,
+        const std::string & clientName, const ExtendedBlock * previous,
+        const std::vector<DatanodeInfo> & excludeNodes) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->addBlock(src, clientName, previous, excludeNodes);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+shared_ptr<LocatedBlock> NamenodeProxy::getAdditionalDatanode(
+    const std::string & src, const ExtendedBlock & blk,
+    const std::vector<DatanodeInfo> & existings,
+    const std::vector<std::string> & storageIDs,
+    const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes,
+    const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getAdditionalDatanode(src, blk, existings,
+                                           storageIDs, excludes, numAdditionalNodes, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+bool NamenodeProxy::complete(const std::string & src,
+                             const std::string & clientName, const ExtendedBlock * last)
{
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->complete(src, clientName, last);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+/*void NamenodeProxy::reportBadBlocks(const std::vector<LocatedBlock> & blocks)
{
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->reportBadBlocks(blocks);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+bool NamenodeProxy::rename(const std::string & src, const std::string & dst) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->rename(src, dst);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+/*
+void NamenodeProxy::concat(const std::string & trg,
+                           const std::vector<std::string> & srcs) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->concat(trg, srcs);
+    NAMENODE_HA_RETRY_END();
+}
+*/
+
+bool NamenodeProxy::truncate(const std::string & src, int64_t size,
+                             const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->truncate(src, size, clientName);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::getLease(const std::string & src,
+                             const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->getLease(src, clientName);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::releaseLease(const std::string & src,
+                                 const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->releaseLease(src, clientName);
+    NAMENODE_HA_RETRY_END();
+}
+
+bool NamenodeProxy::deleteFile(const std::string & src, bool recursive) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->deleteFile(src, recursive);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+bool NamenodeProxy::mkdirs(const std::string & src, const Permission & masked,
+                           bool createParent) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->mkdirs(src, masked, createParent);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+bool NamenodeProxy::getListing(const std::string & src,
+                               const std::string & startAfter, bool needLocation,
+                               std::vector<FileStatus> & dl) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getListing(src, startAfter, needLocation, dl);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+void NamenodeProxy::renewLease(const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->renewLease(clientName);
+    NAMENODE_HA_RETRY_END();
+}
+
+/*bool NamenodeProxy::recoverLease(const std::string & src,
+                                 const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->recoverLease(src, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}*/
+
+std::vector<int64_t> NamenodeProxy::getFsStats() {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getFsStats();
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return std::vector<int64_t>();
+}
+
+/*void NamenodeProxy::metaSave(const std::string & filename) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->metaSave(filename);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+FileStatus NamenodeProxy::getFileInfo(const std::string & src, bool *exist) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getFileInfo(src, exist);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return FileStatus();
+}
+
+/*FileStatus NamenodeProxy::getFileLinkInfo(const std::string & src) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getFileLinkInfo(src);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return FileStatus();
+}*/
+
+/*ContentSummary NamenodeProxy::getContentSummary(const std::string & path) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getContentSummary(path);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return ContentSummary();
+}*/
+
+/*void NamenodeProxy::setQuota(const std::string & path, int64_t namespaceQuota,
+                             int64_t diskspaceQuota) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setQuota(path, namespaceQuota, diskspaceQuota);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+void NamenodeProxy::fsync(const std::string & src, const std::string & client) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->fsync(src, client);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::setTimes(const std::string & src, int64_t mtime,
+                             int64_t atime) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setTimes(src, mtime, atime);
+    NAMENODE_HA_RETRY_END();
+}
+
+/*void NamenodeProxy::createSymlink(const std::string & target,
+                                  const std::string & link, const Permission & dirPerm,
+                                  bool createParent) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->createSymlink(target, link, dirPerm, createParent);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+/*std::string NamenodeProxy::getLinkTarget(const std::string & path) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getLinkTarget(path);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return "";
+}*/
+
+shared_ptr<LocatedBlock> NamenodeProxy::updateBlockForPipeline(
+    const ExtendedBlock & block, const std::string & clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->updateBlockForPipeline(block, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+void NamenodeProxy::updatePipeline(const std::string & clientName,
+                                   const ExtendedBlock & oldBlock, const ExtendedBlock
& newBlock,
+                                   const std::vector<DatanodeInfo> & newNodes,
+                                   const std::vector<std::string> & storageIDs)
{
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
+    NAMENODE_HA_RETRY_END();
+}
+
+Token NamenodeProxy::getDelegationToken(const std::string & renewer) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getDelegationToken(renewer);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return Token();
+}
+
+int64_t NamenodeProxy::renewDelegationToken(const Token & token) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->renewDelegationToken(token);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return 0;
+}
+
+void NamenodeProxy::cancelDelegationToken(const Token & token) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->cancelDelegationToken(token);
+    NAMENODE_HA_RETRY_END();
+}
+
+void NamenodeProxy::close() {
+    lock_guard<mutex> lock(mut);
+    namenodes.clear();
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/server/NamenodeProxy.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/server/NamenodeProxy.h b/depends/libhdfs3/src/server/NamenodeProxy.h
new file mode 100644
index 0000000..3792602
--- /dev/null
+++ b/depends/libhdfs3/src/server/NamenodeProxy.h
@@ -0,0 +1,164 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_SERVER_NAMENODEPROXY_H_
+#define _HDFS_LIBHDFS3_SERVER_NAMENODEPROXY_H_
+
+#include "Memory.h"
+#include "Namenode.h"
+#include "NamenodeInfo.h"
+#include "Thread.h"
+
+namespace Hdfs {
+namespace Internal {
+
+class NamenodeProxy: public Namenode {
+public:
+    NamenodeProxy(const std::vector<NamenodeInfo> & namenodeInfos, const std::string
& tokenService,
+                  const SessionConfig & c, const RpcAuth & a);
+    ~NamenodeProxy();
+
+public:
+
+    void getBlockLocations(const std::string & src, int64_t offset,
+                           int64_t length, LocatedBlocks & lbs);
+
+    void create(const std::string & src, const Permission & masked,
+                const std::string & clientName, int flag, bool createParent,
+                short replication, int64_t blockSize);
+
+    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > append(
+        const std::string& src, const std::string& clientName);
+
+    bool setReplication(const std::string & src, short replication);
+
+    void setPermission(const std::string & src, const Permission & permission);
+
+    void setOwner(const std::string & src, const std::string & username,
+                  const std::string & groupname);
+
+    void abandonBlock(const ExtendedBlock & b, const std::string & src,
+                      const std::string & holder);
+
+    shared_ptr<LocatedBlock> addBlock(const std::string & src,
+                                      const std::string & clientName, const ExtendedBlock
* previous,
+                                      const std::vector<DatanodeInfo> & excludeNodes);
+
+    shared_ptr<LocatedBlock> getAdditionalDatanode(const std::string & src,
+            const ExtendedBlock & blk,
+            const std::vector<DatanodeInfo> & existings,
+            const std::vector<std::string> & storageIDs,
+            const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes,
+            const std::string & clientName);
+
+    bool complete(const std::string & src, const std::string & clientName,
+                  const ExtendedBlock * last);
+
+    void reportBadBlocks(const std::vector<LocatedBlock> & blocks);
+
+    bool rename(const std::string & src, const std::string & dst);
+
+    void concat(const std::string & trg, const std::vector<std::string> & srcs);
+
+    bool truncate(const std::string & src, int64_t size,
+                  const std::string & clientName);
+
+    void getLease(const std::string & src, const std::string & clientName);
+
+    void releaseLease(const std::string & src, const std::string & clientName);
+
+    /*void rename2(const std::string & src, const std::string & dst)
+     throw (AccessControlException, DSQuotaExceededException,
+     FileAlreadyExistsException, FileNotFoundException,
+     NSQuotaExceededException, ParentNotDirectoryException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) ;*/
+
+    bool deleteFile(const std::string & src, bool recursive);
+
+    bool mkdirs(const std::string & src, const Permission & masked,
+                bool createParent);
+
+    bool getListing(const std::string & src, const std::string & startAfter,
+                    bool needLocation, std::vector<FileStatus> & dl);
+
+    void renewLease(const std::string & clientName);
+
+    bool recoverLease(const std::string & src, const std::string & clientName);
+
+    std::vector<int64_t> getFsStats();
+
+    void metaSave(const std::string & filename);
+
+    FileStatus getFileInfo(const std::string & src, bool *exist);
+
+    FileStatus getFileLinkInfo(const std::string & src);
+
+    void setQuota(const std::string & path, int64_t namespaceQuota,
+                  int64_t diskspaceQuota);
+
+    void fsync(const std::string & src, const std::string & client);
+
+    void setTimes(const std::string & src, int64_t mtime, int64_t atime);
+
+    void createSymlink(const std::string & target, const std::string & link,
+                       const Permission & dirPerm, bool createParent);
+
+    std::string getLinkTarget(const std::string & path);
+
+    shared_ptr<LocatedBlock> updateBlockForPipeline(const ExtendedBlock & block,
+            const std::string & clientName);
+
+    void updatePipeline(const std::string & clientName,
+                        const ExtendedBlock & oldBlock, const ExtendedBlock & newBlock,
+                        const std::vector<DatanodeInfo> & newNodes,
+                        const std::vector<std::string> & storageIDs);
+
+    Token getDelegationToken(const std::string & renewer);
+
+    int64_t renewDelegationToken(const Token & token);
+
+    void cancelDelegationToken(const Token & token);
+
+    void close();
+
+private:
+    shared_ptr<Namenode> getActiveNamenode(uint32_t & oldValue);
+    void failoverToNextNamenode(uint32_t oldValue);
+
+private:
+    bool enableNamenodeHA;
+    int maxNamenodeHARetry;
+    mutex mut;
+    std::string clusterid;
+    std::vector<shared_ptr<Namenode> > namenodes;
+    uint32_t currentNamenode;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_SERVER_NAMENODEPROXY_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/server/RpcHelper.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/server/RpcHelper.h b/depends/libhdfs3/src/server/RpcHelper.h
new file mode 100644
index 0000000..267dd33
--- /dev/null
+++ b/depends/libhdfs3/src/server/RpcHelper.h
@@ -0,0 +1,293 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_
+#define _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_
+
+#include "client/FileStatus.h"
+#include "client/Permission.h"
+#include "ClientDatanodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "DatanodeInfo.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "ExtendedBlock.h"
+#include "LocatedBlock.h"
+#include "LocatedBlocks.h"
+#include "StackPrinter.h"
+
+#include <algorithm>
+#include <cassert>
+
+using namespace google::protobuf;
+
+namespace Hdfs {
+namespace Internal {
+
+class Nothing {
+};
+
+template < typename T1 = Nothing, typename T2 = Nothing, typename T3 = Nothing,
+         typename T4 = Nothing, typename T5 = Nothing, typename T6 = Nothing,
+         typename T7 = Nothing, typename T8 = Nothing, typename T9 = Nothing,
+         typename T10 = Nothing, typename T11 = Nothing  >
+class UnWrapper: public UnWrapper<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Nothing>
{
+private:
+    typedef UnWrapper<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Nothing> BaseType;
+
+public:
+    UnWrapper(const HdfsRpcServerException & e) :
+        BaseType(e), e(e) {
+    }
+
+    void ATTRIBUTE_NORETURN ATTRIBUTE_NOINLINE unwrap(const char * file,
+            int line) {
+        if (e.getErrClass() == T1::ReflexName) {
+#ifdef NEED_BOOST
+            boost::throw_exception(T1(e.getErrMsg(), SkipPathPrefix(file), line, PrintStack(1,
STACK_DEPTH).c_str()));
+#else
+            throw T1(e.getErrMsg(), SkipPathPrefix(file), line, PrintStack(1, STACK_DEPTH).c_str());
+#endif
+        } else {
+            BaseType::unwrap(file, line);
+        }
+    }
+private:
+    const HdfsRpcServerException & e;
+};
+
+template<>
+class UnWrapper < Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing,
+        Nothing, Nothing, Nothing, Nothing > {
+public:
+    UnWrapper(const HdfsRpcServerException & e) :
+        e(e) {
+    }
+    void ATTRIBUTE_NORETURN ATTRIBUTE_NOINLINE unwrap(const char * file,
+            int line) {
+        THROW(HdfsIOException,
+              "Unexpected exception: when unwrap the rpc remote exception \"%s\", %s in %s:
%d",
+              e.getErrClass().c_str(), e.getErrMsg().c_str(), file, line);
+    }
+private:
+    const HdfsRpcServerException & e;
+};
+
+static inline void Convert(ExtendedBlock & eb,
+                           const ExtendedBlockProto & proto) {
+    eb.setBlockId(proto.blockid());
+    eb.setGenerationStamp(proto.generationstamp());
+    eb.setNumBytes(proto.numbytes());
+    eb.setPoolId(proto.poolid());
+}
+
+static inline void Convert(Token & token,
+                           const TokenProto & proto) {
+    token.setIdentifier(proto.identifier());
+    token.setKind(proto.kind());
+    token.setPassword(proto.password());
+    token.setService(proto.service());
+}
+
+static inline void Convert(DatanodeInfo & node,
+                           const DatanodeInfoProto & proto) {
+    const DatanodeIDProto & idProto = proto.id();
+    node.setHostName(idProto.hostname());
+    node.setInfoPort(idProto.infoport());
+    node.setIpAddr(idProto.ipaddr());
+    node.setIpcPort(idProto.ipcport());
+    node.setDatanodeId(idProto.datanodeuuid());
+    node.setXferPort(idProto.xferport());
+    node.setLocation(proto.location());
+}
+
+static inline shared_ptr<LocatedBlock> Convert(const LocatedBlockProto & proto)
{
+    Token token;
+    shared_ptr<LocatedBlock> lb(new LocatedBlock);
+    Convert(token, proto.blocktoken());
+    lb->setToken(token);
+    std::vector<DatanodeInfo> & nodes = lb->mutableLocations();
+    nodes.resize(proto.locs_size());
+
+    for (int i = 0 ; i < proto.locs_size(); ++i) {
+        Convert(nodes[i], proto.locs(i));
+    }
+
+    if (proto.storagetypes_size() > 0) {
+        assert(proto.storagetypes_size() == proto.locs_size());
+        std::vector<std::string> & storageIDs = lb->mutableStorageIDs();
+        storageIDs.resize(proto.storagetypes_size());
+
+        for (int i = 0; i < proto.storagetypes_size(); ++i) {
+            storageIDs[i] = proto.storageids(i);
+        }
+    }
+
+    Convert(*lb, proto.b());
+    lb->setOffset(proto.offset());
+    lb->setCorrupt(proto.corrupt());
+    return lb;
+}
+
+static inline void Convert(LocatedBlocks & lbs,
+                           const LocatedBlocksProto & proto) {
+    shared_ptr<LocatedBlock> lb;
+    lbs.setFileLength(proto.filelength());
+    lbs.setIsLastBlockComplete(proto.islastblockcomplete());
+    lbs.setUnderConstruction(proto.underconstruction());
+
+    if (proto.has_lastblock()) {
+        lb = Convert(proto.lastblock());
+        lbs.setLastBlock(lb);
+    }
+
+    std::vector<LocatedBlock> & blocks = lbs.getBlocks();
+    blocks.resize(proto.blocks_size());
+
+    for (int i = 0; i < proto.blocks_size(); ++i) {
+        blocks[i] = *Convert(proto.blocks(i));
+    }
+
+    std::sort(blocks.begin(), blocks.end(), std::less<LocatedBlock>());
+}
+
+static inline void Convert(const std::string & src, FileStatus & fs,
+                           const HdfsFileStatusProto & proto) {
+    fs.setAccessTime(proto.access_time());
+    fs.setBlocksize(proto.blocksize());
+    fs.setGroup(proto.group().c_str());
+    fs.setLength(proto.length());
+    fs.setModificationTime(proto.modification_time());
+    fs.setOwner(proto.owner().c_str());
+    fs.setPath((src + "/" + proto.path()).c_str());
+    fs.setReplication(proto.block_replication());
+    fs.setSymlink(proto.symlink().c_str());
+    fs.setPermission(Permission(proto.permission().perm()));
+    fs.setIsdir(proto.filetype() == HdfsFileStatusProto::IS_DIR);
+}
+
+static inline void Convert(const std::string & src,
+                           std::vector<FileStatus> & dl,
+                           const DirectoryListingProto & proto) {
+    RepeatedPtrField<HdfsFileStatusProto> ptrproto = proto.partiallisting();
+
+    for (int i = 0; i < ptrproto.size(); i++) {
+        FileStatus fileStatus;
+        Convert(src, fileStatus, ptrproto.Get(i));
+        dl.push_back(fileStatus);
+    }
+}
+
+static inline Token Convert(const TokenProto & proto) {
+    Token retval;
+    retval.setIdentifier(proto.identifier());
+    retval.setKind(proto.kind());
+    retval.setPassword(proto.password());
+    return retval;
+}
+
+/*static inline void Convert(ContentSummary & contentSummary, const ContentSummaryProto
& proto) {
+    contentSummary.setDirectoryCount(proto.directorycount());
+    contentSummary.setFileCount(proto.filecount());
+    contentSummary.setLength(proto.length());
+    contentSummary.setQuota(proto.quota());
+    contentSummary.setSpaceConsumed(proto.spaceconsumed());
+    contentSummary.setSpaceQuota(proto.spacequota());
+}*/
+
+static inline void Build(const Token & token,
+                         TokenProto * proto) {
+    proto->set_identifier(token.getIdentifier());
+    proto->set_kind(token.getKind());
+    proto->set_password(token.getPassword());
+    proto->set_service(token.getService());
+}
+
+static inline void Build(const Permission & p, FsPermissionProto * proto) {
+    proto->set_perm(p.toShort());
+}
+
+static inline void Build(const DatanodeInfo & dn, DatanodeIDProto * proto) {
+    proto->set_hostname(dn.getHostName());
+    proto->set_infoport(dn.getInfoPort());
+    proto->set_ipaddr(dn.getIpAddr());
+    proto->set_ipcport(dn.getIpcPort());
+    proto->set_datanodeuuid(dn.getDatanodeId());
+    proto->set_xferport(dn.getXferPort());
+}
+
+static inline void Build(const std::vector<DatanodeInfo> & dns,
+                         RepeatedPtrField<DatanodeInfoProto> * proto) {
+    for (size_t i = 0; i < dns.size(); ++i) {
+        DatanodeInfoProto * p = proto->Add();
+        Build(dns[i], p->mutable_id());
+        p->set_location(dns[i].getLocation());
+    }
+}
+
+static inline void Build(const ExtendedBlock & eb, ExtendedBlockProto * proto) {
+    proto->set_blockid(eb.getBlockId());
+    proto->set_generationstamp(eb.getGenerationStamp());
+    proto->set_numbytes(eb.getNumBytes());
+    proto->set_poolid(eb.getPoolId());
+}
+
+static inline void Build(LocatedBlock & b, LocatedBlockProto * proto) {
+    proto->set_corrupt(b.isCorrupt());
+    proto->set_offset(b.getOffset());
+    Build(b, proto->mutable_b());
+    Build(b.getLocations(), proto->mutable_locs());
+}
+
+/*static inline void Build(const std::vector<LocatedBlock> & blocks,
+                         RepeatedPtrField<LocatedBlockProto> * proto) {
+    for (size_t i = 0; i < blocks.size(); ++i) {
+        LocatedBlockProto * p = proto->Add();
+        p->set_corrupt(blocks[i].isCorrupt());
+        p->set_offset(blocks[i].getOffset());
+        Build(blocks[i], p->mutable_b());
+    }
+}*/
+
+static inline void Build(const std::vector<std::string> & srcs,
+                         RepeatedPtrField<std::string> * proto) {
+    for (size_t i = 0; i < srcs.size(); ++i) {
+        proto->Add()->assign(srcs[i]);
+    }
+}
+
+static inline void Build(const std::vector<DatanodeInfo> & dns,
+                         RepeatedPtrField<DatanodeIDProto> * proto) {
+    for (size_t i = 0; i < dns.size(); ++i) {
+        Build(dns[i], proto->Add());
+    }
+}
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/CMakeLists.txt b/depends/libhdfs3/test/CMakeLists.txt
new file mode 100644
index 0000000..cad3885
--- /dev/null
+++ b/depends/libhdfs3/test/CMakeLists.txt
@@ -0,0 +1,43 @@
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
+
+SET(TEST_WORKING_DIR ${CMAKE_CURRENT_SOURCE_DIR}/data/)
+ADD_DEFINITIONS(-DGTEST_LANG_CXX11=0 -Dprivate=public -Dprotected=public -DDATA_DIR="${TEST_WORKING_DIR}/")
+
+ADD_SUBDIRECTORY(function)
+ADD_SUBDIRECTORY(unit)
+ADD_SUBDIRECTORY(secure)
+
+IF(TEST_RUNNER)
+    SEPARATE_ARGUMENTS(TEST_RUNNER_LIST UNIX_COMMAND ${TEST_RUNNER})
+ENDIF(TEST_RUNNER)
+
+ADD_CUSTOM_TARGET(unittest
+	COMMAND ${TEST_RUNNER_LIST} ${CMAKE_CURRENT_BINARY_DIR}/unit/unit
+	DEPENDS unit
+	WORKING_DIRECTORY ${TEST_WORKING_DIR}
+	COMMENT "Run Unit Test..."
+)
+
+ADD_CUSTOM_TARGET(functiontest
+	COMMAND ${TEST_RUNNER_LIST} ${CMAKE_CURRENT_BINARY_DIR}/function/function
+	DEPENDS function
+	WORKING_DIRECTORY ${TEST_WORKING_DIR}
+	COMMENT "Run Function Test..."
+)
+
+ADD_CUSTOM_TARGET(securetest
+	COMMAND ${TEST_RUNNER_LIST} ${CMAKE_CURRENT_BINARY_DIR}/secure/secure
+	DEPENDS secure
+	WORKING_DIRECTORY ${TEST_WORKING_DIR}
+	COMMENT "Run Security Function Test..."
+)
+
+ADD_CUSTOM_TARGET(test 
+    COMMAND ${CMAKE_MAKE_PROGRAM} unittest || true
+    COMMAND ${CMAKE_MAKE_PROGRAM} functiontest || true
+    COMMENT "Run All Test..."
+)
+
+SET(unit_SOURCES ${unit_SOURCES} PARENT_SCOPE)
+SET(function_SOURCES ${function_SOURCES} PARENT_SCOPE)
+SET(secure_SOURCES ${secure_SOURCES} PARENT_SCOPE)


Mime
View raw message