hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [1/5] HDFS-7012. Add hdfs native client RPC functionality (Zhanwei Wang via Colin P. McCabe)
Date Fri, 03 Oct 2014 18:28:49 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-10388 c7442f840 -> 4b2cc72fa


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.cc
new file mode 100644
index 0000000..3d499e7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.cc
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ClientNamenodeProtocol.pb.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Namenode.h"
+#include "NamenodeImpl.h"
+#include "rpc/RpcCall.h"
+#include "rpc/RpcClient.h"
+#include "RpcHelper.h"
+
+#define NAMENODE_VERSION 1
+#define NAMENODE_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientProtocol"
+#define DELEGATION_TOKEN_KIND "HDFS_DELEGATION_TOKEN"
+
+using namespace google::protobuf;
+using namespace hadoop::common;
+using namespace hadoop::hdfs;
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Construct a namenode proxy bound to the given namenode address.
+ * @param host         namenode hostname or IP.
+ * @param port         namenode port (as a string).
+ * @param tokenService token service identifier used to key the server info.
+ * @param c            session configuration used for each RPC channel.
+ * @param a            RPC authentication information.
+ * No connection is made here; channels are obtained lazily in invoke().
+ */
+NamenodeImpl::NamenodeImpl(const char *host, const char *port,
+        const std::string &tokenService, const SessionConfig &c,
+        const RpcAuth &a) :
+    auth(a), client(RpcClient::getClient()), conf(c), protocol(
+        NAMENODE_VERSION, NAMENODE_PROTOCOL, DELEGATION_TOKEN_KIND),
+        server(tokenService, host, port) {
+}
+
+// Nothing to release explicitly: the RpcClient singleton owns the channels.
+NamenodeImpl::~NamenodeImpl() {
+}
+
+/**
+ * Dispatch one RPC call to the namenode.
+ * A channel is obtained from the shared RpcClient for this (auth, protocol,
+ * server, conf) tuple and released with close(false) on both the success and
+ * the failure path.
+ * NOTE(review): close(false) presumably returns the channel to the client's
+ * pool rather than tearing down the connection -- confirm against
+ * RpcChannel::close().
+ */
+void NamenodeImpl::invoke(const RpcCall &call) {
+    RpcChannel &channel = client.getChannel(auth, protocol, server, conf);
+
+    try {
+        channel.invoke(call);
+    } catch (...) {
+        channel.close(false);
+        throw;
+    }
+
+    channel.close(false);
+}
+
+//Idempotent
+/**
+ * Fetch the located blocks covering the byte range [offset, offset+length)
+ * of a file.
+ * @param src    path of the file.
+ * @param offset starting byte offset within the file.
+ * @param length number of bytes whose block locations are wanted.
+ * @param lbs    output parameter filled from the RPC response.
+ * NOTE(review): the documented AccessControlException is not listed in the
+ * UnWrapper below -- verify how unwrap() treats exception types that are not
+ * in its list.
+ */
+void NamenodeImpl::getBlockLocations(const std::string &src, int64_t offset,
+        int64_t length, LocatedBlocks &lbs) /* throw (AccessControlException,
+         FileNotFoundException, UnresolvedLinkException, HdfsIOException) */ {
+    try {
+        GetBlockLocationsRequestProto request;
+        GetBlockLocationsResponseProto response;
+        request.set_length(length);
+        request.set_offset(offset);
+        request.set_src(src);
+        invoke(RpcCall(true, "getBlockLocations", &request, &response));
+        Convert(lbs, response.locations());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+/**
+ * Create a new file entry in the namespace.
+ * @param src          path of the file to create.
+ * @param masked       permission for the new file.
+ * @param clientName   unique name of the creating client (lease holder).
+ * @param flag         create-flag bits, forwarded verbatim to the namenode.
+ * @param createParent create missing parent directories if true.
+ * @param replication  block replication factor for the new file.
+ * @param blockSize    block size for the new file.
+ * Server-side failures arrive as HdfsRpcServerException and are unwrapped
+ * into the typed exceptions listed in the UnWrapper below.
+ */
+void NamenodeImpl::create(const std::string &src, const Permission &masked,
+                          const std::string &clientName, int flag, bool createParent,
+                          short replication, int64_t blockSize)
+    /* throw (AccessControlException,
+         AlreadyBeingCreatedException, DSQuotaExceededException,
+         FileAlreadyExistsException, FileNotFoundException,
+         NSQuotaExceededException, ParentNotDirectoryException,
+          UnresolvedLinkException, HdfsIOException) */{
+    try {
+        CreateRequestProto request;
+        CreateResponseProto response;
+        request.set_blocksize(blockSize);
+        request.set_clientname(clientName);
+        request.set_createflag(flag);
+        request.set_createparent(createParent);
+        request.set_replication(replication);
+        request.set_src(src);
+        Build(masked, request.mutable_masked());
+        invoke(RpcCall(false, "create", &request, &response));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < AlreadyBeingCreatedException,
+                  DSQuotaExceededException, FileAlreadyExistsException,
+                  FileNotFoundException, NSQuotaExceededException,
+                  ParentNotDirectoryException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+/**
+ * Open an existing file for append.
+ * @param src        path of the file to append to.
+ * @param clientName unique name of the appending client (lease holder).
+ * @return the last, partially-filled block of the file when the server
+ *         reports one; an empty shared_ptr otherwise.
+ */
+shared_ptr<LocatedBlock> NamenodeImpl::append(const std::string &src,
+        const std::string &clientName)
+/* throw (AlreadyBeingCreatedException, DSQuotaExceededException,
+ FileNotFoundException,
+ UnresolvedLinkException, HdfsIOException) */{
+    try {
+        AppendRequestProto request;
+        AppendResponseProto response;
+        request.set_clientname(clientName);
+        request.set_src(src);
+        invoke(RpcCall(false, "append", &request, &response));
+
+        if (response.has_block()) {
+            return Convert(response.block());
+        } else {
+            return shared_ptr<LocatedBlock>();
+        }
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < AlreadyBeingCreatedException, AccessControlException,
+                  DSQuotaExceededException, FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(
+                      e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Present to avoid flowing off the end
+    // of a value-returning function (UB) -- same pattern as recoverLease().
+    return shared_ptr<LocatedBlock>();
+}
+
+//Idempotent
+/**
+ * Set the replication factor of an existing file.
+ * @param src         path of the file.
+ * @param replication the new replication factor.
+ * @return the server's result flag for the request.
+ */
+bool NamenodeImpl::setReplication(const std::string &src, short replication)
+/* throw (DSQuotaExceededException,
+ FileNotFoundException,  UnresolvedLinkException,
+ HdfsIOException) */{
+    try {
+        SetReplicationRequestProto request;
+        SetReplicationResponseProto response;
+        // Pass the std::string directly; going through c_str() would force
+        // an extra strlen and temporary inside the generated setter.
+        request.set_src(src);
+        request.set_replication(static_cast<uint32>(replication));
+        invoke(RpcCall(true, "setReplication", &request, &response));
+        return response.result();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < DSQuotaExceededException,
+                  FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return false;
+}
+
+//Idempotent
+/**
+ * Apply new permission bits to a file or directory.
+ * @param src        path whose permission is updated.
+ * @param permission the permission to set.
+ */
+void NamenodeImpl::setPermission(const std::string &src,
+                                 const Permission &permission) /* throw (AccessControlException,
+         FileNotFoundException,
+         UnresolvedLinkException, HdfsIOException) */{
+    SetPermissionRequestProto req;
+    SetPermissionResponseProto resp;
+    req.set_src(src);
+    Build(permission, req.mutable_permission());
+
+    try {
+        invoke(RpcCall(true, "setPermission", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+//Idempotent
+/**
+ * Change the owner and/or group of a path.  An empty username or groupname
+ * is simply left out of the request (meaning "do not change").
+ * @param src       path to modify.
+ * @param username  new owner, or empty to leave unchanged.
+ * @param groupname new group, or empty to leave unchanged.
+ */
+void NamenodeImpl::setOwner(const std::string &src,
+                            const std::string &username, const std::string &groupname)
+/* throw (FileNotFoundException,
+  UnresolvedLinkException, HdfsIOException) */{
+    SetOwnerRequestProto req;
+    SetOwnerResponseProto resp;
+    req.set_src(src);
+
+    if (!username.empty()) {
+        req.set_username(username);
+    }
+
+    if (!groupname.empty()) {
+        req.set_groupname(groupname);
+    }
+
+    try {
+        invoke(RpcCall(true, "setOwner", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+/**
+ * Abandon a block previously allocated for a file, returning it to the
+ * namenode so the client can request a fresh one.
+ * @param b      the block being abandoned.
+ * @param src    path of the file the block belongs to.
+ * @param holder the lease holder (client name).
+ */
+void NamenodeImpl::abandonBlock(const ExtendedBlock &b,
+                                const std::string &src, const std::string &holder)
+/* throw (FileNotFoundException,
+ UnresolvedLinkException, HdfsIOException) */{
+    AbandonBlockRequestProto req;
+    AbandonBlockResponseProto resp;
+    Build(b, req.mutable_b());
+    req.set_src(src);
+    req.set_holder(holder);
+
+    try {
+        invoke(RpcCall(false, "abandonBlock", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+/**
+ * Ask the namenode to allocate the next block of a file being written.
+ * @param src          path of the file.
+ * @param clientName   lease holder (writing client).
+ * @param previous     the previous block of the file, or NULL for the first.
+ * @param excludeNodes datanodes the client does not want the block placed on.
+ * @return the newly allocated located block.
+ */
+shared_ptr<LocatedBlock> NamenodeImpl::addBlock(const std::string &src,
+        const std::string &clientName, const ExtendedBlock *previous,
+        const std::vector<DatanodeInfo> &excludeNodes)
+/* throw (FileNotFoundException,
+ NotReplicatedYetException,
+ UnresolvedLinkException, HdfsIOException) */{
+    try {
+        AddBlockRequestProto request;
+        AddBlockResponseProto response;
+        request.set_clientname(clientName);
+        request.set_src(src);
+
+        if (previous) {
+            Build(*previous, request.mutable_previous());
+        }
+
+        if (!excludeNodes.empty()) {
+            Build(excludeNodes, request.mutable_excludenodes());
+        }
+
+        invoke(RpcCall(true, "addBlock", &request, &response));
+        return Convert(response.block());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  NotReplicatedYetException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return shared_ptr<LocatedBlock>();
+}
+
+//Idempotent
+/**
+ * Ask the namenode for extra datanode(s) to repair a write pipeline.
+ * @param src                path of the file being written.
+ * @param blk                the block whose pipeline is being repaired.
+ * @param existings          datanodes already in the pipeline.
+ * @param storageIDs         storage uuids of the existing datanodes.
+ * @param excludes           datanodes that must not be chosen.
+ * @param numAdditionalNodes how many additional datanodes are requested.
+ * @param clientName         lease holder (writing client).
+ * @return the located block including the augmented datanode list.
+ */
+shared_ptr<LocatedBlock> NamenodeImpl::getAdditionalDatanode(
+    const std::string &src, const ExtendedBlock &blk,
+    const std::vector<DatanodeInfo> &existings,
+    const std::vector<std::string> &storageIDs,
+    const std::vector<DatanodeInfo> &excludes, int numAdditionalNodes,
+    const std::string &clientName)
+/* throw ( FileNotFoundException,
+  UnresolvedLinkException, HdfsIOException) */{
+    try {
+        GetAdditionalDatanodeRequestProto request;
+        GetAdditionalDatanodeResponseProto response;
+        request.set_src(src);
+        Build(existings, request.mutable_existings());
+        Build(storageIDs, request.mutable_existingstorageuuids());
+        Build(excludes, request.mutable_excludes());
+        Build(blk, request.mutable_blk());
+        request.set_clientname(clientName);
+        request.set_numadditionalnodes(numAdditionalNodes);
+        invoke(RpcCall(true, "getAdditionalDatanode", &request, &response));
+        return Convert(response.block());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper <
+        FileNotFoundException,
+        NotReplicatedYetException,
+        UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return shared_ptr<LocatedBlock>();
+}
+
+/**
+ * Tell the namenode that the client is done writing a file.
+ * @param src        path of the file.
+ * @param clientName lease holder (writing client).
+ * @param last       the final block of the file, or NULL if none.
+ * @return the server's result flag (true when the file is fully complete).
+ */
+bool NamenodeImpl::complete(const std::string &src,
+                            const std::string &clientName, const ExtendedBlock *last)
+/* throw (FileNotFoundException,
+  UnresolvedLinkException, HdfsIOException) */{
+    try {
+        CompleteRequestProto request;
+        CompleteResponseProto response;
+        request.set_clientname(clientName);
+        request.set_src(src);
+
+        if (last) {
+            Build(*last, request.mutable_last());
+        }
+
+        invoke(RpcCall(false, "complete", &request, &response));
+        return response.result();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(
+                      e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return false;
+}
+
+//Idempotent
+/*void NamenodeImpl::reportBadBlocks(const std::vector<LocatedBlock> &blocks)
+ throw (HdfsIOException) {
+    try {
+        ReportBadBlocksRequestProto request;
+        ReportBadBlocksResponseProto response;
+        Build(blocks, request.mutable_blocks());
+        invoke(RpcCall(true, "reportBadBlocks", &request, &response));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+/**
+ * Rename a file or directory.
+ * @param src source path.
+ * @param dst destination path.
+ * @return the server's result flag for the rename.
+ */
+bool NamenodeImpl::rename(const std::string &src, const std::string &dst)
+      /* throw (UnresolvedLinkException, HdfsIOException) */{
+    try {
+        RenameRequestProto request;
+        RenameResponseProto response;
+        request.set_src(src);
+        request.set_dst(dst);
+        invoke(RpcCall(false, "rename", &request, &response));
+        return response.result();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<UnresolvedLinkException, HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return false;
+}
+
+/*void NamenodeImpl::concat(const std::string &trg,
+          const std::vector<std::string> &srcs)
+      throw (UnresolvedLinkException, HdfsIOException) {
+    try {
+        ConcatRequestProto request;
+        ConcatResponseProto response;
+        request.set_trg(trg);
+        Build(srcs, request.mutable_srcs());
+        invoke(RpcCall(false, "concat", &request, &response));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<UnresolvedLinkException, HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+/**
+ * Delete a file or directory.
+ * @param src       path to delete.
+ * @param recursive delete non-empty directories recursively if true.
+ * @return the server's result flag for the delete.
+ */
+bool NamenodeImpl::deleteFile(const std::string &src, bool recursive)
+      /* throw (FileNotFoundException, UnresolvedLinkException,
+       * HdfsIOException) */ {
+    try {
+        DeleteRequestProto request;
+        DeleteResponseProto response;
+        request.set_src(src);
+        request.set_recursive(recursive);
+        invoke(RpcCall(false, "delete", &request, &response));
+        return response.result();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(
+                      e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return false;
+}
+
+//Idempotent
+/**
+ * Create a directory.
+ * @param src          path of the directory to create.
+ * @param masked       permission for the new directory.
+ * @param createParent create missing parent directories if true.
+ * @return the server's result flag for the mkdirs call.
+ */
+bool NamenodeImpl::mkdirs(const std::string &src, const Permission &masked,
+                          bool createParent) /* throw (AccessControlException,
+         FileAlreadyExistsException, FileNotFoundException,
+         NSQuotaExceededException, ParentNotDirectoryException,
+          UnresolvedLinkException, HdfsIOException) */{
+    try {
+        MkdirsRequestProto request;
+        MkdirsResponseProto response;
+        request.set_src(src);
+        request.set_createparent(createParent);
+        Build(masked, request.mutable_masked());
+        invoke(RpcCall(true, "mkdirs", &request, &response));
+        return response.result();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileAlreadyExistsException,
+                  FileNotFoundException, NSQuotaExceededException,
+                  ParentNotDirectoryException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return false;
+}
+
+//Idempotent
+/**
+ * List (one page of) a directory.
+ * @param src          path to list.
+ * @param startAfter   resume listing after this child name (pagination).
+ * @param needLocation also fetch block locations for each entry if true.
+ * @param dl           output vector filled with the returned entries.
+ * @return true when the server reports more entries remain (caller should
+ *         issue another page request), false otherwise.
+ * @throws FileNotFoundException when the response carries no listing,
+ *         i.e. the path does not exist.
+ */
+bool NamenodeImpl::getListing(const std::string &src,
+          const std::string &startAfter, bool needLocation,
+          std::vector<FileStatus> &dl) /* throw (AccessControlException,
+         FileNotFoundException, UnresolvedLinkException, HdfsIOException) */ {
+    try {
+        GetListingRequestProto request;
+        GetListingResponseProto response;
+        request.set_src(src);
+        request.set_startafter(startAfter);
+        request.set_needlocation(needLocation);
+        invoke(RpcCall(true, "getListing", &request, &response));
+
+        if (response.has_dirlist()) {
+            const DirectoryListingProto &lists = response.dirlist();
+            Convert(dl, lists);
+            return lists.remainingentries() > 0;
+        }
+
+        // Thrown from inside the try but not caught below (the catch only
+        // handles HdfsRpcServerException), so it propagates to the caller.
+        THROW(FileNotFoundException, "%s not found.", src.c_str());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: both THROW and unwrap() throw.  Avoids UB from falling
+    // off the end of a value-returning function.
+    return false;
+}
+
+//Idempotent
+/**
+ * Renew the lease held by the given client so its open files are not
+ * reclaimed by the namenode.
+ * @param clientName the lease holder to renew.
+ */
+void NamenodeImpl::renewLease(const std::string &clientName)
+          /* throw (HdfsIOException) */{
+    RenewLeaseRequestProto req;
+    RenewLeaseResponseProto resp;
+    req.set_clientname(clientName);
+
+    try {
+        invoke(RpcCall(true, "renewLease", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+//Idempotent
+/**
+ * Begin lease recovery for a file held by another client.
+ * @param src        path of the file whose lease is recovered.
+ * @param clientName the client requesting recovery.
+ * @return the server's result flag; the trailing `return false` is only
+ *         reached if unwrap() ever returns instead of throwing.
+ */
+bool NamenodeImpl::recoverLease(const std::string &src,
+           const std::string &clientName) /* throw (HdfsIOException) */ {
+    try {
+        RecoverLeaseRequestProto request;
+        RecoverLeaseResponseProto response;
+        request.set_src(src);
+        request.set_clientname(clientName);
+        invoke(RpcCall(true, "recoverLease", &request, &response));
+        return response.result();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    return false;
+}
+
+//Idempotent
+/**
+ * Fetch aggregate filesystem statistics.
+ * @return a vector in this fixed order: capacity, used, remaining,
+ *         under-replicated blocks, corrupt blocks, missing blocks.
+ * NOTE(review): "GetFsStatusRequestProto" looks misspelled but must match
+ * the generated protobuf type name -- confirm against
+ * ClientNamenodeProtocol.proto before "fixing" it.
+ */
+std::vector<int64_t> NamenodeImpl::getFsStats() {
+      /* throw (HdfsIOException) */
+    try {
+        GetFsStatusRequestProto request;
+        GetFsStatsResponseProto response;
+        invoke(RpcCall(true, "getFsStats", &request, &response));
+        std::vector<int64_t> retval;
+        retval.push_back(response.capacity());
+        retval.push_back(response.used());
+        retval.push_back(response.remaining());
+        retval.push_back(response.under_replicated());
+        retval.push_back(response.corrupt_blocks());
+        retval.push_back(response.missing_blocks());
+        return retval;
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(
+                      e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Only reached if unwrap() ever returns instead of throwing.
+    return std::vector<int64_t>();
+}
+
+/*void NamenodeImpl::metaSave(const std::string &filename)
+ throw (HdfsIOException) {
+    try {
+        MetaSaveRequestProto request;
+        MetaSaveResponseProto response;
+        request.set_filename(filename);
+        invoke(RpcCall(true, "metaSave", &request, &response));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+//Idempotent
+/**
+ * Fetch the FileStatus of a path.
+ * The returned status has its path field rewritten to the basename of src
+ * (or "/" for the root), since the server response does not carry it in
+ * the form the client wants.
+ * @param src absolute path to stat; must contain at least one '/'.
+ * @return the file status of src.
+ * @throws FileNotFoundException when the response carries no fs entry.
+ */
+FileStatus NamenodeImpl::getFileInfo(const std::string &src)
+/* throw (FileNotFoundException,
+ UnresolvedLinkException, HdfsIOException) */{
+    FileStatus retval;
+
+    try {
+        GetFileInfoRequestProto request;
+        GetFileInfoResponseProto response;
+        request.set_src(src);
+        invoke(RpcCall(true, "getFileInfo", &request, &response));
+
+        if (response.has_fs()) {
+            Convert(retval, response.fs());
+            assert(src.find_last_of('/') != src.npos);
+            const char *path = src.c_str() + src.find_last_of('/') + 1;
+            path = src == "/" ? "/" : path;
+            retval.setPath(path);
+            return retval;
+        }
+
+        THROW(FileNotFoundException, "Path %s does not exist.", src.c_str());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: both THROW and unwrap() throw.  Avoids UB from falling
+    // off the end of a value-returning function.
+    return retval;
+}
+
+//Idempotent
+/*FileStatus NamenodeImpl::getFileLinkInfo(const std::string &src)
+ throw (UnresolvedLinkException, HdfsIOException) {
+    FileStatus fileStatus;
+
+    try {
+        GetFileLinkInfoRequestProto request;
+        GetFileLinkInfoResponseProto response;
+        request.set_src(src);
+        invoke(RpcCall(true, "getFileLinkInfo", &request, &response));
+
+        if (response.has_fs()) {
+            Convert(fileStatus, response.fs());
+            return fileStatus;
+        }
+
+        THROW(FileNotFoundException, "Path %s does not exist.", src.c_str());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < UnresolvedLinkException,
+                  HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+//Idempotent
+/*ContentSummary NamenodeImpl::getContentSummary(const std::string &path)
+ throw (FileNotFoundException,
+ UnresolvedLinkException, HdfsIOException) {
+    ContentSummary contentSummary;
+
+    try {
+        GetContentSummaryRequestProto request;
+        GetContentSummaryResponseProto response;
+        request.set_path(path);
+        invoke(RpcCall(true, "getContentSummary", &request, &response));
+
+        if (response.has_summary()) {
+            Convert(contentSummary, response.summary());
+            return contentSummary;
+        }
+
+        THROW(FileNotFoundException, "Path %s does not exist.", path.c_str());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+//Idempotent
+/*void NamenodeImpl::setQuota(const std::string &path, int64_t namespaceQuota,
+                            int64_t diskspaceQuota)  throw (AccessControlException,
+         FileNotFoundException, UnresolvedLinkException, HdfsIOException) {
+    try {
+        SetQuotaRequestProto request;
+        SetQuotaResponseProto response;
+        request.set_path(path);
+        request.set_namespacequota(namespaceQuota);
+        request.set_diskspacequota(diskspaceQuota);
+        invoke(RpcCall(true, "setQuota", &request, &response));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+//Idempotent
+/**
+ * Ask the namenode to persist the metadata of a file being written.
+ * @param src    path of the file.
+ * @param client the lease holder (writing client).
+ */
+void NamenodeImpl::fsync(const std::string &src, const std::string &client)
+/* throw (FileNotFoundException,
+ UnresolvedLinkException, HdfsIOException) */{
+    FsyncRequestProto req;
+    FsyncResponseProto resp;
+    req.set_src(src);
+    req.set_client(client);
+
+    try {
+        invoke(RpcCall(true, "fsync", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+//Idempotent
+/**
+ * Set modification and access times of a path.
+ * @param src   path to modify.
+ * @param mtime new modification time, forwarded verbatim to the server.
+ * @param atime new access time, forwarded verbatim to the server.
+ */
+void NamenodeImpl::setTimes(const std::string &src, int64_t mtime,
+                            int64_t atime) /* throw (FileNotFoundException,
+         UnresolvedLinkException, HdfsIOException) */{
+    SetTimesRequestProto req;
+    SetTimesResponseProto resp;
+    req.set_src(src);
+    req.set_atime(atime);
+    req.set_mtime(mtime);
+
+    try {
+        invoke(RpcCall(true, "setTimes", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper < FileNotFoundException,
+                  UnresolvedLinkException, HdfsIOException > unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+/*void NamenodeImpl::createSymlink(const std::string &target,
+              const std::string &link, const Permission &dirPerm,
+              bool createParent)  throw (AccessControlException,
+         FileAlreadyExistsException, FileNotFoundException,
+         ParentNotDirectoryException,
+         UnresolvedLinkException, HdfsIOException) {
+    try {
+        CreateSymlinkRequestProto request;
+        CreateSymlinkResponseProto response;
+        request.set_target(target);
+        request.set_link(link);
+        request.set_createparent(createParent);
+        Build(dirPerm, request.mutable_dirperm());
+        invoke(RpcCall(true, "createSymlink", &request, &response));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<FileNotFoundException, HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+//Idempotent
+/*std::string NamenodeImpl::getLinkTarget(const std::string &path)
+ throw (FileNotFoundException, HdfsIOException) {
+    try {
+        GetLinkTargetRequestProto request;
+        GetLinkTargetResponseProto response;
+        request.set_path(path);
+        invoke(RpcCall(true, "getLinkTarget", &request, &response));
+        return response.targetpath();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<FileNotFoundException, HdfsIOException> unwrapper(
+            e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}*/
+
+//Idempotent
+/**
+ * Get a new generation stamp (and access token) for a block so that a
+ * write pipeline can be re-established.
+ * @param block      the block being recovered.
+ * @param clientName lease holder (writing client).
+ * @return the updated located block from the server.
+ */
+shared_ptr<LocatedBlock> NamenodeImpl::updateBlockForPipeline(
+        const ExtendedBlock &block, const std::string &clientName)
+/* throw (HdfsIOException) */{
+    try {
+        UpdateBlockForPipelineRequestProto request;
+        UpdateBlockForPipelineResponseProto response;
+        request.set_clientname(clientName);
+        Build(block, request.mutable_block());
+        invoke(RpcCall(true, "updateBlockForPipeline", &request, &response));
+        return Convert(response.block());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return shared_ptr<LocatedBlock>();
+}
+
+/**
+ * Report the repaired pipeline of a block back to the namenode.
+ * @param clientName lease holder (writing client).
+ * @param oldBlock   the block before pipeline recovery.
+ * @param newBlock   the block after pipeline recovery.
+ * @param newNodes   datanodes in the new pipeline.
+ * @param storageIDs storage uuids matching newNodes.
+ */
+void NamenodeImpl::updatePipeline(const std::string &clientName,
+          const ExtendedBlock &oldBlock, const ExtendedBlock &newBlock,
+          const std::vector<DatanodeInfo> &newNodes,
+          const std::vector<std::string> &storageIDs) {
+    /* throw (HdfsIOException) */
+    UpdatePipelineRequestProto req;
+    UpdatePipelineResponseProto resp;
+    req.set_clientname(clientName);
+    Build(oldBlock, req.mutable_oldblock());
+    Build(newBlock, req.mutable_newblock());
+    Build(newNodes, req.mutable_newnodes());
+    Build(storageIDs, req.mutable_storageids());
+
+    try {
+        invoke(RpcCall(false, "updatePipeline", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+/**
+ * Obtain a delegation token from the namenode.
+ * @param renewer the principal allowed to renew the token.
+ * @return the token converted from the RPC response.
+ */
+Token NamenodeImpl::getDelegationToken(const std::string &renewer) {
+    try {
+        GetDelegationTokenRequestProto request;
+        GetDelegationTokenResponseProto response;
+        request.set_renewer(renewer);
+        invoke(RpcCall(true, "getDelegationToken", &request, &response));
+        return Convert(response.token());
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return Token();
+}
+
+/**
+ * Renew a delegation token.
+ * @param token the token to renew.
+ * @return the new expiry time reported by the server.
+ */
+int64_t NamenodeImpl::renewDelegationToken(const Token &token) {
+    try {
+        RenewDelegationTokenRequestProto request;
+        RenewDelegationTokenResponseProto response;
+        Build(token, request.mutable_token());
+        invoke(RpcCall(true, "renewDelegationToken", &request, &response));
+        return response.newexpirytime();
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsInvalidBlockToken, HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+
+    // Not reached: unwrap() rethrows.  Avoids UB from falling off the end
+    // of a value-returning function -- same pattern as recoverLease().
+    return 0;
+}
+
+/**
+ * Cancel a delegation token so it can no longer be used or renewed.
+ * @param token the token to cancel.
+ */
+void NamenodeImpl::cancelDelegationToken(const Token &token) {
+    CancelDelegationTokenRequestProto req;
+    CancelDelegationTokenResponseProto resp;
+    Build(token, req.mutable_token());
+
+    try {
+        invoke(RpcCall(true, "cancelDelegationToken", &req, &resp));
+    } catch (const HdfsRpcServerException &e) {
+        UnWrapper<HdfsInvalidBlockToken, HdfsIOException> unwrapper(e);
+        unwrapper.unwrap(__FILE__, __LINE__);
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.h
new file mode 100644
index 0000000..809d643
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeImpl.h
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_SERVER_NAMENODEIMPL_H_
+#define _HDFS_LIBHDFS3_SERVER_NAMENODEIMPL_H_
+
+#include "Namenode.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Client-side implementation of the Namenode interface: each method is a
+ * thin wrapper that marshals its arguments into the corresponding
+ * ClientNamenodeProtocol RPC (see NamenodeImpl.cc) and converts RPC
+ * server exceptions into the client exception hierarchy.  The commented
+ * "throw (...)" lists document the exceptions each call may surface.
+ */
+class NamenodeImpl: public Namenode {
+public:
+    NamenodeImpl(const char *host, const char *port,
+            const std::string &tokenService, const SessionConfig &c,
+            const RpcAuth &a);
+
+    ~NamenodeImpl();
+
+    //Idempotent
+    void getBlockLocations(const std::string &src, int64_t offset,
+                           int64_t length, LocatedBlocks &lbs) /* throw (AccessControlException,
+             FileNotFoundException, UnresolvedLinkException,
+             HdfsIOException) */;
+
+    void create(const std::string &src, const Permission &masked,
+                const std::string &clientName, int flag, bool createParent,
+                short replication, int64_t blockSize) /* throw (AccessControlException,
+             AlreadyBeingCreatedException, DSQuotaExceededException,
+             FileAlreadyExistsException, FileNotFoundException,
+             NSQuotaExceededException, ParentNotDirectoryException,
+             SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    shared_ptr<LocatedBlock> append(const std::string &src, const std::string &clientName)
+    /* throw (AccessControlException,
+             DSQuotaExceededException, FileNotFoundException,
+             SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    bool setReplication(const std::string &src, short replication)
+    /* throw (AccessControlException, DSQuotaExceededException,
+     FileNotFoundException, SafeModeException, UnresolvedLinkException,
+     HdfsIOException) */;
+
+    //Idempotent
+    void setPermission(const std::string &src, const Permission &permission)
+    /* throw (AccessControlException, FileNotFoundException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    void setOwner(const std::string &src, const std::string &username,
+                  const std::string &groupname) /* throw (AccessControlException,
+             FileNotFoundException, SafeModeException,
+             UnresolvedLinkException, HdfsIOException) */;
+
+    void abandonBlock(const ExtendedBlock &b, const std::string &src,
+                      const std::string &holder) /* throw (AccessControlException,
+             FileNotFoundException, UnresolvedLinkException,
+             HdfsIOException) */;
+
+    shared_ptr<LocatedBlock> addBlock(const std::string &src, const std::string &clientName,
+                                      const ExtendedBlock *previous,
+                                      const std::vector<DatanodeInfo> &excludeNodes)
+    /* throw (AccessControlException, FileNotFoundException,
+     NotReplicatedYetException, SafeModeException,
+     UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    shared_ptr<LocatedBlock> getAdditionalDatanode(const std::string &src,
+            const ExtendedBlock &blk,
+            const std::vector<DatanodeInfo> &existings,
+            const std::vector<std::string> &storageIDs,
+            const std::vector<DatanodeInfo> &excludes, int numAdditionalNodes,
+            const std::string &clientName)
+    /* throw (AccessControlException, FileNotFoundException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    bool complete(const std::string &src, const std::string &clientName,
+                  const ExtendedBlock *last) /* throw (AccessControlException,
+             FileNotFoundException, SafeModeException,
+             UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    void reportBadBlocks(const std::vector<LocatedBlock> &blocks)
+    /* throw (HdfsIOException) */;
+
+    bool rename(const std::string &src, const std::string &dst)
+    /* throw (UnresolvedLinkException, HdfsIOException) */;
+
+    void concat(const std::string &trg, const std::vector<std::string> &srcs)
+    /* throw (HdfsIOException, UnresolvedLinkException) */;
+
+    /*void rename2(const std::string &src, const std::string &dst)
+     throw (AccessControlException, DSQuotaExceededException,
+     FileAlreadyExistsException, FileNotFoundException,
+     NSQuotaExceededException, ParentNotDirectoryException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) ;*/
+
+    bool deleteFile(const std::string &src, bool recursive)
+    /* throw (AccessControlException, FileNotFoundException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    bool mkdirs(const std::string &src, const Permission &masked,
+                bool createParent) /* throw (AccessControlException,
+             FileAlreadyExistsException, FileNotFoundException,
+             NSQuotaExceededException, ParentNotDirectoryException,
+             SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    bool getListing(const std::string &src, const std::string &startAfter,
+                    bool needLocation, std::vector<FileStatus> &dl)
+    /* throw (AccessControlException, FileNotFoundException,
+     UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    void renewLease(const std::string &clientName)
+    /* throw (AccessControlException, HdfsIOException) */;
+
+    //Idempotent
+    bool recoverLease(const std::string &src, const std::string &clientName)
+    /* throw (HdfsIOException) */;
+
+    //Idempotent
+    std::vector<int64_t> getFsStats() /* throw (HdfsIOException) */;
+
+    void metaSave(const std::string &filename) /* throw (HdfsIOException) */;
+
+    //Idempotent
+    FileStatus getFileInfo(const std::string &src)
+    /* throw (AccessControlException, FileNotFoundException,
+     UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    FileStatus getFileLinkInfo(const std::string &src)
+    /* throw (AccessControlException, UnresolvedLinkException,
+     HdfsIOException) */;
+
+    /*    //Idempotent
+        ContentSummary getContentSummary(const std::string &path)
+         throw (AccessControlException, FileNotFoundException,
+         UnresolvedLinkException, HdfsIOException) ;*/
+
+    //Idempotent
+    void setQuota(const std::string &path, int64_t namespaceQuota,
+                  int64_t diskspaceQuota) /* throw (AccessControlException,
+             FileNotFoundException, UnresolvedLinkException,
+             HdfsIOException) */;
+
+    //Idempotent
+    void fsync(const std::string &src, const std::string &client)
+    /* throw (AccessControlException, FileNotFoundException,
+     UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    void setTimes(const std::string &src, int64_t mtime, int64_t atime)
+    /* throw (AccessControlException, FileNotFoundException,
+     UnresolvedLinkException, HdfsIOException) */;
+
+    void createSymlink(const std::string &target, const std::string &link,
+                       const Permission &dirPerm, bool createParent)
+    /* throw (AccessControlException, FileAlreadyExistsException,
+     FileNotFoundException, ParentNotDirectoryException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) */;
+
+    //Idempotent
+    std::string getLinkTarget(const std::string &path)
+    /* throw (AccessControlException, FileNotFoundException,
+     HdfsIOException) */;
+
+    //Idempotent
+    shared_ptr<LocatedBlock> updateBlockForPipeline(const ExtendedBlock &block,
+            const std::string &clientName)
+    /* throw (HdfsIOException) */;
+
+    void updatePipeline(const std::string &clientName,
+                        const ExtendedBlock &oldBlock, const ExtendedBlock &newBlock,
+                        const std::vector<DatanodeInfo> &newNodes,
+                        const std::vector<std::string> &storageIDs) /* throw (HdfsIOException) */;
+
+    //Idempotent
+    Token getDelegationToken(const std::string &renewer)
+    /* throws IOException*/;
+
+    //Idempotent
+    int64_t renewDelegationToken(const Token &token)
+    /*throws IOException*/;
+
+    //Idempotent
+    void cancelDelegationToken(const Token &token)
+    /*throws IOException*/;
+
+private:
+    //Issues one RPC call through the shared RpcClient (see NamenodeImpl.cc).
+    void invoke(const RpcCall &call);
+
+private:
+    //RPC connection parameters, fixed at construction time.
+    RpcAuth auth;
+    RpcClient &client;
+    RpcConfig conf;
+    RpcProtocolInfo protocol;
+    RpcServerInfo server;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_SERVER_NAMENODEIMPL_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.cc
new file mode 100644
index 0000000..1a0a655
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.cc
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "NamenodeInfo.h"
+#include "StringUtil.h"
+#include "XmlConfig.h"
+
+#include <string>
+#include <vector>
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+
+//Default constructor: both addresses start empty and are filled in later
+//via setRpcAddr()/setHttpAddr().
+NamenodeInfo::NamenodeInfo() {
+}
+
+//Hadoop configuration key prefixes used to resolve HA namenode addresses.
+//NOTE(review): DFS_NAMESERVICES is not referenced in this file -- confirm
+//it is used elsewhere or intended for future use.
+const char *const DFS_NAMESERVICES = "dfs.nameservices";
+const char *const DFS_NAMENODE_HA = "dfs.ha.namenodes";
+const char *const DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
+const char *const DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
+
+//Resolves the namenodes of an HA nameservice from the configuration.
+//Reads the comma-separated namenode ids under "dfs.ha.namenodes.<service>",
+//then looks up "dfs.namenode.rpc-address.<service>.<nn>" and
+//"dfs.namenode.http-address.<service>.<nn>" for each id; missing address
+//keys yield empty strings rather than an error.
+std::vector<NamenodeInfo> NamenodeInfo::GetHANamenodeInfo(
+    const std::string & service, const Config & conf) {
+    std::vector<NamenodeInfo> retval;
+    std::string strNameNodes = StringTrim(
+              conf.getString(std::string(DFS_NAMENODE_HA) + "." + service));
+    std::vector<std::string> nns = StringSplit(strNameNodes, ",");
+    retval.resize(nns.size());
+
+    for (size_t i = 0; i < nns.size(); ++i) {
+        //Build the per-namenode config key, trimming whitespace around ids.
+        std::string dfsRpcAddress = StringTrim(
+              std::string(DFS_NAMENODE_RPC_ADDRESS_KEY) + "." + service + "."
+              + StringTrim(nns[i]));
+        std::string dfsHttpAddress = StringTrim(
+              std::string(DFS_NAMENODE_HTTP_ADDRESS_KEY) + "." + service + "." +
+              StringTrim(nns[i]));
+        retval[i].setRpcAddr(StringTrim(conf.getString(dfsRpcAddress, "")));
+        retval[i].setHttpAddr(StringTrim(conf.getString(dfsHttpAddress, "")));
+    }
+
+    return retval;
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.h
new file mode 100644
index 0000000..f317161
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeInfo.h
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS_SERVER_NAMENODEINFO_H_
+#define _HDFS_LIBHDFS_SERVER_NAMENODEINFO_H_
+
+#include "XmlConfig.h"
+
+#include <string>
+#include <vector>
+
+namespace hdfs {
+
+/**
+ * Value type holding the RPC and HTTP addresses of one namenode.
+ * Instances are usually produced by GetHANamenodeInfo(), which resolves
+ * the namenodes of an HA nameservice from the client configuration.
+ */
+class NamenodeInfo {
+public:
+    NamenodeInfo();
+
+    //"host:port" HTTP address, or empty if not configured.
+    const std::string &getHttpAddr() const {
+        return http_addr;
+    }
+
+    void setHttpAddr(const std::string &httpAddr) {
+        http_addr = httpAddr;
+    }
+
+    //"host:port" RPC address, or empty if not configured.
+    const std::string &getRpcAddr() const {
+        return rpc_addr;
+    }
+
+    void setRpcAddr(const std::string &rpcAddr) {
+        rpc_addr = rpcAddr;
+    }
+
+    //Builds one NamenodeInfo per namenode id configured for the HA service.
+    static std::vector<NamenodeInfo> GetHANamenodeInfo(
+          const std::string &service, const Config &conf);
+
+private:
+    std::string rpc_addr;
+    std::string http_addr;
+};
+
+}
+
+#endif /* _HDFS_LIBHDFS_SERVER_NAMENODEINFO_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.cc
new file mode 100644
index 0000000..020f719
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.cc
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Logger.h"
+#include "NamenodeImpl.h"
+#include "NamenodeProxy.h"
+#include "StringUtil.h"
+
+#include <string>
+
+#include <sys/fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+
+namespace hdfs {
+namespace internal {
+
+/*
+ * Reads the last-known active namenode index from /tmp/<id>, creating the
+ * file (initialized to 0) on first use.  Access is serialized with flock().
+ * This is a best-effort hint only: every failure path returns 0.
+ * NOTE(review): the path under /tmp is predictable and world-writable, and
+ * the write() result is unchecked -- confirm this is acceptable for a
+ * purely advisory value.
+ */
+static uint32_t GetInitNamenodeIndex(const std::string &id) {
+    std::string path = "/tmp/";
+    path += id;
+    int fd;
+    uint32_t index = 0;
+    /*
+     * try create the file
+     */
+    fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666);
+
+    if (fd < 0) {
+        if (errno == EEXIST) {
+            /*
+             * the file already exist, try to open it
+             */
+            fd = open(path.c_str(), O_RDONLY);
+        } else {
+            /*
+             * failed to create, do not care why
+             */
+            return 0;
+        }
+    } else {
+        if (0 != flock(fd, LOCK_EX)) {
+            /*
+             * failed to lock
+             */
+            close(fd);
+            return index;
+        }
+
+        /*
+         * created file, initialize it with 0
+         */
+        write(fd, &index, sizeof(index));
+        flock(fd, LOCK_UN);
+        close(fd);
+        return index;
+    }
+
+    /*
+     * the file exist, read it.
+     */
+    if (fd >= 0) {
+        if (0 != flock(fd, LOCK_SH)) {
+            /*
+             * failed to lock
+             */
+            close(fd);
+            return index;
+        }
+
+        if (sizeof(index) != read(fd, &index, sizeof(index))) {
+            /*
+             * failed to read, do not care why
+             */
+            index = 0;
+        }
+
+        flock(fd, LOCK_UN);
+        close(fd);
+    }
+
+    return index;
+}
+
+/*
+ * Persists the index of the currently active namenode into /tmp/<id> so
+ * that future clients start with the namenode that was active last.
+ * Best effort: all failures are silently ignored.
+ */
+static void SetInitNamenodeIndex(const std::string &id, uint32_t index) {
+    std::string path = "/tmp/";
+    path += id;
+    int fd;
+    /*
+     * try open the file for write
+     */
+    fd = open(path.c_str(), O_WRONLY);
+
+    /*
+     * 0 is a valid file descriptor; the previous "fd > 0" test treated it
+     * as a failure and leaked it.
+     */
+    if (fd >= 0) {
+        if (0 != flock(fd, LOCK_EX)) {
+            /*
+             * failed to lock
+             */
+            close(fd);
+            return;
+        }
+
+        /* advisory hint only: a short or failed write is ignored */
+        write(fd, &index, sizeof(index));
+        flock(fd, LOCK_UN);
+        close(fd);
+    }
+}
+
+/*
+ * Builds one NamenodeImpl per configured namenode.  HA (failover retry)
+ * is enabled only when more than one namenode is configured; in that case
+ * the starting namenode index is restored from the shared /tmp hint file.
+ */
+NamenodeProxy::NamenodeProxy(const std::vector<NamenodeInfo> &namenodeInfos,
+                             const std::string &tokenService,
+                             const SessionConfig &c, const RpcAuth &a) :
+    clusterid(tokenService), currentNamenode(0) {
+    /*
+     * An empty list would make "% namenodeInfos.size()" below divide by
+     * zero; fail fast with a clear error instead.
+     */
+    if (namenodeInfos.empty()) {
+        THROW(InvalidParameter,
+              "Cannot create namenode proxy, no namenode is configured");
+    }
+
+    if (namenodeInfos.size() == 1) {
+        enableNamenodeHA = false;
+        maxNamenodeHARetry = 0;
+    } else {
+        enableNamenodeHA = true;
+        maxNamenodeHARetry = c.getRpcMaxHaRetry();
+    }
+
+    for (size_t i = 0; i < namenodeInfos.size(); ++i) {
+        std::vector<std::string> nninfo = StringSplit(namenodeInfos[i].getRpcAddr(), ":");
+
+        if (nninfo.size() != 2) {
+            THROW(InvalidParameter, "Cannot create namenode proxy, %s does not contain host or port",
+                  namenodeInfos[i].getRpcAddr().c_str());
+        }
+
+        namenodes.push_back(
+            shared_ptr<Namenode>(
+                new NamenodeImpl(nninfo[0].c_str(), nninfo[1].c_str(), clusterid, c, a)));
+    }
+
+    if (enableNamenodeHA) {
+        currentNamenode = GetInitNamenodeIndex(clusterid) % namenodeInfos.size();
+    }
+}
+
+//Nothing to do: namenodes are held by shared_ptr and release themselves.
+NamenodeProxy::~NamenodeProxy() {
+}
+
+//Returns the namenode currently believed to be active and records its
+//index in oldValue so a later failoverToNextNamenode(oldValue) can detect
+//whether another thread already failed over.  Throws once close() has
+//emptied the namenode list.
+shared_ptr<Namenode> NamenodeProxy::getActiveNamenode(uint32_t &oldValue) {
+    lock_guard<mutex> lock(mut);
+
+    if (namenodes.empty()) {
+        THROW(HdfsFileSystemClosed, "NamenodeProxy is closed.");
+    }
+
+    oldValue = currentNamenode;
+    return namenodes[currentNamenode % namenodes.size()];
+}
+
+//Advances to the next namenode in round-robin order and persists the new
+//index.  oldValue is the index observed by getActiveNamenode(); if it no
+//longer matches, another thread already failed over and this call is a
+//no-op.  NOTE(review): assumes namenodes is non-empty -- confirm it cannot
+//race with close().
+void NamenodeProxy::failoverToNextNamenode(uint32_t oldValue) {
+    lock_guard<mutex> lock(mut);
+
+    if (oldValue != currentNamenode) {
+        //already failover in another thread.
+        return;
+    }
+
+    ++currentNamenode;
+    currentNamenode = currentNamenode % namenodes.size();
+    SetInitNamenodeIndex(clusterid, currentNamenode);
+}
+
+//Converts an HdfsFailoverException whose retries are exhausted into an
+//HdfsRpcException that nests the original cause.  Never returns normally:
+//rethrow_if_nested always throws here (the exception always carries a
+//nested cause), and abort() guards the impossible fall-through.
+static void HandleHdfsFailoverException(const HdfsFailoverException &e) {
+    try {
+        rethrow_if_nested(e);
+    } catch (...) {
+        NESTED_THROW(hdfs::HdfsRpcException, "%s", e.what());
+    }
+
+    //should not reach here
+    abort();
+}
+
+/*
+ * HA retry wrappers.  Code placed between BEGIN() and END() runs against
+ * the active namenode; on NameNodeStandbyException or
+ * HdfsFailoverException it fails over to the next namenode and retries,
+ * up to maxNamenodeHARetry times (retry only when HA is enabled).
+ */
+#define NAMENODE_HA_RETRY_BEGIN() \
+    do { \
+        int __count = 0; \
+        do { \
+            uint32_t __oldValue = 0; \
+            shared_ptr<Namenode> namenode =  getActiveNamenode(__oldValue); \
+            try { \
+                (void)0
+
+#define NAMENODE_HA_RETRY_END() \
+    break; \
+    } catch (const NameNodeStandbyException &e) { \
+        if (!enableNamenodeHA || __count++ > maxNamenodeHARetry) { \
+            throw; \
+        } \
+    } catch (const HdfsFailoverException &e) { \
+        if (!enableNamenodeHA || __count++ > maxNamenodeHARetry) { \
+            HandleHdfsFailoverException(e); \
+        } \
+    } \
+    failoverToNextNamenode(__oldValue); \
+    LOG(WARNING, "NamenodeProxy: Failover to another Namenode."); \
+    } while (true); \
+    } while (0)
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::getBlockLocations(const std::string &src, int64_t offset,
+        int64_t length, LocatedBlocks &lbs) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->getBlockLocations(src, offset, length, lbs);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::create(const std::string &src, const Permission &masked,
+        const std::string &clientName, int flag, bool createParent,
+        short replication, int64_t blockSize) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->create(src, masked, clientName, flag, createParent, replication, blockSize);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode with HA retry; the retry loop either
+//returns from inside or throws, so the trailing return is unreachable.
+shared_ptr<LocatedBlock> NamenodeProxy::append(const std::string &src,
+        const std::string &clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->append(src, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::setReplication(const std::string &src, short replication) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->setReplication(src, replication);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::setPermission(const std::string &src,
+        const Permission &permission) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setPermission(src, permission);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::setOwner(const std::string &src,
+        const std::string &username, const std::string &groupname) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setOwner(src, username, groupname);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::abandonBlock(const ExtendedBlock &b,
+        const std::string &src, const std::string &holder) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->abandonBlock(b, src, holder);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+shared_ptr<LocatedBlock> NamenodeProxy::addBlock(const std::string &src,
+        const std::string &clientName, const ExtendedBlock * previous,
+        const std::vector<DatanodeInfo> &excludeNodes) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->addBlock(src, clientName, previous, excludeNodes);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+shared_ptr<LocatedBlock> NamenodeProxy::getAdditionalDatanode(
+    const std::string &src, const ExtendedBlock &blk,
+    const std::vector<DatanodeInfo> &existings,
+    const std::vector<std::string> &storageIDs,
+    const std::vector<DatanodeInfo> &excludes, int numAdditionalNodes,
+    const std::string &clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getAdditionalDatanode(src, blk, existings,
+                  storageIDs, excludes, numAdditionalNodes, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::complete(const std::string &src,
+                  const std::string &clientName, const ExtendedBlock *last) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->complete(src, clientName, last);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+/*void NamenodeProxy::reportBadBlocks(const std::vector<LocatedBlock> &blocks) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->reportBadBlocks(blocks);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::rename(const std::string &src, const std::string &dst) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->rename(src, dst);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+/*
+void NamenodeProxy::concat(const std::string &trg,
+                           const std::vector<std::string> &srcs) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->concat(trg, srcs);
+    NAMENODE_HA_RETRY_END();
+}
+*/
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::deleteFile(const std::string &src, bool recursive) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->deleteFile(src, recursive);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::mkdirs(const std::string &src, const Permission &masked,
+          bool createParent) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->mkdirs(src, masked, createParent);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::getListing(const std::string &src,
+          const std::string &startAfter, bool needLocation,
+          std::vector<FileStatus> &dl) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getListing(src, startAfter, needLocation, dl);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::renewLease(const std::string &clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->renewLease(clientName);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+bool NamenodeProxy::recoverLease(const std::string &src,
+                                 const std::string &clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->recoverLease(src, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return false;
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+std::vector<int64_t> NamenodeProxy::getFsStats() {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getFsStats();
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return std::vector<int64_t>();
+}
+
+/*void NamenodeProxy::metaSave(const std::string &filename) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->metaSave(filename);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+FileStatus NamenodeProxy::getFileInfo(const std::string &src) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getFileInfo(src);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return FileStatus();
+}
+
+/*FileStatus NamenodeProxy::getFileLinkInfo(const std::string &src) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getFileLinkInfo(src);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return FileStatus();
+}*/
+
+/*ContentSummary NamenodeProxy::getContentSummary(const std::string &path) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getContentSummary(path);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return ContentSummary();
+}*/
+
+/*void NamenodeProxy::setQuota(const std::string &path, int64_t namespaceQuota,
+                             int64_t diskspaceQuota) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setQuota(path, namespaceQuota, diskspaceQuota);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::fsync(const std::string &src, const std::string &client) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->fsync(src, client);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::setTimes(const std::string &src, int64_t mtime,
+                             int64_t atime) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->setTimes(src, mtime, atime);
+    NAMENODE_HA_RETRY_END();
+}
+
+/*void NamenodeProxy::createSymlink(const std::string &target,
+                const std::string &link, const Permission &dirPerm,
+                bool createParent) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->createSymlink(target, link, dirPerm, createParent);
+    NAMENODE_HA_RETRY_END();
+}*/
+
+/*std::string NamenodeProxy::getLinkTarget(const std::string &path) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getLinkTarget(path);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return "";
+}*/
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+shared_ptr<LocatedBlock> NamenodeProxy::updateBlockForPipeline(
+          const ExtendedBlock &block, const std::string &clientName) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->updateBlockForPipeline(block, clientName);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return shared_ptr<LocatedBlock>();
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::updatePipeline(const std::string &clientName,
+          const ExtendedBlock &oldBlock, const ExtendedBlock &newBlock,
+          const std::vector<DatanodeInfo> &newNodes,
+          const std::vector<std::string> &storageIDs) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->updatePipeline(clientName, oldBlock, newBlock,
+                             newNodes, storageIDs);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+Token NamenodeProxy::getDelegationToken(const std::string &renewer) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->getDelegationToken(renewer);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return Token();
+}
+
+//Delegates to the active namenode with HA retry; trailing return is
+//unreachable (loop returns or throws).
+int64_t NamenodeProxy::renewDelegationToken(const Token &token) {
+    NAMENODE_HA_RETRY_BEGIN();
+    return namenode->renewDelegationToken(token);
+    NAMENODE_HA_RETRY_END();
+    assert(!"should not reach here");
+    return 0;
+}
+
+//Delegates to the active namenode, retrying on failover (HA macros above).
+void NamenodeProxy::cancelDelegationToken(const Token &token) {
+    NAMENODE_HA_RETRY_BEGIN();
+    namenode->cancelDelegationToken(token);
+    NAMENODE_HA_RETRY_END();
+}
+
+//Drops all namenode connections; getActiveNamenode() throws
+//HdfsFileSystemClosed afterwards.
+void NamenodeProxy::close() {
+    lock_guard<mutex> lock(mut);
+    namenodes.clear();
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.h
new file mode 100644
index 0000000..2cbd460
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/NamenodeProxy.h
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_SERVER_NAMENODEPROXY_H_
+#define _HDFS_LIBHDFS3_SERVER_NAMENODEPROXY_H_
+
+#include "Namenode.h"
+#include "NamenodeInfo.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Client-side proxy implementing the Namenode interface over one or more
+ * namenode RPC connections (HA deployments configure several).  Each call
+ * is forwarded to the currently selected namenode; on failure the proxy
+ * can switch to the next candidate via the private helpers below.
+ */
+class NamenodeProxy: public Namenode {
+public:
+    NamenodeProxy(const std::vector<NamenodeInfo> &namenodeInfos,
+            const std::string &tokenService,
+            const SessionConfig &c, const RpcAuth &a);
+    ~NamenodeProxy();
+
+public:
+
+    // Namenode interface.  These mirror the ClientNamenodeProtocol RPCs;
+    // see Namenode.h for each operation's contract.
+    void getBlockLocations(const std::string &src, int64_t offset,
+            int64_t length, LocatedBlocks &lbs);
+
+    void create(const std::string &src, const Permission &masked,
+            const std::string &clientName, int flag, bool createParent,
+            short replication, int64_t blockSize);
+
+    shared_ptr<LocatedBlock> append(const std::string &src,
+            const std::string &clientName);
+
+    bool setReplication(const std::string &src, short replication);
+
+    void setPermission(const std::string &src, const Permission &permission);
+
+    void setOwner(const std::string &src, const std::string &username,
+            const std::string &groupname);
+
+    void abandonBlock(const ExtendedBlock &b, const std::string &src,
+            const std::string &holder);
+
+    shared_ptr<LocatedBlock> addBlock(const std::string &src,
+            const std::string &clientName, const ExtendedBlock *previous,
+            const std::vector<DatanodeInfo> &excludeNodes);
+
+    shared_ptr<LocatedBlock> getAdditionalDatanode(const std::string &src,
+            const ExtendedBlock &blk,
+            const std::vector<DatanodeInfo> &existings,
+            const std::vector<std::string> &storageIDs,
+            const std::vector<DatanodeInfo> &excludes, int numAdditionalNodes,
+            const std::string &clientName);
+
+    bool complete(const std::string &src, const std::string &clientName,
+            const ExtendedBlock *last);
+
+    void reportBadBlocks(const std::vector<LocatedBlock> &blocks);
+
+    bool rename(const std::string &src, const std::string &dst);
+
+    void concat(const std::string &trg, const std::vector<std::string> &srcs);
+
+    /*void rename2(const std::string &src, const std::string &dst)
+     throw (AccessControlException, DSQuotaExceededException,
+     FileAlreadyExistsException, FileNotFoundException,
+     NSQuotaExceededException, ParentNotDirectoryException,
+     SafeModeException, UnresolvedLinkException, HdfsIOException) ;*/
+
+    bool deleteFile(const std::string &src, bool recursive);
+
+    bool mkdirs(const std::string &src, const Permission &masked,
+            bool createParent);
+
+    bool getListing(const std::string &src, const std::string &startAfter,
+            bool needLocation, std::vector<FileStatus> &dl);
+
+    void renewLease(const std::string &clientName);
+
+    bool recoverLease(const std::string &src, const std::string &clientName);
+
+    std::vector<int64_t> getFsStats();
+
+    void metaSave(const std::string &filename);
+
+    FileStatus getFileInfo(const std::string &src);
+
+    FileStatus getFileLinkInfo(const std::string &src);
+
+    void setQuota(const std::string &path, int64_t namespaceQuota,
+            int64_t diskspaceQuota);
+
+    void fsync(const std::string &src, const std::string &client);
+
+    void setTimes(const std::string &src, int64_t mtime, int64_t atime);
+
+    void createSymlink(const std::string &target, const std::string &link,
+            const Permission &dirPerm, bool createParent);
+
+    std::string getLinkTarget(const std::string &path);
+
+    shared_ptr<LocatedBlock> updateBlockForPipeline(const ExtendedBlock &block,
+            const std::string &clientName);
+
+    void updatePipeline(const std::string &clientName,
+            const ExtendedBlock &oldBlock, const ExtendedBlock &newBlock,
+            const std::vector<DatanodeInfo> &newNodes,
+            const std::vector<std::string> &storageIDs);
+
+    Token getDelegationToken(const std::string &renewer);
+
+    int64_t renewDelegationToken(const Token &token);
+
+    void cancelDelegationToken(const Token &token);
+
+    // Drops every cached namenode connection (see NamenodeImpl.cc).
+    void close();
+
+private:
+    // Returns the namenode to use; oldValue receives the index observed.
+    // It is handed back to failoverToNextNamenode(), presumably so that
+    // concurrent failures trigger only one failover — confirm in the .cc.
+    shared_ptr<Namenode> getActiveNamenode(uint32_t &oldValue);
+    void failoverToNextNamenode(uint32_t oldValue);
+
+private:
+    bool enableNamenodeHA;      // HA failover enabled — set in ctor (not shown); TODO confirm
+    int maxNamenodeHARetry;     // retry budget for failover — set in ctor (not shown); TODO confirm
+    mutex mut;                  // guards namenodes (see close()) and, presumably, currentNamenode
+    std::string clusterid;
+    std::vector<shared_ptr<Namenode> > namenodes;   // one RPC proxy per configured namenode
+    uint32_t currentNamenode;   // index of the namenode currently in use
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_SERVER_NAMENODEPROXY_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/RpcHelper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/RpcHelper.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/RpcHelper.h
new file mode 100644
index 0000000..2183c55
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/server/RpcHelper.h
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_
+#define _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_
+
+#include "ClientDatanodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "DatanodeInfo.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "ExtendedBlock.h"
+#include "LocatedBlock.h"
+#include "LocatedBlocks.h"
+#include "StackPrinter.h"
+#include "client/FileStatus.h"
+#include "client/Permission.h"
+#include "hdfs.pb.h"
+
+#include <algorithm>
+#include <cassert>
+
+using namespace google::protobuf;
+
+namespace hdfs {
+namespace internal {
+
+// Empty sentinel type used to pad the UnWrapper template's unused
+// parameters and to terminate its recursion (see the specialization below).
+class Nothing {
+};
+
+/**
+ * Translates a remote RPC exception into the matching local C++ exception.
+ *
+ * Each instantiation handles up to ten candidate exception types.  unwrap()
+ * compares the remote error class name against T1::ReflexName and throws T1
+ * on a match; otherwise it delegates to the base class, which is this same
+ * template instantiated with the remaining types (shifted left, padded with
+ * Nothing).  The all-Nothing specialization below terminates the recursion
+ * by throwing HdfsIOException for unrecognized error classes.
+ *
+ * Note: holds the HdfsRpcServerException by reference, so an UnWrapper must
+ * not outlive the exception it was constructed from.
+ */
+template < typename T1 = Nothing, typename T2 = Nothing, typename T3 = Nothing,
+         typename T4 = Nothing, typename T5 = Nothing, typename T6 = Nothing,
+         typename T7 = Nothing, typename T8 = Nothing, typename T9 = Nothing,
+         typename T10 = Nothing, typename T11 = Nothing  >
+class UnWrapper: public UnWrapper<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Nothing> {
+private:
+    typedef UnWrapper<T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Nothing> BaseType;
+
+public:
+    UnWrapper(const HdfsRpcServerException &e) :
+        BaseType(e), e(e) {
+    }
+
+    // Throws the first Ti whose ReflexName matches the remote error class;
+    // file/line identify the local call site for the thrown exception.
+    void ATTRIBUTE_NORETURN ATTRIBUTE_NOINLINE unwrap(const char *file,
+            int line) {
+        if (e.getErrClass() == T1::ReflexName) {
+#ifdef NEED_BOOST
+            boost::throw_exception(T1(e.getErrMsg(), SkipPathPrefix(file), line, PrintStack(1, STACK_DEPTH).c_str()));
+#else
+            throw T1(e.getErrMsg(), SkipPathPrefix(file), line, PrintStack(1, STACK_DEPTH).c_str());
+#endif
+        } else {
+            BaseType::unwrap(file, line);
+        }
+    }
+private:
+    const HdfsRpcServerException &e;
+};
+
+// Terminal case of the UnWrapper recursion: no candidate exception type
+// matched the remote error class, so report it as a generic HdfsIOException
+// carrying the remote class name and message.
+template<>
+class UnWrapper < Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing,
+        Nothing, Nothing, Nothing, Nothing > {
+public:
+    UnWrapper(const HdfsRpcServerException &e) :
+        e(e) {
+    }
+    void ATTRIBUTE_NORETURN ATTRIBUTE_NOINLINE unwrap(const char *file,
+            int line) {
+        THROW(HdfsIOException,
+              "Unexpected exception: when unwrap the rpc remote exception \"%s\", %s in %s: %d",
+              e.getErrClass().c_str(), e.getErrMsg().c_str(), file, line);
+    }
+private:
+    const HdfsRpcServerException &e;
+};
+
+// Copy the identifying fields of an extended block out of its protobuf
+// representation into eb.
+static inline void Convert(ExtendedBlock &eb,
+                const hadoop::hdfs::ExtendedBlockProto &proto) {
+    eb.setPoolId(proto.poolid());
+    eb.setBlockId(proto.blockid());
+    eb.setNumBytes(proto.numbytes());
+    eb.setGenerationStamp(proto.generationstamp());
+}
+
+// Fill a security token from its protobuf representation; all four
+// fields (identifier, kind, password, service) are copied.
+static inline void Convert(Token &token,
+                const hadoop::common::TokenProto &proto) {
+    token.setKind(proto.kind());
+    token.setService(proto.service());
+    token.setIdentifier(proto.identifier());
+    token.setPassword(proto.password());
+}
+
+// Populate a DatanodeInfo from its protobuf form: the identity fields come
+// from the embedded DatanodeIDProto, the network location from the outer
+// message.
+static inline void Convert(DatanodeInfo &node,
+                const hadoop::hdfs::DatanodeInfoProto &proto) {
+    const hadoop::hdfs::DatanodeIDProto &id = proto.id();
+    node.setDatanodeId(id.datanodeuuid());
+    node.setHostName(id.hostname());
+    node.setIpAddr(id.ipaddr());
+    node.setIpcPort(id.ipcport());
+    node.setInfoPort(id.infoport());
+    node.setXferPort(id.xferport());
+    node.setLocation(proto.location());
+}
+
+/**
+ * Build a LocatedBlock from its protobuf form: access token, datanode
+ * locations, optional per-location storage IDs, block identity, offset
+ * and corruption flag.
+ *
+ * Fix: the storage IDs are read from the `storageids` repeated field, so
+ * the guard, the size and the loop bound must come from storageids_size().
+ * The original keyed all three off storagetypes_size(); when the two
+ * repeated fields differ in length that indexes storageids out of range.
+ */
+static inline shared_ptr<LocatedBlock> Convert(
+                const hadoop::hdfs::LocatedBlockProto &proto) {
+    Token token;
+    shared_ptr<LocatedBlock> lb(new LocatedBlock);
+    Convert(token, proto.blocktoken());
+    lb->setToken(token);
+    std::vector<DatanodeInfo> &nodes = lb->mutableLocations();
+    nodes.resize(proto.locs_size());
+
+    for (int i = 0 ; i < proto.locs_size(); ++i) {
+        Convert(nodes[i], proto.locs(i));
+    }
+
+    if (proto.storageids_size() > 0) {
+        // Storage IDs, when present, are expected to parallel the
+        // datanode locations one-to-one.
+        assert(proto.storageids_size() == proto.locs_size());
+        std::vector<std::string> &storageIDs = lb->mutableStorageIDs();
+        storageIDs.resize(proto.storageids_size());
+
+        for (int i = 0; i < proto.storageids_size(); ++i) {
+            storageIDs[i] = proto.storageids(i);
+        }
+    }
+
+    // LocatedBlock carries the ExtendedBlock identity fields directly.
+    Convert(*lb, proto.b());
+    lb->setOffset(proto.offset());
+    lb->setCorrupt(proto.corrupt());
+    return lb;
+}
+
+// Fill a LocatedBlocks result from its protobuf form: file-level flags,
+// the optional last block, and the list of block locations.  The blocks
+// are sorted with LocatedBlock's operator< (defined elsewhere; presumably
+// orders by file offset — confirm in LocatedBlock.h).
+static inline void Convert(LocatedBlocks &lbs,
+                const hadoop::hdfs::LocatedBlocksProto &proto) {
+    shared_ptr<LocatedBlock> lb;
+    lbs.setFileLength(proto.filelength());
+    lbs.setIsLastBlockComplete(proto.islastblockcomplete());
+    lbs.setUnderConstruction(proto.underconstruction());
+
+    if (proto.has_lastblock()) {
+        lb = Convert(proto.lastblock());
+        lbs.setLastBlock(lb);
+    }
+
+    // Convert each block via the shared_ptr overload and copy it into the
+    // result vector owned by lbs.
+    std::vector<LocatedBlock> &blocks = lbs.getBlocks();
+    blocks.resize(proto.blocks_size());
+
+    for (int i = 0; i < proto.blocks_size(); ++i) {
+        blocks[i] = *Convert(proto.blocks(i));
+    }
+
+    std::sort(blocks.begin(), blocks.end(), std::less<LocatedBlock>());
+}
+
+static inline void Convert(FileStatus &fs,
+                const hadoop::hdfs::HdfsFileStatusProto &proto) {
+    fs.setAccessTime(proto.access_time());
+    fs.setBlocksize(proto.blocksize());
+    fs.setGroup(proto.group().c_str());
+    fs.setLength(proto.length());
+    fs.setModificationTime(proto.modification_time());
+    fs.setOwner(proto.owner().c_str());
+    fs.setPath(proto.path().c_str());
+    fs.setReplication(proto.block_replication());
+    fs.setSymlink(proto.symlink().c_str());
+    fs.setPermission(Permission(proto.permission().perm()));
+    fs.setIsdir(proto.filetype() == hadoop::hdfs::HdfsFileStatusProto::IS_DIR);
+}
+
+/**
+ * Append the entries of a directory-listing response to dl.
+ *
+ * Fix: bind the repeated field by const reference.  The original declared
+ * a by-value RepeatedPtrField local, deep-copying every HdfsFileStatusProto
+ * in the listing just to iterate it.  Also reserve the target vector so a
+ * large listing does not reallocate repeatedly.
+ */
+static inline void Convert(std::vector<FileStatus> &dl,
+                const hadoop::hdfs::DirectoryListingProto &proto) {
+    const RepeatedPtrField<hadoop::hdfs::HdfsFileStatusProto> &ptrproto =
+          proto.partiallisting();
+    dl.reserve(dl.size() + ptrproto.size());
+
+    for (int i = 0; i < ptrproto.size(); i++) {
+        FileStatus fileStatus;
+        Convert(fileStatus, ptrproto.Get(i));
+        dl.push_back(fileStatus);
+    }
+}
+
+// Convert a token from its protobuf form, by value.
+// Fix for consistency with Convert(Token &, const TokenProto &) above:
+// copy all four fields.  The original omitted the service field, leaving
+// the returned token's service default-initialized.
+static inline Token Convert(const hadoop::common::TokenProto &proto) {
+    Token retval;
+    retval.setIdentifier(proto.identifier());
+    retval.setKind(proto.kind());
+    retval.setPassword(proto.password());
+    retval.setService(proto.service());
+    return retval;
+}
+
+/*static inline void Convert(ContentSummary &contentSummary, const ContentSummaryProto &proto) {
+    contentSummary.setDirectoryCount(proto.directorycount());
+    contentSummary.setFileCount(proto.filecount());
+    contentSummary.setLength(proto.length());
+    contentSummary.setQuota(proto.quota());
+    contentSummary.setSpaceConsumed(proto.spaceconsumed());
+    contentSummary.setSpaceQuota(proto.spacequota());
+}*/
+
+// Serialize a security token into its protobuf form; all four fields
+// (identifier, kind, password, service) are written.
+static inline void Build(const Token &token,
+                hadoop::common::TokenProto *proto) {
+    proto->set_kind(token.getKind());
+    proto->set_service(token.getService());
+    proto->set_identifier(token.getIdentifier());
+    proto->set_password(token.getPassword());
+}
+
+// Serialize a Permission into its protobuf form as the packed short value.
+static inline void Build(const Permission &p,
+                hadoop::hdfs::FsPermissionProto *proto) {
+    proto->set_perm(p.toShort());
+}
+
+// Serialize a datanode's identity fields into a DatanodeIDProto.
+static inline void Build(const DatanodeInfo &dn,
+                hadoop::hdfs::DatanodeIDProto *proto) {
+    proto->set_datanodeuuid(dn.getDatanodeId());
+    proto->set_hostname(dn.getHostName());
+    proto->set_ipaddr(dn.getIpAddr());
+    proto->set_ipcport(dn.getIpcPort());
+    proto->set_infoport(dn.getInfoPort());
+    proto->set_xferport(dn.getXferPort());
+}
+
+// Serialize a list of datanodes into a repeated DatanodeInfoProto field:
+// one entry per node, identity via the DatanodeIDProto overload plus the
+// network location.
+static inline void Build(const std::vector<DatanodeInfo> &dns,
+                RepeatedPtrField<hadoop::hdfs::DatanodeInfoProto> *proto) {
+    typedef std::vector<DatanodeInfo>::const_iterator Iter;
+
+    for (Iter it = dns.begin(); it != dns.end(); ++it) {
+        hadoop::hdfs::DatanodeInfoProto *entry = proto->Add();
+        Build(*it, entry->mutable_id());
+        entry->set_location(it->getLocation());
+    }
+}
+
+// Serialize an extended block's identifying fields into its protobuf form.
+static inline void Build(const ExtendedBlock &eb,
+                hadoop::hdfs::ExtendedBlockProto *proto) {
+    proto->set_poolid(eb.getPoolId());
+    proto->set_blockid(eb.getBlockId());
+    proto->set_numbytes(eb.getNumBytes());
+    proto->set_generationstamp(eb.getGenerationStamp());
+}
+
+// Serialize a located block (corrupt flag, offset, block identity and
+// datanode locations) into its protobuf form.  Build(b, proto->mutable_b())
+// resolves to the ExtendedBlock overload — LocatedBlock apparently derives
+// from (or converts to) ExtendedBlock; confirm in LocatedBlock.h.
+// NOTE(review): the parameter looks like it could be const LocatedBlock& —
+// nothing here mutates b; verify getLocations() has a const overload
+// before tightening it.
+static inline void Build(LocatedBlock &b,
+                hadoop::hdfs::LocatedBlockProto *proto) {
+    proto->set_corrupt(b.isCorrupt());
+    proto->set_offset(b.getOffset());
+    Build(b, proto->mutable_b());
+    Build(b.getLocations(), proto->mutable_locs());
+}
+
+/*static inline void Build(const std::vector<LocatedBlock> &blocks,
+                         RepeatedPtrField<LocatedBlockProto> *proto) {
+    for (size_t i = 0; i < blocks.size(); ++i) {
+        LocatedBlockProto *p = proto->Add();
+        p->set_corrupt(blocks[i].isCorrupt());
+        p->set_offset(blocks[i].getOffset());
+        Build(blocks[i], p->mutable_b());
+    }
+}*/
+
+// Copy a list of strings into a repeated string field, one Add() per entry.
+static inline void Build(const std::vector<std::string> &srcs,
+                         RepeatedPtrField<std::string> *proto) {
+    typedef std::vector<std::string>::const_iterator Iter;
+
+    for (Iter it = srcs.begin(); it != srcs.end(); ++it) {
+        proto->Add()->assign(*it);
+    }
+}
+
+// Serialize a list of datanodes into a repeated DatanodeIDProto field
+// (identity only; no location), via the single-node overload above.
+static inline void Build(const std::vector<DatanodeInfo> &dns,
+                RepeatedPtrField<hadoop::hdfs::DatanodeIDProto> *proto) {
+    typedef std::vector<DatanodeInfo>::const_iterator Iter;
+
+    for (Iter it = dns.begin(); it != dns.end(); ++it) {
+        Build(*it, proto->Add());
+    }
+}
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_SERVER_RPCHELPER_H_ */


Mime
View raw message