hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhuvnesh2...@apache.org
Subject [11/48] incubator-hawq git commit: HAWQ-618. Import libhdfs3 library for internal management and LICENSE modified
Date Mon, 04 Apr 2016 05:09:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/proto/hdfs.proto b/depends/libhdfs3/src/proto/hdfs.proto
new file mode 100644
index 0000000..19e3f79
--- /dev/null
+++ b/depends/libhdfs3/src/proto/hdfs.proto
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+// This file contains protocol buffers that are used throughout HDFS -- i.e.
+// by the client, server, and data transfer protocols.
+
+
+// Code-generation options. The java_* options only affect generated Java
+// code; the package also becomes the generated C++ namespace Hdfs::Internal.
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "HdfsProtos";
+option java_generate_equals_and_hash = true;
+package Hdfs.Internal;
+
+import "Security.proto";
+
+/**
+ * Extended block: identifies a block within a block pool.
+ */
+message ExtendedBlockProto {
+  required string poolId = 1;   // Block pool id - globally unique across clusters
+  required uint64 blockId = 2;  // the local id within a pool
+  required uint64 generationStamp = 3;
+  optional uint64 numBytes = 4 [default = 0];  // block length; the length does not
+                                               // belong in the block id but is kept
+                                               // here for historical reasons
+}
+
+/**
+ * Identifies a Datanode: its addresses, its UUID and its service ports.
+ */
+message DatanodeIDProto {
+  required string ipAddr = 1;    // IP address
+  required string hostName = 2;  // hostname
+  required string datanodeUuid = 3;     // UUID assigned to the Datanode. For
+                                        // upgraded clusters this is the same
+                                        // as the original StorageID of the
+                                        // Datanode.
+  required uint32 xferPort = 4;  // data streaming port
+  required uint32 infoPort = 5;  // datanode http port
+  required uint32 ipcPort = 6;   // ipc server port
+  optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
+}
+
+/**
+ * A list of DatanodeInfoProto entries.
+ */
+message DatanodeInfosProto {
+  repeated DatanodeInfoProto datanodes = 1;
+}
+
+/**
+ * The status of a Datanode: its identity (DatanodeIDProto) plus usage,
+ * cache and administrative-state information.
+ * NOTE(review): units of the numeric fields are not stated in this file
+ * (capacity/usage presumably bytes, lastUpdate presumably a timestamp) -
+ * confirm against the upstream Hadoop schema before relying on them.
+ * Field number 9 is not used in this message; do not assign it without
+ * checking the upstream schema first.
+ */
+message DatanodeInfoProto {
+  required DatanodeIDProto id = 1;
+  optional uint64 capacity = 2 [default = 0];
+  optional uint64 dfsUsed = 3 [default = 0];
+  optional uint64 remaining = 4 [default = 0];
+  optional uint64 blockPoolUsed = 5 [default = 0];
+  optional uint64 lastUpdate = 6 [default = 0];
+  optional uint32 xceiverCount = 7 [default = 0];
+  optional string location = 8;
+  // Administrative state: normal, being decommissioned, or decommissioned.
+  enum AdminState {
+    NORMAL = 0;
+    DECOMMISSION_INPROGRESS = 1;
+    DECOMMISSIONED = 2;
+  }
+
+  optional AdminState adminState = 10 [default = NORMAL];
+  optional uint64 cacheCapacity = 11 [default = 0];
+  optional uint64 cacheUsed = 12 [default = 0];
+}
+
+/**
+ * Summary of a file or directory: total length, file and directory counts,
+ * the quota, and the space consumed together with the space quota.
+ */
+message ContentSummaryProto {
+  required uint64 length = 1;
+  required uint64 fileCount = 2;
+  required uint64 directoryCount = 3;
+  required uint64 quota = 4;
+  required uint64 spaceConsumed = 5;
+  required uint64 spaceQuota = 6;
+}
+
+/**
+ * Contains a list of paths corresponding to corrupt files and a cookie
+ * used for iterative calls to NameNode.listCorruptFileBlocks.
+ */
+message CorruptFileBlocksProto {
+ repeated string files = 1;     // paths of corrupt files in this batch
+ required string   cookie = 2;  // opaque cursor for the next iterative call
+}
+
+/**
+ * File or directory permission - same spec as POSIX.
+ */
+message FsPermissionProto {
+  required uint32 perm = 1;       // Actually a short - only 16 bits used
+}
+
+/**
+ * Types of recognized storage media.
+ * There is no 0 value: under proto2 rules an unset field of this type
+ * reads as the first listed value, DISK.
+ */
+enum StorageTypeProto {
+  DISK = 1;
+  SSD = 2;
+}
+
+/**
+ * A list of storage UUIDs.
+ */
+message StorageUuidsProto {
+  repeated string storageUuids = 1;
+}
+
+/**
+ * A LocatedBlock gives information about a block and its locations.
+ */
+message LocatedBlockProto {
+  required ExtendedBlockProto b  = 1;
+  required uint64 offset = 2;           // offset of first byte of block in the file
+  repeated DatanodeInfoProto locs = 3;  // Locations ordered by proximity to client ip
+  required bool corrupt = 4;            // true if all replicas of a block are corrupt, else false.
+                                        // If only some replicas are corrupt, they are filtered
+                                        // out and their locations are not part of this object.
+
+  required TokenProto blockToken = 5;   // block token (TokenProto, from Security.proto)
+  repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
+  repeated StorageTypeProto storageTypes = 7; // NOTE(review): presumably parallel to locs
+  repeated string storageIDs = 8;             // NOTE(review): presumably parallel to locs
+}
+
+// A data encryption key belonging to a block pool.
+// NOTE(review): the unit of expiryDate is not stated here (BlockKeyProto's
+// expiryDate is documented as milliseconds) - confirm before relying on it.
+message DataEncryptionKeyProto {
+  required uint32 keyId = 1;
+  required string blockPoolId = 2;
+  required bytes nonce = 3;
+  required bytes encryptionKey = 4;
+  required uint64 expiryDate = 5;
+  optional string encryptionAlgorithm = 6;
+}
+
+
+/**
+ * A set of file blocks and their locations, plus the file length and
+ * whether the file is still under construction.
+ */
+message LocatedBlocksProto {
+  required uint64 fileLength = 1;
+  repeated LocatedBlockProto blocks = 2;
+  required bool underConstruction = 3;
+  optional LocatedBlockProto lastBlock = 4;
+  required bool isLastBlockComplete = 5;
+}
+
+
+/**
+ * Status of a file, directory or symlink.
+ * Optionally includes a file's block locations if requested by the client
+ * on the RPC call.
+ */
+message HdfsFileStatusProto {
+  enum FileType {
+    IS_DIR = 1;
+    IS_FILE = 2;
+    IS_SYMLINK = 3;
+  }
+  required FileType fileType = 1;
+  required bytes path = 2;          // local name of inode encoded java UTF8
+  required uint64 length = 3;
+  required FsPermissionProto permission = 4;
+  required string owner = 5;
+  required string group = 6;
+  required uint64 modification_time = 7;
+  required uint64 access_time = 8;
+
+  // Optional fields for symlink
+  optional bytes symlink = 9;             // if symlink, target encoded java UTF8
+
+  // Optional fields for file
+  optional uint32 block_replication = 10 [default = 0]; // only 16bits used
+  optional uint64 blocksize = 11 [default = 0];
+  optional LocatedBlocksProto locations = 12;  // supplied only if asked by client
+
+  // Optional field for fileId
+  optional uint64 fileId = 13 [default = 0]; // default as an invalid id
+  optional int32 childrenNum = 14 [default = -1];
+} 
+
+/**
+ * Checksum algorithms/types used in HDFS.
+ * Make sure this enum's integer values match enum values' id properties defined
+ * in org.apache.hadoop.util.DataChecksum.Type; never renumber them.
+ */
+enum ChecksumTypeProto {
+  CHECKSUM_NULL = 0;
+  CHECKSUM_CRC32 = 1;
+  CHECKSUM_CRC32C = 2;
+}
+
+/**
+ * HDFS server defaults: block size, checksum and packet settings,
+ * replication, buffer size, transfer encryption and trash interval.
+ */
+message FsServerDefaultsProto {
+  required uint64 blockSize = 1;
+  required uint32 bytesPerChecksum = 2;
+  required uint32 writePacketSize = 3;
+  required uint32 replication = 4; // Actually a short - only 16 bits used
+  required uint32 fileBufferSize = 5;
+  optional bool encryptDataTransfer = 6 [default = false];
+  optional uint64 trashInterval = 7 [default = 0];
+  optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
+}
+
+
+/**
+ * Directory listing, possibly partial.
+ * NOTE(review): remainingEntries presumably counts entries not included in
+ * partialListing that a follow-up call would return - confirm upstream.
+ */
+message DirectoryListingProto {
+  repeated HdfsFileStatusProto partialListing = 1;
+  required uint32 remainingEntries  = 2;
+}
+
+/**
+ * Status of a snapshottable directory: besides the normal information for
+ * a directory status, it also includes the snapshot quota, the number of
+ * snapshots, and the full path of the parent directory.
+ */
+message SnapshottableDirectoryStatusProto {
+  required HdfsFileStatusProto dirStatus = 1;
+
+  // Fields specific for snapshottable directory
+  required uint32 snapshot_quota = 2;
+  required uint32 snapshot_number = 3;
+  required bytes parent_fullpath = 4;
+}
+
+/**
+ * Snapshottable directory listing: one status entry per snapshottable
+ * directory.
+ */
+message SnapshottableDirectoryListingProto {
+  repeated SnapshottableDirectoryStatusProto snapshottableDirListing = 1;
+}
+
+/**
+ * Snapshot diff report entry: a path and a label describing how it was
+ * modified.
+ */
+message SnapshotDiffReportEntryProto {
+  required bytes fullpath = 1;
+  required string modificationLabel = 2;
+}
+
+/**
+ * Snapshot diff report between two snapshots of the same directory.
+ */
+message SnapshotDiffReportProto {
+  // full path of the directory where snapshots were taken
+  required string snapshotRoot = 1;
+  required string fromSnapshot = 2;
+  required string toSnapshot = 3;
+  repeated SnapshotDiffReportEntryProto diffReportEntries = 4;
+}
+
+/**
+ * Common node information shared by all the nodes in the cluster.
+ * Note: the field name "namespceID" is misspelled, but it is part of the
+ * generated API (accessor names), so it must not be renamed.
+ */
+message StorageInfoProto {
+  required uint32 layoutVersion = 1; // Layout version of the file system
+  required uint32 namespceID = 2;    // File system namespace ID
+  required string clusterID = 3;     // ID of the cluster
+  required uint64 cTime = 4;         // File system creation time
+}
+
+/**
+ * Information sent by a namenode to identify itself to the primary namenode.
+ */
+message NamenodeRegistrationProto {
+  required string rpcAddress = 1;    // host:port of the namenode RPC address
+  required string httpAddress = 2;   // host:port of the namenode http server
+  // Role the registering namenode plays.
+  enum NamenodeRoleProto {
+    NAMENODE = 1;
+    BACKUP = 2;
+    CHECKPOINT = 3;
+  }
+  required StorageInfoProto storageInfo = 3;  // Node information
+  optional NamenodeRoleProto role = 4 [default = NAMENODE];        // Namenode role
+}
+
+/**
+ * Unique signature to identify checkpoint transactions: the block pool,
+ * the relevant transaction ids and the storage information.
+ */
+message CheckpointSignatureProto {
+  required string blockPoolId = 1;
+  required uint64 mostRecentCheckpointTxId = 2;
+  required uint64 curSegmentTxId = 3;
+  required StorageInfoProto storageInfo = 4;
+}
+
+/**
+ * Command sent from one namenode to another namenode.
+ * NOTE(review): checkpointCmd is presumably set only when type is
+ * CheckPointCommand - confirm against the sender.
+ */
+message NamenodeCommandProto {
+  enum Type {
+    NamenodeCommand = 0;      // Base command
+    CheckPointCommand = 1;    // Check point command
+  }
+  required uint32 action = 1;
+  required Type type = 2;
+  optional CheckpointCommandProto checkpointCmd = 3;
+}
+
+/**
+ * Command returned from primary to checkpointing namenode.
+ * This command has checkpoint signature that identifies
+ * checkpoint transaction and is needed for further
+ * communication related to checkpointing.
+ */
+message CheckpointCommandProto {
+  // Unique signature to identify the checkpoint transaction
+  required CheckpointSignatureProto signature = 1; 
+
+  // If true, transfer the image back to the primary upon completion of the checkpoint
+  required bool needToReturnImage = 2;
+}
+
+/**
+ * Block information: id, generation stamp and length, without the block
+ * pool id that ExtendedBlockProto carries.
+ */
+message BlockProto {
+  required uint64 blockId = 1;
+  required uint64 genStamp = 2;
+  optional uint64 numBytes = 3 [default = 0];
+}
+
+/**
+ * A block and the datanodes where it is located.
+ */
+message BlockWithLocationsProto {
+  required BlockProto block = 1;   // Block
+  repeated string datanodeUuids = 2; // Datanodes with replicas of the block
+  repeated string storageUuids = 3;  // Storages with replicas of the block
+}
+
+/**
+ * List of blocks with their locations.
+ */
+message BlocksWithLocationsProto {
+  repeated BlockWithLocationsProto blocks = 1;
+}
+
+/**
+ * Edit log information: the range of available transactions.
+ */
+message RemoteEditLogProto {
+  required uint64 startTxId = 1;  // Starting available edit log transaction
+  required uint64 endTxId = 2;    // Ending available edit log transaction
+  optional bool isInProgress = 3 [default = false];
+}
+
+/**
+ * Enumeration of edit logs available on a remote namenode.
+ */
+message RemoteEditLogManifestProto {
+  repeated RemoteEditLogProto logs = 1;
+}
+
+/**
+ * Namespace information that describes the namespace on a namenode.
+ */
+message NamespaceInfoProto {
+  required string buildVersion = 1;         // Software revision version (e.g. an svn or git revision)
+  required uint32 unused = 2;               // Retained for backward compatibility
+  required string blockPoolID = 3;          // block pool used by the namespace
+  required StorageInfoProto storageInfo = 4;// Node information
+  required string softwareVersion = 5;      // Software version number (e.g. 2.0.0)
+}
+
+/**
+ * A key used for block access tokens.
+ */
+message BlockKeyProto {
+  required uint32 keyId = 1;      // Key identifier
+  required uint64 expiryDate = 2; // Expiry time in milliseconds
+  optional bytes keyBytes = 3;    // Key secret
+}
+
+/**
+ * Current key and set of block keys at the namenode, along with whether
+ * block tokens are enabled and the key update / token lifetime settings.
+ */
+message ExportedBlockKeysProto {
+  required bool isBlockTokenEnabled = 1;
+  required uint64 keyUpdateInterval = 2;
+  required uint64 tokenLifeTime = 3;
+  required BlockKeyProto currentKey = 4;
+  repeated BlockKeyProto allKeys = 5;
+}
+
+/**
+ * State of a block replica at a datanode.
+ */
+enum ReplicaStateProto {
+  FINALIZED = 0;  // State of a replica when it is not modified
+  RBW = 1;        // State of replica that is being written to
+  RWR = 2;        // State of replica that is waiting to be recovered
+  RUR = 3;        // State of replica that is under recovery
+  TEMPORARY = 4;  // State of replica that is created for replication
+}
+
+/**
+ * Block that needs to be recovered at a given location.
+ */
+message RecoveringBlockProto {
+  required uint64 newGenStamp = 1;      // New genstamp post recovery
+  required LocatedBlockProto block = 2; // Block to be recovered
+}
+
+/**
+ * Version request; intentionally carries no fields.
+ */
+message VersionRequestProto {
+}
+
+/**
+ * Version response from the namenode, carrying its namespace information.
+ */
+message VersionResponseProto {
+  required NamespaceInfoProto info = 1;
+}
+
+/**
+ * Information related to a snapshot.
+ * TODO: add more information
+ */
+message SnapshotInfoProto {
+  required string snapshotName = 1;
+  required string snapshotRoot = 2;
+  required FsPermissionProto permission = 3;
+  required string owner = 4;
+  required string group = 5;
+  // NOTE(review): a string, unlike the uint64 times in HdfsFileStatusProto;
+  // its format is not specified in this file.
+  required string createTime = 6;
+  // TODO: do we need access time?
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcAuth.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcAuth.cpp b/depends/libhdfs3/src/rpc/RpcAuth.cpp
new file mode 100644
index 0000000..b09c8fd
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcAuth.cpp
@@ -0,0 +1,55 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RpcAuth.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Map an authentication method name to its AuthMethod value.
+ *
+ * The comparison ignores case; the accepted names are "SIMPLE",
+ * "KERBEROS" and "TOKEN".
+ *
+ * @param str the method name to parse.
+ * @return the matching AuthMethod.
+ * @throw InvalidParameter if str is none of the accepted names.
+ */
+AuthMethod RpcAuth::ParseMethod(const std::string & str) {
+    static const struct {
+        const char * label;
+        AuthMethod value;
+    } kKnownMethods[] = {
+        { "SIMPLE", AuthMethod::SIMPLE },
+        { "KERBEROS", AuthMethod::KERBEROS },
+        { "TOKEN", AuthMethod::TOKEN }
+    };
+    const size_t knownCount = sizeof(kKnownMethods) / sizeof(kKnownMethods[0]);
+    const char * wanted = str.c_str();
+
+    for (size_t i = 0; i < knownCount; ++i) {
+        if (strcasecmp(wanted, kKnownMethods[i].label) == 0) {
+            return kKnownMethods[i].value;
+        }
+    }
+
+    THROW(InvalidParameter, "RpcAuth: Unknown auth mechanism type: %s",
+          wanted);
+}
+
+/**
+ * Hash of this object, combining the authentication method and the user.
+ */
+size_t RpcAuth::hash_value() const {
+    size_t parts[2];
+    parts[0] = Int32Hasher(method);
+    parts[1] = user.hash_value();
+    return CombineHasher(parts, sizeof(parts) / sizeof(parts[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcAuth.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcAuth.h b/depends/libhdfs3/src/rpc/RpcAuth.h
new file mode 100644
index 0000000..379eae8
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcAuth.h
@@ -0,0 +1,106 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_RPC_RPCAUTH_H_
+#define _HDFS_LIBHDFS3_RPC_RPCAUTH_H_
+
+#include "client/UserInfo.h"
+#include "Hash.h"
+
+#include <string>
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Authentication methods; the SASL mechanism for each is noted inline.
+ * NOTE(review): the explicit values 80-82 appear to mirror the codes of
+ * Hadoop's AuthMethod enum - confirm before changing them.
+ */
+enum AuthMethod {
+    SIMPLE = 80,
+    KERBEROS = 81, //"GSSAPI"
+    TOKEN = 82, //"DIGEST-MD5"
+    UNKNOWN = 255
+};
+
+/**
+ * How authentication is carried on a connection. The SASL value (-33) is
+ * also used as the call id of SASL handshake messages in RpcChannel.
+ */
+enum AuthProtocol {
+    NONE = 0,
+    SASL = -33
+};
+
+/**
+ * Authentication settings for an RPC connection: an AuthMethod together
+ * with the UserInfo to authenticate as. Equality and hashing use both.
+ */
+class RpcAuth {
+public:
+    /** SIMPLE authentication with a default-constructed user. */
+    RpcAuth()
+        : method(SIMPLE) {
+    }
+
+    /** The given method with a default-constructed user. */
+    explicit RpcAuth(AuthMethod mech)
+        : method(mech) {
+    }
+
+    /** The given method and user. */
+    RpcAuth(const UserInfo & ui, AuthMethod mech)
+        : method(mech), user(ui) {
+    }
+
+    /** NONE when the method is SIMPLE, SASL for every other method. */
+    AuthProtocol getProtocol() const {
+        if (method != SIMPLE) {
+            return AuthProtocol::SASL;
+        }
+
+        return AuthProtocol::NONE;
+    }
+
+    AuthMethod getMethod() const {
+        return method;
+    }
+
+    void setMethod(AuthMethod method) {
+        this->method = method;
+    }
+
+    const UserInfo & getUser() const {
+        return user;
+    }
+
+    UserInfo & getUser() {
+        return user;
+    }
+
+    void setUser(const UserInfo & user) {
+        this->user = user;
+    }
+
+    /** Hash combining the method and the user (defined in RpcAuth.cpp). */
+    size_t hash_value() const;
+
+    bool operator ==(const RpcAuth & other) const {
+        return method == other.method && user == other.user;
+    }
+
+    /**
+     * Parse an authentication method name (case-insensitive); throws
+     * InvalidParameter for unrecognized names (see RpcAuth.cpp).
+     */
+    static AuthMethod ParseMethod(const std::string & str);
+
+private:
+    AuthMethod method;
+    UserInfo user;
+};
+
+}
+}
+
+// NOTE(review): HDFS_HASH_DEFINE comes from Hash.h (not shown); presumably it
+// makes RpcAuth usable as a hash-container key via RpcAuth::hash_value().
+HDFS_HASH_DEFINE(::Hdfs::Internal::RpcAuth);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCAUTH_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcCall.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcCall.h b/depends/libhdfs3/src/rpc/RpcCall.h
new file mode 100644
index 0000000..796fcf3
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcCall.h
@@ -0,0 +1,84 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCALL_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCALL_H_
+
+#include "google/protobuf/message.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * One RPC invocation: the remote method name, an idempotency flag, and the
+ * request/response protobuf messages. The message pointers are stored as
+ * given and are never deleted by RpcCall.
+ */
+class RpcCall {
+public:
+    RpcCall(bool idemp, std::string n, google::protobuf::Message * req,
+            google::protobuf::Message * resp)
+        : idempotent(idemp), name(n), request(req), response(resp) {
+    }
+
+    /* ---- remote method name ---- */
+
+    const char * getName() const {
+        return name.c_str();
+    }
+
+    void setName(const std::string & name) {
+        this->name = name;
+    }
+
+    /* ---- idempotency flag ----
+     * NOTE(review): presumably consulted when deciding whether a failed
+     * call may be retried - confirm in the caller.
+     */
+
+    bool isIdempotent() const {
+        return idempotent;
+    }
+
+    void setIdempotent(bool idempotent) {
+        this->idempotent = idempotent;
+    }
+
+    /* ---- request / response messages (not owned) ---- */
+
+    google::protobuf::Message * getRequest() {
+        return request;
+    }
+
+    void setRequest(google::protobuf::Message * request) {
+        this->request = request;
+    }
+
+    google::protobuf::Message * getResponse() {
+        return response;
+    }
+
+    void setResponse(google::protobuf::Message * response) {
+        this->response = response;
+    }
+
+private:
+    bool idempotent;
+    std::string name;
+    google::protobuf::Message * request;
+    google::protobuf::Message * response;
+};
+
+}
+}
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCALL_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcChannel.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcChannel.cpp b/depends/libhdfs3/src/rpc/RpcChannel.cpp
new file mode 100644
index 0000000..b427541
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcChannel.cpp
@@ -0,0 +1,859 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "IpcConnectionContext.pb.h"
+#include "Logger.h"
+#include "RpcChannel.h"
+#include "RpcClient.h"
+#include "RpcContentWrapper.h"
+#include "RpcHeader.pb.h"
+#include "RpcHeader.pb.h"
+#include "server/RpcHelper.h"
+#include "Thread.h"
+#include "WriteBuffer.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+// Constants for the RPC connection header and connection context.
+// NOTE(review): none of these are referenced in the visible part of this
+// file; presumably they are used by sendConnectionHeader /
+// sendConnectionContent further down - confirm there.
+#define RPC_HEADER_MAGIC "hrpc"
+#define RPC_HEADER_VERSION 9
+#define SERIALIZATION_TYPE_PROTOBUF 0
+#define CONNECTION_CONTEXT_CALL_ID -3
+
+using namespace ::google::protobuf;
+using namespace google::protobuf::io;
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Create a channel that owns a new, not yet connected, TCP socket and a
+ * buffered reader over that same socket.
+ */
+RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, RpcClient & c) :
+    refs(0), available(false), key(k), client(c) {
+    TcpSocketImpl * tcp = new TcpSocketImpl;
+    sock = shared_ptr<Socket>(tcp);
+    sock->setLingerTimeout(k.getConf().getLingerTimeout());
+    // The reader wraps the very socket object created above.
+    in = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*tcp));
+    lastIdle = steady_clock::now();
+    lastActivity = lastIdle;
+}
+
+/**
+ * Create a channel over a caller-supplied socket and reader; the channel
+ * takes ownership of both through shared_ptr.
+ */
+RpcChannelImpl::RpcChannelImpl(const RpcChannelKey & k, Socket * s,
+                               BufferedSocketReader * in, RpcClient & c) :
+    refs(0), available(false), key(k), client(c) {
+    sock = shared_ptr<Socket>(s);
+    // `in` (the parameter) shadows the member, hence the explicit this->.
+    this->in = shared_ptr<BufferedSocketReader>(in);
+    lastIdle = steady_clock::now();
+    lastActivity = lastIdle;
+}
+
+/**
+ * Destroy the channel. It must have no pending calls and no references;
+ * the socket is closed only if the channel is still marked available.
+ */
+RpcChannelImpl::~RpcChannelImpl() {
+    assert(pendingCalls.empty());
+    assert(refs == 0);
+
+    if (!available) {
+        return;
+    }
+
+    sock->close();
+}
+
+/**
+ * Drop one reference to the channel (under writeMut). When `immediate` is
+ * set and that was the last reference, mark the channel unavailable and
+ * close its socket; otherwise the socket is left open.
+ */
+void RpcChannelImpl::close(bool immediate) {
+    lock_guard<mutex> guard(writeMut);
+    --refs;
+    assert(refs >= 0);
+
+    if (!immediate || refs) {
+        return;
+    }
+
+    assert(pendingCalls.empty());
+    available = false;
+    sock->close();
+}
+
+/**
+ * Send one SASL handshake message and register `resp` as the pending
+ * result of the SASL call id.
+ *
+ * The frame is a big-endian length followed by an RpcRequestHeaderProto
+ * (call id AuthProtocol::SASL) and the message, both written through
+ * RpcContentWrapper.
+ */
+void RpcChannelImpl::sendSaslMessage(RpcSaslProto * msg, Message * resp) {
+    RpcRequestHeaderProto header;
+    header.set_callid(AuthProtocol::SASL);
+    header.set_clientid(client.getClientId());
+    header.set_retrycount(INVALID_RETRY_COUNT);
+    header.set_rpckind(RPC_PROTOCOL_BUFFER);
+    header.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+
+    RpcContentWrapper content(&header, msg);
+    WriteBuffer frame;
+    int frameLength = content.getLength();
+    frame.writeBigEndian(frameLength);
+    content.writeTo(frame);
+
+    sock->writeFully(frame.getBuffer(0), frame.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+
+    // The server's answer is matched to this entry by the SASL call id.
+    RpcRemoteCallPtr pending(new RpcRemoteCall(
+        RpcCall(false, "sasl message", NULL, resp),
+        AuthProtocol::SASL, client.getClientId()));
+    pendingCalls[AuthProtocol::SASL] = pending;
+}
+
+
+/**
+ * Pick an authentication mechanism from the server's offer, in offer order.
+ *
+ * - SIMPLE (or UNKNOWN) is returned immediately without creating a SASL client.
+ * - TOKEN is accepted only if the channel key has a token.
+ * - KERBEROS is always accepted.
+ * For an accepted TOKEN/KERBEROS offer a new SaslClient is created.
+ *
+ * @param auths the mechanisms offered by the server.
+ * @return the chosen offer.
+ * @throw AccessControlException if no offered mechanism is usable.
+ */
+const RpcSaslProto_SaslAuth * RpcChannelImpl::createSaslClient(
+    const RepeatedPtrField<RpcSaslProto_SaslAuth> * auths) {
+    const RpcSaslProto_SaslAuth * chosen = NULL;
+    Token token;
+
+    for (int i = 0; chosen == NULL && i < auths->size(); ++i) {
+        const RpcSaslProto_SaslAuth * candidate = &auths->Get(i);
+        RpcAuth offered(RpcAuth::ParseMethod(candidate->method()));
+
+        switch (offered.getMethod()) {
+        case AuthMethod::SIMPLE:
+        case AuthMethod::UNKNOWN:
+            return candidate;
+
+        case AuthMethod::TOKEN:
+            if (key.hasToken()) {
+                token = key.getToken();
+                chosen = candidate;
+            }
+
+            break;
+
+        case AuthMethod::KERBEROS:
+            chosen = candidate;
+            break;
+
+        default:
+            break;
+        }
+    }
+
+    if (chosen == NULL) {
+        std::stringstream ss;
+        ss.imbue(std::locale::classic());
+        ss << "Client cannot authenticate via: ";
+
+        for (int i = 0; i < auths->size(); ++i) {
+            ss << auths->Get(i).mechanism() << ", ";
+        }
+
+        THROW(AccessControlException, "%s", ss.str().c_str());
+    }
+
+    saslClient = shared_ptr<SaslClient>(
+                     new SaslClient(*chosen, token, key.getAuth().getUser().getPrincipal()));
+    return chosen;
+}
+
+/**
+ * Feed the server's token (if any) to the SASL client and return the
+ * client's reply token.
+ *
+ * @param response the server message.
+ * @param serverIsDone true when the server reported SUCCESS; the SASL
+ *        client must then be complete and must not produce a reply.
+ * @throw AccessControlException on a missing challenge token or when the
+ *        client and server disagree about completion.
+ */
+std::string RpcChannelImpl::saslEvaluateToken(RpcSaslProto & response, bool serverIsDone) {
+    std::string reply;
+
+    if (!response.has_token()) {
+        if (!serverIsDone) {
+            THROW(AccessControlException, "Server challenge contains no token");
+        }
+    } else {
+        reply = saslClient->evaluateChallenge(response.token());
+    }
+
+    if (!serverIsDone) {
+        return reply;
+    }
+
+    if (!saslClient->isComplete()) {
+        THROW(AccessControlException, "Client is out of sync with server");
+    }
+
+    if (!reply.empty()) {
+        THROW(AccessControlException, "Client generated spurious response");
+    }
+
+    return reply;
+}
+
+/**
+ * Run the client side of the SASL handshake on the connected socket and
+ * return the authentication method that was negotiated.
+ *
+ * A NEGOTIATE message is sent first; each server reply is then handled by
+ * its state:
+ *  - NEGOTIATE: pick a mechanism from the offer via createSaslClient().
+ *    SIMPLE ends the handshake; otherwise an INITIATE message carrying the
+ *    SASL client's initial token is sent.
+ *  - CHALLENGE: evaluate the server token and answer with RESPONSE.
+ *  - SUCCESS: confirm the SASL client is complete (if one was created);
+ *    without a SASL client the result is SIMPLE.
+ *  - any other state is not handled by this client and is rejected.
+ *
+ * @return the negotiated RpcAuth (method only; the user is not set here).
+ * @throw AccessControlException on protocol violations, an unknown
+ *        mechanism, or an unsupported server state.
+ */
+RpcAuth RpcChannelImpl::setupSaslConnection() {
+    RpcAuth retval;
+    RpcSaslProto negotiateRequest, response, msg;
+
+    // Drop any SASL client left over from an earlier handshake attempt on
+    // this channel (e.g. a retried connect). It carries per-exchange state,
+    // and a stale instance would defeat the "unsolicited challenge" check
+    // and be consulted on SUCCESS.
+    saslClient = shared_ptr<SaslClient>();
+
+    negotiateRequest.set_state(RpcSaslProto_SaslState_NEGOTIATE);
+    sendSaslMessage(&negotiateRequest, &response);
+    bool done = false;
+
+    do {
+        // NOTE(review): presumably fills `response`, which sendSaslMessage
+        // registered as the result of the pending SASL call.
+        readOneResponse(false);
+        msg.Clear();
+
+        switch (response.state()) {
+        case RpcSaslProto_SaslState_NEGOTIATE: {
+            const RpcSaslProto_SaslAuth * auth = createSaslClient(
+                    &response.auths());
+            retval = RpcAuth(RpcAuth::ParseMethod(auth->method()));
+
+            if (retval.getMethod() == AuthMethod::SIMPLE) {
+                done = true;
+            } else if (retval.getMethod() == AuthMethod::UNKNOWN) {
+                THROW(AccessControlException, "Unknown auth mechanism");
+            } else {
+                std::string respToken;
+                RpcSaslProto_SaslAuth * respAuth = msg.add_auths();
+                respAuth->CopyFrom(*auth);
+                std::string challenge;
+
+                // An initial challenge may come with the offered auth;
+                // consume it and do not echo it back to the server.
+                if (auth->has_challenge()) {
+                    challenge = auth->challenge();
+                    respAuth->clear_challenge();
+                }
+
+                respToken = saslClient->evaluateChallenge(challenge);
+
+                if (!respToken.empty()) {
+                    msg.set_token(respToken);
+                }
+
+                msg.set_state(RpcSaslProto_SaslState_INITIATE);
+            }
+
+            break;
+        }
+
+        case RpcSaslProto_SaslState_CHALLENGE: {
+            if (!saslClient) {
+                THROW(AccessControlException, "Server sent unsolicited challenge");
+            }
+
+            std::string token = saslEvaluateToken(response, false);
+            msg.set_token(token);
+            msg.set_state(RpcSaslProto_SaslState_RESPONSE);
+            break;
+        }
+
+        case RpcSaslProto_SaslState_SUCCESS:
+            if (!saslClient) {
+                retval = RpcAuth(AuthMethod::SIMPLE);
+            } else {
+                saslEvaluateToken(response, true);
+            }
+
+            done = true;
+            break;
+
+        default:
+            // Matches Hadoop's SaslRpcClient, which rejects states it does
+            // not handle. Continuing here would send an empty message with
+            // no state set.
+            THROW(AccessControlException,
+                  "RPC client doesn't support SASL state %d",
+                  static_cast<int>(response.state()));
+        }
+
+        if (!done) {
+            response.Clear();
+            sendSaslMessage(&msg, &response);
+        }
+    } while (!done);
+
+    return retval;
+}
+
+/*
+ * Establish the connection to the server: TCP connect, connection header,
+ * optional SASL negotiation, then the connection context.
+ * Up to conf.getMaxRetryOnConnect() attempts are made; SaslException,
+ * HdfsNetworkException and HdfsTimeoutException are logged and retried, and
+ * the last such error is rethrown once every attempt has failed.
+ * @pre Already hold write lock.
+ */
+void RpcChannelImpl::connect() {
+    int sleep = 1;
+    exception_ptr lastError;
+    const RpcConfig & conf = key.getConf();
+    const RpcServerInfo & server = key.getServer();
+    const int maxRetry = conf.getMaxRetryOnConnect();
+    std::string buffer;
+
+    for (int i = 0; i < maxRetry; ++i) {
+        RpcAuth auth = key.getAuth();
+
+        if (key.hasToken()) {
+            auth.setMethod(AuthMethod::TOKEN);
+        }
+
+        try {
+            while (true) {
+                sock->connect(server.getHost().c_str(), server.getPort().c_str(),
+                              conf.getConnectTimeout());
+                sock->setNoDelay(conf.isTcpNoDelay());
+                sendConnectionHeader(auth);
+
+                if (auth.getProtocol() == AuthProtocol::SASL) {
+                    auth = setupSaslConnection();
+
+                    if (auth.getProtocol() == AuthProtocol::SASL) {
+                        //success
+                        break;
+                    }
+
+                    /*
+                     * The server asked for a different auth protocol:
+                     * reconnect and send a new header with it.
+                     */
+                    sock->close();
+                    CheckOperationCanceled();
+                } else {
+                    break;
+                }
+            }
+
+            auth.setUser(key.getAuth().getUser());
+            sendConnectionContent(auth);
+            available = true;
+            lastActivity = lastIdle = steady_clock::now();
+            return;
+        } catch (const SaslException & e) {
+            /*
+             * Namenode may treat this connect as replay, retry later
+             */
+            sleep = (rand() % 5) + 1;
+            lastError = current_exception();
+            LOG(LOG_ERROR,
+                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
+        } catch (const HdfsNetworkException & e) {
+            sleep = 1;
+            lastError = current_exception();
+            LOG(LOG_ERROR,
+                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
+        } catch (const HdfsTimeoutException & e) {
+            sleep = 1;
+            lastError = current_exception();
+            LOG(LOG_ERROR,
+                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+                server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer));
+        }
+
+        const bool willRetry = i + 1 < maxRetry;
+
+        if (willRetry) {
+            LOG(INFO,
+                "Retrying connect to server: \"%s:%s\". Already tried %d time(s)",
+                server.getHost().c_str(), server.getPort().c_str(), i + 1);
+        }
+
+        sock->close();
+        CheckOperationCanceled();
+
+        /*
+         * Back off only when another attempt follows; after the final
+         * attempt the error is reported without an extra delay.
+         */
+        if (willRetry) {
+            sleep_for(seconds(sleep));
+        }
+    }
+
+    /*
+     * With a non-positive retry count the loop never runs and lastError is
+     * still null; rethrow_exception() on a null exception_ptr is undefined.
+     */
+    if (lastError == exception_ptr()) {
+        THROW(HdfsNetworkConnectException,
+              "Failed to setup RPC connection to \"%s:%s\": no connection attempt was made (max retry on connect is %d)",
+              server.getHost().c_str(), server.getPort().c_str(), maxRetry);
+    }
+
+    rethrow_exception(lastError);
+}
+
+/*
+ * Send one call (connecting first if the channel is not available) and wait
+ * until it has finished.
+ *
+ * At most one waiting thread at a time holds readMut and reads responses off
+ * the socket; the other callers block in remote->wait() until woken.
+ *
+ * Recoverable transport errors are not thrown but returned:
+ *   - HdfsNetworkConnectException and HdfsTimeoutException are wrapped in
+ *     HdfsFailoverException;
+ *   - other HdfsNetworkException and HdfsIOException are wrapped in
+ *     HdfsRpcException;
+ *   - HdfsRpcException is returned unchanged.
+ * Exceptions not matched by these handlers propagate to the caller.
+ * A null exception_ptr means no such error occurred: either the call
+ * finished, or the client is not (or no longer) running.
+ */
+exception_ptr RpcChannelImpl::invokeInternal(RpcRemoteCallPtr remote) {
+    const RpcCall & call = remote->getCall();
+    exception_ptr lastError;
+
+    try {
+        if (client.isRunning()) {
+            // connect() and sendRequest() both require writeMut to be held.
+            lock_guard<mutex> lock(writeMut);
+
+            if (!available) {
+                connect();
+            }
+
+            sendRequest(remote);
+        }
+
+        /*
+         * We use one call thread to check response,
+         * other thread will wait on RPC call complete.
+         */
+        while (client.isRunning()) {
+            if (remote->finished()) {
+                /*
+                 * Current RPC call has finished.
+                 * Wake up another thread to check response.
+                 */
+                wakeupOneCaller(remote->getIdentity());
+                break;
+            }
+
+            // Non-blocking attempt to become the single response reader.
+            unique_lock<mutex> lock(readMut, defer_lock_t());
+
+            if (lock.try_lock()) {
+                /*
+                 * Current thread will check response.
+                 */
+                checkOneResponse();
+            } else {
+                /*
+                 * Another thread checks response, just wait.
+                 */
+                remote->wait();
+            }
+        }
+    /*
+     * NOTE(review): HdfsNetworkConnectException is handled before
+     * HdfsNetworkException, and HdfsRpcException before HdfsIOException;
+     * presumably the first of each pair derives from the second, so this
+     * order matters. Confirm against the exception hierarchy before
+     * reordering.
+     */
+    } catch (const HdfsNetworkConnectException & e) {
+        // Re-raised as HdfsFailoverException via NESTED_THROW (which
+        // presumably keeps the original as the nested cause) and captured.
+        try {
+            NESTED_THROW(HdfsFailoverException,
+                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                         call.getName(), key.getServer().getHost().c_str(),
+                         key.getServer().getPort().c_str());
+        } catch (const HdfsFailoverException & e) {
+            lastError = current_exception();
+        }
+    } catch (const HdfsNetworkException & e) {
+        try {
+            NESTED_THROW(HdfsRpcException,
+                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                         call.getName(), key.getServer().getHost().c_str(),
+                         key.getServer().getPort().c_str());
+        } catch (const HdfsRpcException & e) {
+            lastError = current_exception();
+        }
+    } catch (const HdfsTimeoutException & e) {
+        // Timeouts are treated like connect failures: HdfsFailoverException.
+        try {
+            NESTED_THROW(HdfsFailoverException,
+                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                         call.getName(), key.getServer().getHost().c_str(),
+                         key.getServer().getPort().c_str());
+        } catch (const HdfsFailoverException & e) {
+            lastError = current_exception();
+        }
+    } catch (const HdfsRpcException & e) {
+        // Already the right type; return it as is.
+        lastError = current_exception();
+    } catch (const HdfsIOException & e) {
+        try {
+            NESTED_THROW(HdfsRpcException,
+                         "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                         call.getName(), key.getServer().getHost().c_str(),
+                         key.getServer().getPort().c_str());
+        } catch (const HdfsRpcException & e) {
+            lastError = current_exception();
+        }
+    }
+
+    return lastError;
+}
+
+/*
+ * Invoke an RPC call and block until it completes.
+ *
+ * Each attempt gets a fresh call id. When invokeInternal() reports an error,
+ * the channel is shut down (cancelling all pending calls). An idempotent call
+ * is retried once; otherwise, or when the retry fails too, the error is
+ * thrown. On completion, remote->check() is called (presumably surfacing a
+ * server-side error recorded on the call; TODO confirm in RpcRemoteCall).
+ * @pre refs > 0: the caller holds a reference to this channel.
+ */
+void RpcChannelImpl::invoke(const RpcCall & call) {
+    assert(refs > 0);
+    RpcRemoteCallPtr remote;
+    exception_ptr lastError;
+
+    try {
+        // Set after the first failure so that at most one retry happens.
+        bool retry = false;
+
+        do {
+            int32_t id = client.getCallId();
+            remote = RpcRemoteCallPtr(new RpcRemoteCall(call, id, client.getClientId()));
+            // Reset (immediately overwritten by the next statement).
+            lastError = exception_ptr();
+            lastError = invokeInternal(remote);
+
+            if (lastError) {
+                lock_guard<mutex> lock(writeMut);
+                shutdown(lastError);
+
+                if (!retry && call.isIdempotent()) {
+                    retry = true;
+                    std::string buffer;
+                    LOG(LOG_ERROR,
+                        "Failed to invoke RPC call \"%s\" on server \"%s:%s\": \n%s",
+                        call.getName(), key.getServer().getHost().c_str(),
+                        key.getServer().getPort().c_str(),
+                        GetExceptionDetail(lastError, buffer));
+                    LOG(INFO,
+                        "Retry idempotent RPC call \"%s\" on server \"%s:%s\"",
+                        call.getName(), key.getServer().getHost().c_str(),
+                        key.getServer().getPort().c_str());
+                } else {
+                    rethrow_exception(lastError);
+                }
+            } else {
+                break;
+            }
+        } while (retry);
+    } catch (const HdfsRpcServerException & e) {
+        if (!remote->finished()) {
+            /*
+             * a fatal error happened, the caller will unwrap it.
+             */
+            lock_guard<mutex> lock(writeMut);
+            lastError = current_exception();
+            shutdown(lastError);
+        }
+
+        /*
+         * else not a fatal error, check again at the end of this function.
+         */
+    } catch (const HdfsException & e) {
+        lock_guard<mutex> lock(writeMut);
+        lastError = current_exception();
+        shutdown(lastError);
+    }
+
+    /*
+     * if the call is not finished, either failed to setup connection,
+     * or client is closing.
+     */
+    if (!remote->finished() || !client.isRunning()) {
+        lock_guard<mutex> lock(writeMut);
+
+        // No recorded error means the client stopped while we were waiting.
+        if (lastError == exception_ptr()) {
+            try {
+                THROW(Hdfs::HdfsRpcException,
+                      "Failed to invoke RPC call \"%s\", RPC channel to \"%s:%s\" is to be closed since RpcClient is closing",
+                      call.getName(), key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+            } catch (...) {
+                lastError = current_exception();
+            }
+        }
+
+        /*
+         * wake up all.
+         */
+        shutdown(lastError);
+        rethrow_exception(lastError);
+    }
+
+    remote->check();
+}
+
+/*
+ * Tear the connection down after an error: mark the channel unusable, fail
+ * every pending call with `reason`, then close the socket.
+ * @pre The caller holds writeMut and reason is not null.
+ */
+void RpcChannelImpl::shutdown(exception_ptr reason) {
+    assert(reason != exception_ptr());
+
+    // The next invokeInternal() will reconnect because of this flag.
+    available = false;
+
+    cleanupPendingCalls(reason);
+    sock->close();
+}
+
+/*
+ * Wake the first pending call other than `id` so that its thread can take
+ * over reading responses. At most one caller is woken.
+ */
+void RpcChannelImpl::wakeupOneCaller(int32_t id) {
+    lock_guard<mutex> lock(writeMut);
+    typedef unordered_map<int32_t, RpcRemoteCallPtr>::iterator CallIter;
+    const CallIter last = pendingCalls.end();
+
+    for (CallIter it = pendingCalls.begin(); it != last; ++it) {
+        if (it->first == id) {
+            continue;   // never wake the call that just finished
+        }
+
+        it->second->wakeup();
+        break;
+    }
+}
+
+/*
+ * Serialize a call, write it to the socket and register it as pending so
+ * that its response can later be matched by call id.
+ * @pre Already hold write lock and the channel is available.
+ */
+void RpcChannelImpl::sendRequest(RpcRemoteCallPtr remote) {
+    WriteBuffer buffer;
+    assert(true == available);
+    remote->serialize(key.getProtocol(), buffer);
+    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+    /*
+     * Register only after the write succeeded, so a failed write leaves no
+     * stale entry. The id stays int32_t, the key type of pendingCalls and
+     * the parameter type of wakeupOneCaller(), instead of round-tripping
+     * through uint32_t.
+     */
+    const int32_t id = remote->getIdentity();
+    pendingCalls[id] = remote;
+    lastActivity = lastIdle = steady_clock::now();
+}
+
+/*
+ * Cancel every outstanding call with `reason` and forget them all.
+ * @pre The caller holds writeMut.
+ */
+void RpcChannelImpl::cleanupPendingCalls(exception_ptr reason) {
+    /*
+     * The precondition is deliberately not checked with
+     * assert(!writeMut.try_lock()): calling try_lock() on a mutex that the
+     * calling thread already owns is undefined behaviour, so that check
+     * itself was UB exactly when the precondition held.
+     */
+    typedef unordered_map<int32_t, RpcRemoteCallPtr>::iterator CallIter;
+    const CallIter last = pendingCalls.end();
+
+    for (CallIter it = pendingCalls.begin(); it != last; ++it) {
+        it->second->cancel(reason);
+    }
+
+    pendingCalls.clear();
+}
+
+/*
+ * Wait for one response and handle it via readOneResponse(true).
+ * While waiting, a ping is sent whenever the channel has been quiet for the
+ * ping timeout. Returns without reading anything if the client stops
+ * running.
+ * @throw HdfsRpcException (raised with NESTED_THROW while handling an
+ *        HdfsTimeoutException) when the RPC timeout elapses first.
+ * @pre Channel already hold read lock.
+ */
+void RpcChannelImpl::checkOneResponse() {
+    const int pingTimeout = key.getConf().getPingTimeout();
+    const int rpcTimeout = key.getConf().getRpcTimeout();
+    const steady_clock::time_point begin = steady_clock::now();
+
+    while (client.isRunning()) {
+        if (getResponse()) {
+            // Data is waiting: consume exactly one response.
+            readOneResponse(true);
+            return;
+        }
+
+        // Nothing arrived yet; keep the connection alive if it has been quiet.
+        if (pingTimeout > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= pingTimeout) {
+            lock_guard<mutex> lock(writeMut);
+            sendPing();
+        }
+
+        const bool timedOut = rpcTimeout > 0
+                              && ToMilliSeconds(begin, steady_clock::now()) >= rpcTimeout;
+
+        if (!timedOut) {
+            continue;
+        }
+
+        try {
+            THROW(Hdfs::HdfsTimeoutException, "Timeout when wait for response from RPC channel \"%s:%s\"",
+                  key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+        } catch (...) {
+            NESTED_THROW(Hdfs::HdfsRpcException, "Timeout when wait for response from RPC channel \"%s:%s\"",
+                         key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+        }
+    }
+}
+
+/*
+ * Send a ping frame to the server if the channel is available, and record
+ * it as activity.
+ * @pre Caller should hold the write lock.
+ */
+void RpcChannelImpl::sendPing() {
+    if (!available) {
+        return;
+    }
+
+    /*
+     * Build the frame from this channel's own client id. It used to be cached
+     * in a function-local static, which froze the id of whichever RpcClient
+     * pinged first into every later ping from every channel, and whose
+     * initialisation is not thread-safe before C++11. Pings are infrequent,
+     * so rebuilding the frame is cheap.
+     */
+    const std::vector<char> pingRequest = RpcRemoteCall::GetPingRequest(client.getClientId());
+
+    LOG(INFO,
+        "RPC channel to \"%s:%s\" got no response or idle for %d milliseconds, sending ping.",
+        key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), key.getConf().getPingTimeout());
+    sock->writeFully(&pingRequest[0], pingRequest.size(), key.getConf().getWriteTimeout());
+    lastActivity = steady_clock::now();
+}
+
+/*
+ * Close the channel if it has been unused for longer than the max idle time,
+ * or if the keep-alive ping fails; otherwise ping it when it has been quiet.
+ * Returns false without doing anything if writeMut is busy, and refreshes
+ * lastIdle while the channel still has references or pending calls.
+ * @return true if the socket was closed.
+ */
+bool RpcChannelImpl::checkIdle() {
+    unique_lock<mutex> lock(writeMut, defer_lock_t());
+
+    // Someone else is using the channel right now, so it is not idle.
+    if (!lock.try_lock()) {
+        return false;
+    }
+
+    if (!pendingCalls.empty() || refs > 0) {
+        lastIdle = steady_clock::now();
+        return false;
+    }
+
+    const int maxIdle = key.getConf().getMaxIdleTime();
+    const int pingInterval = key.getConf().getPingTimeout();
+
+    try {
+        if (ToMilliSeconds(lastIdle, steady_clock::now()) >= maxIdle) {
+            // idle for too long: drop the connection
+            sock->close();
+            return true;
+        }
+
+        if (pingInterval > 0 && ToMilliSeconds(lastActivity, steady_clock::now()) >= pingInterval) {
+            // still within the idle limit: keep the connection alive
+            sendPing();
+        }
+
+        return false;
+    } catch (...) {
+        std::string buffer;
+        LOG(LOG_ERROR,
+            "Failed to send ping via idle RPC channel to server \"%s:%s\": "
+            "\n%s",
+            key.getServer().getHost().c_str(),
+            key.getServer().getPort().c_str(),
+            GetExceptionDetail(current_exception(), buffer));
+        sock->close();
+        return true;
+    }
+}
+
+/*
+ * Block until every reference to this channel has been released, polling
+ * every 100 ms. Only valid once the RpcClient has stopped running; no call
+ * may be pending afterwards.
+ */
+void RpcChannelImpl::waitForExit() {
+    assert(!client.isRunning());
+
+    for (;;) {
+        if (refs == 0) {
+            break;
+        }
+
+        sleep_for(milliseconds(100));
+    }
+
+    assert(pendingCalls.empty());
+}
+
+/**
+ * Write the connection header - this is sent when connection is established
+ * +----------------------------------+
+ * |  "hrpc" 4 bytes                  |
+ * +----------------------------------+
+ * |  Version (1 byte)                |
+ * +----------------------------------+
+ * |  Service Class (1 byte)          |
+ * +----------------------------------+
+ * |  AuthProtocol (1 byte)           |
+ * +----------------------------------+
+ */
+void RpcChannelImpl::sendConnectionHeader(const RpcAuth &auth) {
+    WriteBuffer header;
+
+    // magic bytes followed by the protocol version
+    header.write(RPC_HEADER_MAGIC, strlen(RPC_HEADER_MAGIC));
+    header.write(static_cast<char>(RPC_HEADER_VERSION));
+    // service class: always 0 (reserved for a future feature)
+    header.write(static_cast<char>(0));
+    // the auth protocol this connection will use
+    header.write(static_cast<char>(auth.getProtocol()));
+
+    sock->writeFully(header.getBuffer(0), header.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+}
+
+/*
+ * Fill the connection context: always the protocol name. For non-TOKEN
+ * methods, add the effective user as well. For methods that are neither
+ * TOKEN nor SIMPLE, also add the real user when it is set and differs from
+ * the effective one.
+ */
+void RpcChannelImpl::buildConnectionContext(
+    IpcConnectionContextProto & connectionContext, const RpcAuth & auth) {
+    connectionContext.set_protocol(key.getProtocol().getProtocol());
+    std::string effectiveUser = key.getAuth().getUser().getPrincipal();
+    std::string realUser = key.getAuth().getUser().getRealUser();
+
+    // With TOKEN authentication no user information is attached.
+    if (auth.getMethod() == AuthMethod::TOKEN) {
+        return;
+    }
+
+    UserInformationProto * userInfo = connectionContext.mutable_userinfo();
+    userInfo->set_effectiveuser(effectiveUser);
+
+    const bool sendRealUser = auth.getMethod() != AuthMethod::SIMPLE
+                              && !realUser.empty()
+                              && realUser != effectiveUser;
+
+    if (sendRealUser) {
+        userInfo->set_realuser(realUser);
+    }
+}
+
+/*
+ * Send the connection context as a length-prefixed RPC frame: a request
+ * header with CONNECTION_CONTEXT_CALL_ID followed by the
+ * IpcConnectionContextProto built by buildConnectionContext().
+ */
+void RpcChannelImpl::sendConnectionContent(const RpcAuth & auth) {
+    RpcRequestHeaderProto rpcHeader;
+    rpcHeader.set_callid(CONNECTION_CONTEXT_CALL_ID);
+    rpcHeader.set_clientid(client.getClientId());
+    rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
+    rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+    rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+
+    IpcConnectionContextProto connectionContext;
+    buildConnectionContext(connectionContext, auth);
+
+    RpcContentWrapper wrapper(&rpcHeader, &connectionContext);
+    const int frameLength = wrapper.getLength();
+    WriteBuffer frame;
+    frame.writeBigEndian(frameLength);
+    wrapper.writeTo(frame);
+
+    sock->writeFully(frame.getBuffer(0), frame.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+    lastActivity = lastIdle = steady_clock::now();
+}
+
+/*
+ * Remove and return the pending call with the given id.
+ * Throws HdfsRpcException if no such call is pending.
+ * @pre Channel already locked.
+ */
+RpcRemoteCallPtr RpcChannelImpl::getPendingCall(int32_t id) {
+    unordered_map<int32_t, RpcRemoteCallPtr>::iterator found = pendingCalls.find(id);
+
+    if (found == pendingCalls.end()) {
+        THROW(HdfsRpcException,
+              "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot find pending call: id = %d.",
+              key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), static_cast<int>(id));
+    }
+
+    // Keep a reference before the map entry is erased.
+    RpcRemoteCallPtr match = found->second;
+    pendingCalls.erase(found);
+    return match;
+}
+
+/*
+ * Poll the socket in 500 ms steps until data is readable.
+ * Gives up once `interval` ms have passed, where interval is half the
+ * smaller of the ping and idle timeouts, capped by the RPC timeout. This
+ * gives the caller (checkOneResponse) a chance to ping and check its
+ * timeout.
+ * @return true if a response is available; false on give-up or when the
+ *         client stops running.
+ */
+bool RpcChannelImpl::getResponse() {
+    const int idleTimeout = key.getConf().getMaxIdleTime();
+    const int pingTimeout = key.getConf().getPingTimeout();
+    const int rpcTimeout = key.getConf().getRpcTimeout();
+    int interval = (idleTimeout <= pingTimeout ? idleTimeout : pingTimeout) / 2;
+
+    if (rpcTimeout < interval) {
+        interval = rpcTimeout;
+    }
+
+    const steady_clock::time_point begin = steady_clock::now();
+
+    for (;;) {
+        if (!client.isRunning()) {
+            return false;
+        }
+
+        if (in->poll(500)) {
+            return true;
+        }
+
+        if (ToMilliSeconds(begin, steady_clock::now()) >= interval) {
+            return false;
+        }
+    }
+}
+
+/*
+ * Try to turn a generic HdfsRpcServerException into one of the specific
+ * exception types listed in the UnWrapper below. The target type is
+ * presumably chosen from the server error class recorded with setErrClass();
+ * confirm in UnWrapper.
+ *
+ * Returns the specific exception when unwrap() throws one of the listed
+ * types. Returns `e` unchanged when unwrap() throws HdfsIOException or
+ * returns normally.
+ *
+ * NOTE(review): `e` is rethrown here; if it is not an HdfsRpcServerException
+ * it escapes this function. The callers in readOneResponse() only pass
+ * exceptions captured inside a `catch (HdfsRpcServerException &)` handler.
+ */
+static exception_ptr HandlerRpcResponseException(exception_ptr e) {
+    exception_ptr retval = e;
+
+    try {
+        rethrow_exception(e);
+    } catch (const HdfsRpcServerException & e) {
+        UnWrapper < NameNodeStandbyException, RpcNoSuchMethodException, UnsupportedOperationException,
+                  AccessControlException, SafeModeException, SaslException > unwrapper(e);
+
+        try {
+            unwrapper.unwrap(__FILE__, __LINE__);
+        } catch (const NameNodeStandbyException & e) {
+            retval = current_exception();
+        } catch (const UnsupportedOperationException & e) {
+            retval = current_exception();
+        } catch (const AccessControlException & e) {
+            retval = current_exception();
+        } catch (const SafeModeException & e) {
+            retval = current_exception();
+        } catch (const SaslException & e) {
+            retval = current_exception();
+        } catch (const RpcNoSuchMethodException & e) {
+            retval = current_exception();
+        } catch (const HdfsIOException & e) {
+            // Not convertible: keep the original exception (retval == e).
+        }
+    }
+
+    return retval;
+}
+
+/*
+ * Read one framed response from the socket and dispatch it to the pending
+ * call it answers, according to its status:
+ *   - SUCCESS: parse the body into the call's response message and mark the
+ *     call done;
+ *   - ERROR: cancel only that call with the (possibly unwrapped) server
+ *     exception;
+ *   - FATAL: throw.
+ * @param writeLock true to take writeMut around the pending-call lookup;
+ *        false when the caller is responsible for holding writeMut.
+ * @throw HdfsRpcException on an unparsable header/body or an unknown call id;
+ *        on FATAL status, HdfsRpcException, HdfsRpcServerException or the
+ *        unwrapped server exception.
+ * @pre Channel already hold read lock.
+ */
+void RpcChannelImpl::readOneResponse(bool writeLock) {
+    int readTimeout = key.getConf().getReadTimeout();
+    std::vector<char> buffer(128);
+    RpcResponseHeaderProto curRespHeader;
+    RpcResponseHeaderProto::RpcStatusProto status;
+    uint32_t headerSize = 0, bodySize = 0;
+    /*
+     * The leading big-endian int32 (total frame length) is consumed but not
+     * otherwise used.
+     */
+    in->readBigEndianInt32(readTimeout);
+    /*
+     * read response header
+     */
+    headerSize = in->readVarint32(readTimeout);
+    buffer.resize(headerSize);
+
+    /*
+     * &buffer[0] is undefined behaviour on an empty vector, so zero-length
+     * sections are handed to the parser as (NULL, 0) instead.
+     */
+    if (headerSize > 0) {
+        in->readFully(&buffer[0], headerSize, readTimeout);
+    }
+
+    if (!curRespHeader.ParseFromArray(headerSize > 0 ? &buffer[0] : NULL,
+                                      headerSize)) {
+        THROW(HdfsRpcException,
+              "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header.",
+              key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+    }
+
+    lastActivity = steady_clock::now();
+    status = curRespHeader.status();
+
+    if (RpcResponseHeaderProto_RpcStatusProto_SUCCESS == status) {
+        /*
+         * on success, read response body
+         */
+        RpcRemoteCallPtr rc;
+
+        if (writeLock) {
+            lock_guard<mutex> lock(writeMut);
+            rc = getPendingCall(curRespHeader.callid());
+        } else {
+            rc = getPendingCall(curRespHeader.callid());
+        }
+
+        bodySize = in->readVarint32(readTimeout);
+        buffer.resize(bodySize);
+
+        if (bodySize > 0) {
+            in->readFully(&buffer[0], bodySize, readTimeout);
+        }
+
+        // An empty response message legitimately has bodySize == 0.
+        const char * body = bodySize > 0 ? &buffer[0] : NULL;
+        Message * response = rc->getCall().getResponse();
+
+        if (!response->ParseFromArray(body, bodySize)) {
+            THROW(HdfsRpcException,
+                  "RPC channel to \"%s:%s\" got protocol mismatch: rpc channel cannot parse response.",
+                  key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+        }
+
+        rc->done();
+    } else {
+        /*
+         * on error, read error class and message
+         */
+        std::string errClass, errMessage;
+        errClass = curRespHeader.exceptionclassname();
+        errMessage = curRespHeader.errormsg();
+
+        if (RpcResponseHeaderProto_RpcStatusProto_ERROR == status) {
+            RpcRemoteCallPtr rc;
+
+            /*
+             * Honour writeLock exactly as the SUCCESS branch does. Locking
+             * writeMut unconditionally would deadlock (undefined behaviour
+             * for a non-recursive mutex) when the caller already holds it.
+             */
+            if (writeLock) {
+                lock_guard<mutex> lock(writeMut);
+                rc = getPendingCall(curRespHeader.callid());
+            } else {
+                rc = getPendingCall(curRespHeader.callid());
+            }
+
+            try {
+                THROW(HdfsRpcServerException, "%s: %s",
+                      errClass.c_str(), errMessage.c_str());
+            } catch (HdfsRpcServerException & e) {
+                e.setErrClass(errClass);
+                e.setErrMsg(errMessage);
+                rc->cancel(HandlerRpcResponseException(current_exception()));
+            }
+        } else { /*fatal*/
+            assert(RpcResponseHeaderProto_RpcStatusProto_FATAL == status);
+
+            if (errClass.empty()) {
+                THROW(HdfsRpcException, "%s: %s",
+                      errClass.c_str(), errMessage.c_str());
+            }
+
+            try {
+                THROW(HdfsRpcServerException, "%s: %s", errClass.c_str(),
+                      errMessage.c_str());
+            } catch (HdfsRpcServerException & e) {
+                e.setErrClass(errClass);
+                e.setErrMsg(errMessage);
+                rethrow_exception(HandlerRpcResponseException(current_exception()));
+            }
+        }
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcChannel.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcChannel.h b/depends/libhdfs3/src/rpc/RpcChannel.h
new file mode 100644
index 0000000..607b243
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcChannel.h
@@ -0,0 +1,276 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
+
+#include "Atomic.h"
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "IpcConnectionContext.pb.h"
+#include "Memory.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+#include "RpcCall.h"
+#include "RpcChannelKey.h"
+#include "RpcHeader.pb.h"
+#include "RpcRemoteCall.h"
+#include "SaslClient.h"
+#include "Thread.h"
+#include "Unordered.h"
+
+#include <google/protobuf/message.h>
+
+namespace Hdfs {
+namespace Internal {
+
+class RpcClient;
+
+/**
+ * Abstract interface of an RPC connection to one server.
+ */
+class RpcChannel {
+public:
+    /**
+     * Destroy a channel.
+     */
+    virtual ~RpcChannel() {
+    }
+
+    /**
+     * Called when the caller has finished its RPC calls on this channel.
+     * The channel may be reused later unless immediate is true.
+     * @param immediate If true, the channel is not to be reused any more.
+     */
+    virtual void close(bool immediate) = 0;
+
+    /**
+     * Invoke an RPC call and block until it completes.
+     * @param call The call to be invoked; on success its response message
+     *        is filled in. Errors are reported by throwing.
+     */
+    virtual void invoke(const RpcCall & call) = 0;
+
+    /**
+     * Close the channel if it has been idle for too long.
+     * @return true if the channel was closed.
+     */
+    virtual bool checkIdle() = 0;
+
+    /**
+     * Wait until every reference to this channel has been released.
+     * The channel cannot be reused afterwards.
+     * @pre RpcClient is not running.
+     */
+    virtual void waitForExit() = 0;
+
+    /**
+     * Increment the reference count of this channel.
+     */
+    virtual void addRef() = 0;
+};
+
+/**
+ * RpcChannelImpl represents one RPC connection to a server, shared by the
+ * threads that invoke calls through it. Sends and connection state are
+ * guarded by writeMut; at most one thread at a time reads responses while
+ * holding readMut.
+ */
+class RpcChannelImpl: public RpcChannel {
+public:
+    /**
+     * Construct a RpcChannelImpl instance.
+     * @param k The key of this channel.
+     * @param c The RpcClient this channel belongs to.
+     */
+    RpcChannelImpl(const RpcChannelKey & k, RpcClient & c);
+
+    /**
+     * Destroy a RpcChannelImpl instance.
+     */
+    ~RpcChannelImpl();
+
+    /**
+     * Called when the caller has finished its RPC calls on this channel.
+     * The channel may be reused later unless immediate is true.
+     * @param immediate If true, the channel is not to be reused any more.
+     */
+    void close(bool immediate);
+
+    /**
+     * Invoke an RPC call and block until it completes.
+     * An idempotent call is retried once after a recoverable failure.
+     * @param call The call to be invoked; on success its response message
+     *        is filled in. Errors are reported by throwing.
+     */
+    void invoke(const RpcCall & call);
+
+    /**
+     * Close the channel if it has been idle for too long or an idle ping
+     * fails; otherwise ping it when due.
+     * @return true if the channel was closed.
+     */
+    bool checkIdle();
+
+    /**
+     * Wait until every reference to this channel has been released.
+     * The channel cannot be reused afterwards.
+     * @pre RpcClient is not running.
+     */
+    void waitForExit();
+
+    /**
+     * Increment the reference count of this channel.
+     */
+    void addRef() {
+        ++refs;
+    }
+
+private:
+    /**
+     * Setup the RPC connection, retrying on network, timeout and SASL errors.
+     * @pre Already hold write lock.
+     */
+    void connect();
+
+    /**
+     * Cleanup all pending calls.
+     * @param reason The reason to cancel the call.
+     * @pre Already hold write lock.
+     */
+    void cleanupPendingCalls(exception_ptr reason);
+
+    /**
+     * Send rpc connect protocol header.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeoutException
+     */
+    void sendConnectionHeader(const RpcAuth& auth);
+
+    /**
+     * Send rpc connection protocol content (the connection context).
+     */
+    void sendConnectionContent(const RpcAuth & auth);
+
+    /**
+     * Build rpc connect context.
+     */
+    void buildConnectionContext(IpcConnectionContextProto & connectionContext, const RpcAuth & auth);
+
+    /**
+     * Send ping packet to server.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeoutException
+     * @pre Caller should hold the write lock.
+     */
+    void sendPing();
+
+    /**
+     * Send the call message to rpc server and register it as pending.
+     * @param remote The remote call.
+     * @pre Already hold write lock.
+     */
+    void sendRequest(RpcRemoteCallPtr remote);
+
+    /**
+     * Issue a rpc call and check response.
+     * Catch all recoverable error in this function
+     *
+     * @param remote The remote call
+     * @return The recoverable error that occurred, or a null exception_ptr.
+     */
+    exception_ptr invokeInternal(RpcRemoteCallPtr remote);
+
+    /**
+     * Wait for one response and handle it. Returns early if the client
+     * stops running; sends pings while waiting.
+     * @throw HdfsRpcException if the RPC timeout elapses first.
+     * @pre Channel already hold read lock.
+     */
+    void checkOneResponse();
+
+    /**
+     * read and handle one response.
+     * @param writeLock true to take the write lock around the pending-call
+     *        lookup.
+     * @pre Channel already hold read lock.
+     */
+    void readOneResponse(bool writeLock);
+
+    /**
+     * Get the call object with given id, and then remove it from pending call list.
+     * @param id The id of the call object to be returned.
+     * @return The call object with given id.
+     * @throw HdfsRpcException if no call with that id is pending.
+     * @pre Channel already locked.
+     */
+    RpcRemoteCallPtr getPendingCall(int32_t id);
+
+    /**
+     * Check if there is data available for reading on socket, polling for a
+     * bounded interval.
+     * @return true if response is available.
+     */
+    bool getResponse();
+
+    /**
+     * wake up one caller to check response.
+     * @param id The call id which current caller handled.
+     */
+    void wakeupOneCaller(int32_t id);
+
+    /**
+     * shutdown the RPC connection since error.
+     * @param reason The reason to cancel the call
+     * @pre Already hold write lock.
+     */
+    void shutdown(exception_ptr reason);
+
+    /**
+     * Pick one of the auth mechanisms offered by the server during SASL
+     * negotiation and return it (presumably also initialising saslClient;
+     * implementation not shown here).
+     */
+    const RpcSaslProto_SaslAuth * createSaslClient(
+        const ::google::protobuf::RepeatedPtrField<RpcSaslProto_SaslAuth> * auths);
+
+    /**
+     * Send a SASL message and read the server's reply into resp.
+     */
+    void sendSaslMessage(RpcSaslProto * msg, ::google::protobuf::Message * resp);
+
+    /**
+     * Evaluate the token of a SASL reply with saslClient and return the
+     * token to send back; serverIsDone is true for the final SUCCESS reply.
+     */
+    std::string saslEvaluateToken(RpcSaslProto & response, bool serverIsDone);
+
+    /**
+     * Run the SASL handshake and return the negotiated auth. If its protocol
+     * is no longer SASL, connect() reopens the connection with it.
+     */
+    RpcAuth setupSaslConnection();
+
+private:
+    /**
+     * Construct a RpcChannelImpl instance for test.
+     * @param key The key of this channel.
+     * @param sock The socket instance.
+     * @param in The BufferedSocketReader instance build on sock.
+     * @param client The RpcClient instance.
+     */
+    RpcChannelImpl(const RpcChannelKey & key, Socket * sock,
+                   BufferedSocketReader * in, RpcClient & client);
+
+private:
+    atomic<int> refs; // number of references taken via addRef()
+    bool available; // true while the connection is established and usable
+    mutex readMut; // held by the one thread currently reading responses
+    mutex writeMut; // guards socket writes, pendingCalls and connection state
+    RpcChannelKey key;
+    RpcClient & client;
+    shared_ptr<BufferedSocketReader> in;
+    shared_ptr<SaslClient> saslClient;
+    shared_ptr<Socket> sock;
+    steady_clock::time_point lastActivity; // ping is a kind of activity, lastActivity will be updated after ping
+    steady_clock::time_point lastIdle; // ping cannot change idle state. If there is still pending calls, lastIdle is always "NOW".
+    unordered_map<int32_t, RpcRemoteCallPtr> pendingCalls; // sent but unanswered calls, keyed by call id
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcChannelKey.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcChannelKey.cpp b/depends/libhdfs3/src/rpc/RpcChannelKey.cpp
new file mode 100644
index 0000000..73dc653
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcChannelKey.cpp
@@ -0,0 +1,55 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RpcChannelKey.h"
+
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+
+RpcChannelKey::RpcChannelKey(const RpcAuth & a, const RpcProtocolInfo & p,
+                             const RpcServerInfo & s, const RpcConfig & c) :
+    auth(a), conf(c), protocol(p), server(s) {
+    // If the user holds a token matching this protocol's token kind and the
+    // server's token service, keep a private copy of it in the key.
+    const Token * selected = auth.getUser().selectToken(
+                                 protocol.getTokenKind(), server.getTokenService());
+
+    if (selected) {
+        token.reset(new Token(*selected));
+    }
+}
+
+/*
+ * Combine the hashes of every component of the key; a key without a token
+ * contributes 0 in the token slot.
+ */
+size_t RpcChannelKey::hash_value() const {
+    size_t parts[] = {
+        auth.hash_value(),
+        protocol.hash_value(),
+        server.hash_value(),
+        conf.hash_value(),
+        token ? token->hash_value() : 0
+    };
+    const size_t count = sizeof(parts) / sizeof(parts[0]);
+    return CombineHasher(parts, count);
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcChannelKey.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcChannelKey.h b/depends/libhdfs3/src/rpc/RpcChannelKey.h
new file mode 100644
index 0000000..5eb9293
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcChannelKey.h
@@ -0,0 +1,95 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_
+
+#include "client/Token.h"
+#include "Hash.h"
+#include "RpcAuth.h"
+#include "RpcConfig.h"
+#include "RpcProtocolInfo.h"
+#include "RpcServerInfo.h"
+#include <Memory.h>
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Key identifying a reusable RPC channel. Two keys are equal only when their
+ * auth, protocol, server and config compare equal and they either both lack
+ * a token or hold equal tokens. RpcClientImpl uses this type as the key of
+ * its channel map.
+ */
+class RpcChannelKey {
+public:
+    /**
+     * Build a key from the connection parameters. If the user in @p a has a
+     * token matching the protocol's token kind and the server's token
+     * service, a copy of that token is stored in the key.
+     */
+    RpcChannelKey(const RpcAuth & a, const RpcProtocolInfo & p,
+                  const RpcServerInfo & s, const RpcConfig & c);
+
+public:
+    /**
+     * @return a hash over all components that operator== compares.
+     */
+    size_t hash_value() const;
+
+    const RpcAuth & getAuth() const {
+        return auth;
+    }
+
+    const RpcConfig & getConf() const {
+        return conf;
+    }
+
+    const RpcProtocolInfo & getProtocol() const {
+        return protocol;
+    }
+
+    const RpcServerInfo & getServer() const {
+        return server;
+    }
+
+    /**
+     * Keys are equal when auth, protocol, server and conf are equal and the
+     * tokens are either both absent or both present and equal.
+     */
+    bool operator ==(const RpcChannelKey & other) const {
+        return this->auth == other.auth && this->protocol == other.protocol
+               && this->server == other.server && this->conf == other.conf
+               && ((token == NULL && other.token == NULL)
+                   || (token && other.token && *token == *other.token));
+    }
+
+    /**
+     * @return the token stored in this key.
+     * @pre hasToken() is true (only checked by assert in debug builds).
+     */
+    const Token & getToken() const {
+        assert(token != NULL);
+        return *token;
+    }
+
+    /**
+     * @return true if a token was selected for this key at construction.
+     * Declared const: it does not modify the key and must be callable on the
+     * same const keys that getToken() accepts.
+     */
+    bool hasToken() const {
+        return token != NULL;
+    }
+
+private:
+    const RpcAuth auth;
+    const RpcConfig conf;
+    const RpcProtocolInfo protocol;
+    const RpcServerInfo server;
+    // Owned copy of the selected token; empty when none was selected.
+    shared_ptr<Token> token;
+};
+
+}
+}
+
+// Lets RpcChannelKey be used as an unordered_map key (RpcClientImpl keys its
+// channel map by it). NOTE(review): presumably expands to a hash
+// specialization that forwards to hash_value() -- confirm in Hash.h.
+HDFS_HASH_DEFINE(::Hdfs::Internal::RpcChannelKey);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcClient.cpp b/depends/libhdfs3/src/rpc/RpcClient.cpp
new file mode 100644
index 0000000..a2fe8da
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcClient.cpp
@@ -0,0 +1,198 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Logger.h"
+#include "Memory.h"
+#include "RpcClient.h"
+#include "Thread.h"
+
+#include <uuid/uuid.h>
+
+namespace Hdfs {
+namespace Internal {
+
+once_flag RpcClient::once;
+shared_ptr<RpcClient> RpcClient::client;
+
+/**
+ * Create the shared RpcClientImpl instance. getClient() runs this through
+ * call_once so it executes a single time.
+ */
+void RpcClient::createSinglten() {
+    client.reset(new RpcClientImpl());
+}
+
+/**
+ * @return the shared RpcClient, creating it on the first call.
+ */
+RpcClient & RpcClient::getClient() {
+    call_once(once, &RpcClient::createSinglten);
+    assert(client);
+    return *client;
+}
+
+/**
+ * Construct the client in the running state with no idle cleaner started and
+ * the call-id counter at 0. The client id is the raw bytes of a freshly
+ * generated UUID (sizeof(uuid_t) bytes, not its textual form).
+ */
+RpcClientImpl::RpcClientImpl() :
+    cleaning(false), running(true), count(0) {
+    uuid_t uuid;
+    uuid_generate(uuid);
+    clientId.assign(reinterpret_cast<const char *>(uuid), sizeof(uuid_t));
+#ifdef MOCK
+    stub = NULL;
+#endif
+}
+
+/**
+ * Stop the idle-cleaner thread, wait for it to finish, then close all
+ * channels via close().
+ */
+RpcClientImpl::~RpcClientImpl() {
+    // Make the cleaner loop exit and wake it from its timed wait.
+    running = false;
+    cond.notify_all();
+
+    // Join the cleaner thread before closing the channels.
+    if (cleaner.joinable()) {
+        cleaner.join();
+    }
+
+    close();
+}
+
+/**
+ * Body of the idle-cleaner thread started by getChannel().
+ *
+ * About once per second (or when cond is notified) it removes every channel
+ * whose checkIdle() returns true. The thread exits when the client stops
+ * running, when no channels remain, or on an unexpected exception. On every
+ * exit path the cleaning flag is cleared while holding mut, so that the next
+ * getChannel() reliably starts a new cleaner.
+ */
+void RpcClientImpl::clean() {
+    assert(cleaning);
+
+    try {
+        while (running) {
+            try {
+                unique_lock<mutex> lock(mut);
+                cond.wait_for(lock, seconds(1));
+
+                if (!running || allChannels.empty()) {
+                    // Clear the flag before releasing mut. Clearing it after
+                    // unlock lets getChannel() observe a stale "true", skip
+                    // starting a cleaner, and leave new channels unmanaged.
+                    cleaning = false;
+                    return;
+                }
+
+                unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
+                e = allChannels.end();
+
+                for (s = allChannels.begin(); s != e;) {
+                    if (s->second->checkIdle()) {
+                        s->second.reset();
+                        s = allChannels.erase(s);
+                    } else {
+                        ++s;
+                    }
+                }
+            } catch (const HdfsCanceled &) {
+                /*
+                 * ignore cancel signal here.
+                 */
+            }
+        }
+    } catch (const Hdfs::HdfsException & e) {
+        std::string buffer;
+        LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s",
+            GetExceptionDetail(e, buffer));
+    } catch (const std::exception & e) {
+        LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", e.what());
+    }
+
+    // Reached when running became false between iterations or after an
+    // exception; reset the flag under the same lock getChannel() checks it.
+    lock_guard<mutex> lock(mut);
+    cleaning = false;
+}
+
+/**
+ * Mark the client as no longer running, wait for every channel to exit, and
+ * then drop all channels. The whole operation runs while holding mut.
+ */
+void RpcClientImpl::close() {
+    lock_guard<mutex> guard(mut);
+    running = false;
+
+    typedef unordered_map<RpcChannelKey, shared_ptr<RpcChannel> > ChannelMap;
+
+    for (ChannelMap::iterator it = allChannels.begin();
+            it != allChannels.end(); ++it) {
+        it->second->waitForExit();
+    }
+
+    allChannels.clear();
+}
+
+/**
+ * @return true from construction until close() or the destructor clears the
+ *         running flag.
+ */
+bool RpcClientImpl::isRunning() {
+    const bool alive = running;
+    return alive;
+}
+
+/**
+ * Return the shared channel for (auth, protocol, server, conf), creating and
+ * registering it on first use, and take a reference on it for the caller.
+ * Also starts the idle-cleaner thread if none is running.
+ *
+ * @throw HdfsRpcException if the client is closing; any other failure is
+ *        rethrown as an HdfsRpcException via NESTED_THROW.
+ */
+RpcChannel & RpcClientImpl::getChannel(const RpcAuth & auth,
+                                       const RpcProtocolInfo & protocol, const RpcServerInfo & server,
+                                       const RpcConfig & conf) {
+    shared_ptr<RpcChannel> rc;
+    RpcChannelKey key(auth, protocol, server, conf);
+
+    try {
+        lock_guard<mutex> lock(mut);
+
+        if (!running) {
+            THROW(Hdfs::HdfsRpcException,
+                  "Cannot Setup RPC channel to \"%s:%s\" since RpcClient is closing",
+                  key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+        }
+
+        unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator it;
+        it = allChannels.find(key);
+
+        if (it != allChannels.end()) {
+            rc = it->second;
+        } else {
+            rc = createChannelInternal(key);
+            allChannels[key] = rc;
+        }
+
+        if (!cleaning) {
+            cleaning = true;
+
+            if (cleaner.joinable()) {
+                cleaner.join();
+            }
+
+            try {
+                CREATE_THREAD(cleaner, bind(&RpcClientImpl::clean, this));
+            } catch (...) {
+                // No cleaner was started: clear the flag so a later call
+                // retries, instead of believing one is running forever.
+                cleaning = false;
+                throw;
+            }
+        }
+
+        // Take the caller's reference only after everything that can fail,
+        // so a call that throws never holds a reference it cannot hand back.
+        // The cleaner cannot inspect this channel in between: mut is held.
+        rc->addRef();
+    } catch (const HdfsRpcException &) {
+        throw;
+    } catch (...) {
+        NESTED_THROW(HdfsRpcException,
+                     "RpcClient failed to create a channel to \"%s:%s\"",
+                     server.getHost().c_str(), server.getPort().c_str());
+    }
+
+    return *rc;
+}
+
+/**
+ * Allocate a new channel for @p key. In MOCK builds an installed test stub
+ * supplies the channel; otherwise an RpcChannelImpl is created.
+ */
+shared_ptr<RpcChannel> RpcClientImpl::createChannelInternal(
+    const RpcChannelKey & key) {
+#ifdef MOCK
+
+    if (stub) {
+        return shared_ptr<RpcChannel>(stub->getChannel(key, *this));
+    }
+
+#endif
+    return shared_ptr<RpcChannel>(new RpcChannelImpl(key, *this));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcClient.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcClient.h b/depends/libhdfs3/src/rpc/RpcClient.h
new file mode 100644
index 0000000..2cd5b34
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcClient.h
@@ -0,0 +1,165 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_
+
+#include "Memory.h"
+#include "RpcAuth.h"
+#include "RpcCall.h"
+#include "RpcChannel.h"
+#include "RpcChannelKey.h"
+#include "RpcConfig.h"
+#include "RpcProtocolInfo.h"
+#include "RpcServerInfo.h"
+#include "Thread.h"
+#include "Unordered.h"
+
+#include <vector>
+
+#ifdef MOCK
+#include "TestRpcChannelStub.h"
+#endif
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Interface of the RPC client that hands out shared RPC channels.
+ * getClient() returns a lazily created shared instance.
+ */
+class RpcClient {
+public:
+    /**
+     * Destroy an RpcClient instance.
+     */
+    virtual ~RpcClient() {
+    }
+
+    /**
+     * Get an RPC channel, creating a new one if necessary.
+     * @param auth Authentication information used to set up the RPC connection.
+     * @param protocol The RPC protocol used in this call.
+     * @param server Remote server information.
+     * @param conf RPC connection configuration.
+     * @return the channel matching these parameters.
+     */
+    virtual RpcChannel & getChannel(const RpcAuth & auth,
+                                    const RpcProtocolInfo & protocol, const RpcServerInfo & server,
+                                    const RpcConfig & conf) = 0;
+
+    /**
+     * Check whether the RpcClient is still running.
+     * @return true if the RpcClient is still running.
+     */
+    virtual bool isRunning() = 0;
+
+    /**
+     * @return the identifier of this client.
+     */
+    virtual std::string getClientId() const = 0;
+
+    /**
+     * @return the next call id to use for an RPC.
+     */
+    virtual int32_t getCallId() = 0;
+
+public:
+    /**
+     * @return the shared RpcClient instance, created on the first call.
+     */
+    static RpcClient & getClient();
+
+    /**
+     * Create the shared instance; invoked once by getClient().
+     * (The misspelled name is kept for existing callers.)
+     */
+    static void createSinglten();
+
+private:
+    static once_flag once;
+    static shared_ptr<RpcClient> client;
+};
+
+/**
+ * Default RpcClient: caches one channel per RpcChannelKey and runs a
+ * background thread that drops idle channels.
+ */
+class RpcClientImpl: public RpcClient {
+public:
+    /**
+     * Construct an RpcClient.
+     */
+    RpcClientImpl();
+
+    /**
+     * Destroy an RpcClient instance: stops the idle cleaner and closes all
+     * channels.
+     */
+    ~RpcClientImpl();
+
+    /**
+     * Get an RPC channel, creating a new one if necessary.
+     * @param auth Authentication information used to set up the RPC connection.
+     * @param protocol The RPC protocol used in this call.
+     * @param server Remote server information.
+     * @param conf RPC connection configuration.
+     * @return the channel matching these parameters.
+     */
+    RpcChannel & getChannel(const RpcAuth & auth,
+                            const RpcProtocolInfo & protocol, const RpcServerInfo & server,
+                            const RpcConfig & conf);
+
+    /**
+     * Stop the client and shut down all of its RPC channels.
+     */
+    void close();
+
+    /**
+     * Check whether the RpcClient is still running.
+     * @return true if the RpcClient is still running.
+     */
+    bool isRunning();
+
+    std::string getClientId() const {
+        return clientId;
+    }
+
+    /**
+     * Hand out the next call id. The counter is pre-incremented, so the first
+     * id is 1; when it would reach INT32_MAX it wraps to 0. Returned ids are
+     * therefore always in [0, INT32_MAX). Serialized by a function-local
+     * static mutex that is shared by all instances.
+     */
+    int32_t getCallId() {
+        static mutex mutid;
+        lock_guard<mutex> lock(mutid);
+        ++count;
+        count = count < std::numeric_limits<int32_t>::max() ? count : 0;
+        return count;
+    }
+
+private:
+    shared_ptr<RpcChannel> createChannelInternal(
+        const RpcChannelKey & key);
+
+    void clean();
+
+private:
+    // True while an idle-cleaner thread has been started and has not exited.
+    atomic<bool> cleaning;
+    // Cleared by close() and the destructor.
+    atomic<bool> running;
+    // Waited on by the cleaner with a timeout; notified by the destructor.
+    condition_variable cond;
+    // Last call id handed out by getCallId().
+    int64_t count;
+    // Guards allChannels and the cleaner start-up in getChannel().
+    mutex mut;
+    std::string clientId;
+    thread cleaner;
+    unordered_map<RpcChannelKey, shared_ptr<RpcChannel> > allChannels;
+
+#ifdef MOCK
+private:
+    /*
+     * for test
+     */
+    Hdfs::Mock::TestRpcChannelStub * stub;
+#endif
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcConfig.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcConfig.cpp b/depends/libhdfs3/src/rpc/RpcConfig.cpp
new file mode 100644
index 0000000..baa96e4
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcConfig.cpp
@@ -0,0 +1,45 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RpcConfig.h"
+
+#include <vector>
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Combine the hashes of every field compared by operator==, in a fixed order.
+ */
+size_t RpcConfig::hash_value() const {
+    size_t parts[] = {
+        Int32Hasher(maxIdleTime),
+        Int32Hasher(pingTimeout),
+        Int32Hasher(connectTimeout),
+        Int32Hasher(readTimeout),
+        Int32Hasher(writeTimeout),
+        Int32Hasher(maxRetryOnConnect),
+        Int32Hasher(lingerTimeout),
+        Int32Hasher(rpcTimeout),
+        BoolHasher(tcpNoDelay)
+    };
+    const size_t partCount = sizeof(parts) / sizeof(parts[0]);
+    return CombineHasher(parts, partCount);
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcConfig.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcConfig.h b/depends/libhdfs3/src/rpc/RpcConfig.h
new file mode 100644
index 0000000..be71019
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcConfig.h
@@ -0,0 +1,155 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_
+
+#include "Hash.h"
+#include "SessionConfig.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * RPC connection settings copied from a SessionConfig. operator== and
+ * hash_value() both cover every field.
+ */
+class RpcConfig {
+public:
+
+    /**
+     * Copy the RPC-related settings out of a session configuration.
+     */
+    RpcConfig(const SessionConfig & conf) :
+        maxIdleTime(conf.getRpcMaxIdleTime()),
+        pingTimeout(conf.getRpcPingTimeout()),
+        connectTimeout(conf.getRpcConnectTimeout()),
+        readTimeout(conf.getRpcReadTimeout()),
+        writeTimeout(conf.getRpcWriteTimeout()),
+        maxRetryOnConnect(conf.getRpcMaxRetryOnConnect()),
+        lingerTimeout(conf.getRpcSocketLingerTimeout()),
+        rpcTimeout(conf.getRpcTimeout()),
+        tcpNoDelay(conf.isRpcTcpNoDelay()) {
+    }
+
+    size_t hash_value() const;
+
+    int getConnectTimeout() const {
+        return connectTimeout;
+    }
+
+    void setConnectTimeout(int value) {
+        connectTimeout = value;
+    }
+
+    int getMaxIdleTime() const {
+        return maxIdleTime;
+    }
+
+    void setMaxIdleTime(int value) {
+        maxIdleTime = value;
+    }
+
+    int getMaxRetryOnConnect() const {
+        return maxRetryOnConnect;
+    }
+
+    void setMaxRetryOnConnect(int value) {
+        maxRetryOnConnect = value;
+    }
+
+    int getReadTimeout() const {
+        return readTimeout;
+    }
+
+    void setReadTimeout(int value) {
+        readTimeout = value;
+    }
+
+    bool isTcpNoDelay() const {
+        return tcpNoDelay;
+    }
+
+    void setTcpNoDelay(bool value) {
+        tcpNoDelay = value;
+    }
+
+    int getWriteTimeout() const {
+        return writeTimeout;
+    }
+
+    void setWriteTimeout(int value) {
+        writeTimeout = value;
+    }
+
+    int getPingTimeout() const {
+        return pingTimeout;
+    }
+
+    void setPingTimeout(int value) {
+        pingTimeout = value;
+    }
+
+    int getLingerTimeout() const {
+        return lingerTimeout;
+    }
+
+    void setLingerTimeout(int value) {
+        lingerTimeout = value;
+    }
+
+    int getRpcTimeout() const {
+        return rpcTimeout;
+    }
+
+    void setRpcTimeout(int value) {
+        rpcTimeout = value;
+    }
+
+    /**
+     * Field-wise equality over every setting.
+     */
+    bool operator ==(const RpcConfig & other) const {
+        return maxIdleTime == other.maxIdleTime
+               && pingTimeout == other.pingTimeout
+               && connectTimeout == other.connectTimeout
+               && readTimeout == other.readTimeout
+               && writeTimeout == other.writeTimeout
+               && maxRetryOnConnect == other.maxRetryOnConnect
+               && lingerTimeout == other.lingerTimeout
+               && rpcTimeout == other.rpcTimeout
+               && tcpNoDelay == other.tcpNoDelay;
+    }
+
+private:
+    // Declaration order is also the constructor's initialization order.
+    int maxIdleTime;
+    int pingTimeout;
+    int connectTimeout;
+    int readTimeout;
+    int writeTimeout;
+    int maxRetryOnConnect;
+    int lingerTimeout;
+    int rpcTimeout;
+    bool tcpNoDelay;
+};
+
+}
+}
+
+// Hash support for RpcConfig, which is hashed as part of RpcChannelKey.
+// NOTE(review): presumably expands to a hash specialization forwarding to
+// hash_value() -- confirm in Hash.h.
+HDFS_HASH_DEFINE(::Hdfs::Internal::RpcConfig);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/rpc/RpcContentWrapper.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcContentWrapper.cpp b/depends/libhdfs3/src/rpc/RpcContentWrapper.cpp
new file mode 100644
index 0000000..8cc9a84
--- /dev/null
+++ b/depends/libhdfs3/src/rpc/RpcContentWrapper.cpp
@@ -0,0 +1,65 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <google/protobuf/io/coded_stream.h>
+
+#include "RpcContentWrapper.h"
+
+using namespace ::google::protobuf;
+using namespace ::google::protobuf::io;
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Store the header and body pointers as given. The body may be NULL, in which
+ * case only the header is measured and written; the header is always used.
+ */
+RpcContentWrapper::RpcContentWrapper(Message * hdr, Message * body) :
+    header(hdr), msg(body) {
+}
+
+/**
+ * @return the byte count of the layout produced by writeTo(): the header
+ *         preceded by its varint length, plus the body preceded by its
+ *         varint length when msg is not NULL.
+ */
+int RpcContentWrapper::getLength() {
+    const int headerSize = header->ByteSize();
+    int total = CodedOutputStream::VarintSize32(headerSize) + headerSize;
+
+    if (msg != NULL) {
+        const int bodySize = msg->ByteSize();
+        total += CodedOutputStream::VarintSize32(bodySize) + bodySize;
+    }
+
+    return total;
+}
+
+/**
+ * Append one message to buffer as a varint length followed by its bytes.
+ */
+static void WriteLengthPrefixed(WriteBuffer & buffer, Message * message) {
+    const int length = message->ByteSize();
+    buffer.writeVarint32(length);
+    message->SerializeToArray(buffer.alloc(length), length);
+}
+
+/**
+ * Serialize the header and, when present, the body into buffer, each
+ * prefixed with its varint-encoded length.
+ */
+void RpcContentWrapper::writeTo(WriteBuffer & buffer) {
+    WriteLengthPrefixed(buffer, header);
+
+    if (msg != NULL) {
+        WriteLengthPrefixed(buffer, msg);
+    }
+}
+
+}
+}
+



Mime
View raw message