hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [32/33] HDFS-7012. Add hdfs native client RPC functionality (Zhanwei Wang via Colin P. McCabe)
Date Tue, 07 Oct 2014 17:24:42 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h
new file mode 100644
index 0000000..fafb7fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_
+#define _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_
+
+#include <string>
+
+namespace hdfs {
+
+/**
+ * Action describes an action the user is permitted to apply on a file.
+ *
+ * The enumerators are numbered 0 to 7, so each value is a three bit mask:
+ * read = 4, write = 2, execute = 1.
+ */
+enum Action {
+    NONE = 0,          // "---"
+    EXECUTE = 1,       // "--x"
+    WRITE = 2,         // "-w-"
+    WRITE_EXECUTE = 3, // "-wx"
+    READ = 4,          // "r--"
+    READ_EXECUTE = 5,  // "r-x"
+    READ_WRITE = 6,    // "rw-"
+    ALL = 7            // "rwx"
+};
+
+/**
+ * Test whether Action a implies Action b, i.e. whether every
+ * permission bit set in b is also set in a.
+ * @param a Action to be tested.
+ * @param b Action target.
+ * @return true if a implies b.
+ */
+static inline bool implies(const Action &a, const Action &b) {
+    const int wanted = b;
+    const int granted = a;
+    return (granted & wanted) == wanted;
+}
+
+/**
+ * Build the Action holding the permission bits common to a and b.
+ * @param a Action to be used.
+ * @param b Action to be used.
+ * @return a new Action equal to the bitwise AND of a and b.
+ */
+static inline Action operator &(const Action &a, const Action &b) {
+    const unsigned int lhs = static_cast<unsigned int>(a);
+    const unsigned int rhs = static_cast<unsigned int>(b);
+    return static_cast<Action>(lhs & rhs);
+}
+/**
+ * Build the Action holding the permission bits set in a or in b.
+ * @param a Action to be used.
+ * @param b Action to be used.
+ * @return a new Action equal to the bitwise OR of a and b.
+ */
+static inline Action operator |(const Action &a, const Action &b) {
+    const unsigned int lhs = static_cast<unsigned int>(a);
+    const unsigned int rhs = static_cast<unsigned int>(b);
+    return static_cast<Action>(lhs | rhs);
+}
+/**
+ * Build the complement of a given Action.
+ * The result is computed as 7 - a, so for the eight named Actions it
+ * holds exactly the permission bits that a lacks.
+ * @param a Action to be used.
+ * @return a new Action
+ */
+static inline Action operator ~(const Action &a) {
+    const unsigned int bits = static_cast<unsigned int>(a);
+    return static_cast<Action>(7 - bits);
+}
+
+/**
+ * To convert an Action to a readable string.
+ *
+ * The string is assembled from the three permission bits, so the
+ * function returns a value for every input; a value outside the named
+ * enumerators is rendered from its low three bits instead of running
+ * off the end of the function.
+ * @param a the Action to be converted.
+ * @return a readable three character string such as "rw-".
+ */
+static inline std::string toString(const Action &a) {
+    const unsigned int bits = static_cast<unsigned int>(a);
+    std::string retval("---");
+
+    if (bits & static_cast<unsigned int>(READ)) {
+        retval[0] = 'r';
+    }
+
+    if (bits & static_cast<unsigned int>(WRITE)) {
+        retval[1] = 'w';
+    }
+
+    if (bits & static_cast<unsigned int>(EXECUTE)) {
+        retval[2] = 'x';
+    }
+
+    return retval;
+}
+
+/**
+ * Permission describes a file permission: one Action each for the
+ * owner, the group and all other users, plus a sticky bit.
+ */
+class Permission {
+public:
+    /**
+     * To construct a Permission; the sticky bit is cleared.
+     * @param u owner permission.
+     * @param g group permission.
+     * @param o other user permission.
+     */
+    Permission(const Action &u, const Action &g, const Action &o) :
+        userAction(u), groupAction(g), otherAction(o), stickyBit(false) {
+    }
+
+    /**
+     * To construct a Permission from a uint16.
+     * @param mode permission flag.
+     */
+    Permission(uint16_t mode);
+
+    /**
+     * To get owner permission
+     * @return the owner permission
+     */
+    Action getUserAction() const {
+        return this->userAction;
+    }
+
+    /**
+     * To set owner permission
+     * @param userAction the owner permission
+     */
+    void setUserAction(Action userAction) {
+        this->userAction = userAction;
+    }
+
+    /**
+     * To get group permission
+     * @return the group permission
+     */
+    Action getGroupAction() const {
+        return this->groupAction;
+    }
+
+    /**
+     * To set group permission
+     * @param groupAction the group permission
+     */
+    void setGroupAction(Action groupAction) {
+        this->groupAction = groupAction;
+    }
+
+    /**
+     * To get other user permission
+     * @return other user permission
+     */
+    Action getOtherAction() const {
+        return this->otherAction;
+    }
+
+    /**
+     * To set other user permission
+     * @param otherAction other user permission
+     */
+    void setOtherAction(Action otherAction) {
+        this->otherAction = otherAction;
+    }
+
+    /**
+     * To convert a Permission to a readable string: the owner, group
+     * and other Actions in that order. The sticky bit is not shown.
+     * @return a readable string
+     */
+    std::string toString() const {
+        std::string text = hdfs::toString(userAction);
+        text += hdfs::toString(groupAction);
+        text += hdfs::toString(otherAction);
+        return text;
+    }
+
+    /**
+     * To convert a Permission to a uint16 flag. The owner Action is
+     * shifted left by 6, the group Action by 3, the other Action is
+     * unshifted, and the sticky bit contributes 1 << 9.
+     * @return a uint16 flag
+     */
+    uint16_t toShort() const {
+        const int owner = static_cast<uint16_t>(userAction) << 6;
+        const int group = static_cast<uint16_t>(groupAction) << 3;
+        const int others = static_cast<uint16_t>(otherAction);
+        const int sticky = stickyBit ? 1 << 9 : 0;
+        return static_cast<uint16_t>(owner + group + others + sticky);
+    }
+
+    /**
+     * Two Permissions are equal when all three Actions and the sticky
+     * bit match.
+     */
+    bool operator ==(const Permission &other) const {
+        if (userAction != other.userAction) {
+            return false;
+        }
+
+        if (groupAction != other.groupAction) {
+            return false;
+        }
+
+        if (otherAction != other.otherAction) {
+            return false;
+        }
+
+        return stickyBit == other.stickyBit;
+    }
+
+private:
+    Action userAction;
+    Action groupAction;
+    Action otherAction;
+
+    bool stickyBit;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc
new file mode 100644
index 0000000..02a2ddd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BigEndian.h"
+#include "DataTransferProtocolSender.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "HWCrc32c.h"
+#include "RemoteBlockReader.h"
+#include "SWCrc32c.h"
+#include "WriteBuffer.h"
+#include "datatransfer.pb.h"
+
+#include <inttypes.h>
+#include <vector>
+
+using hadoop::hdfs::ClientReadStatusProto;
+using hadoop::hdfs::BlockOpResponseProto;
+using hadoop::hdfs::ChecksumProto;
+using hadoop::hdfs::ChecksumTypeProto;
+using hadoop::hdfs::ReadOpChecksumInfoProto;
+using hadoop::hdfs::Status;
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Connect to the datanode, send a read request for len bytes of block
+ * eb starting at offset start, and check the datanode's response.
+ * An HdfsTimeoutException raised during setup is re-thrown through
+ * NESTED_THROW as an HdfsIOException.
+ */
+RemoteBlockReader::RemoteBlockReader(const ExtendedBlock &eb,
+          DatanodeInfo &datanode, int64_t start, int64_t len,
+          const Token &token, const char * clientName, bool verify,
+          SessionConfig &conf) :
+    verify(verify),
+    datanode(datanode),
+    binfo(eb),
+    checksumSize(0),
+    chunkSize(0),
+    position(0),
+    size(0),
+    cursor(start),
+    endOffset(len + start),
+    lastSeqNo(-1) {
+    try {
+        assert(start >= 0);
+        /*
+         * timeouts come from the session configuration
+         */
+        readTimeout = conf.getInputReadTimeout();
+        writeTimeout = conf.getInputWriteTimeout();
+        connTimeout = conf.getInputConnTimeout();
+        /*
+         * the buffered reader wraps the socket, so create it first
+         */
+        sock.reset(new TcpSocketImpl());
+        in.reset(new BufferedSocketReaderImpl(*sock));
+        sock->connect(datanode.getIpAddr().c_str(), datanode.getXferPort(),
+                      connTimeout);
+        sender.reset(new DataTransferProtocolSender(*sock, writeTimeout,
+                                                    datanode.formatAddress()));
+        sender->readBlock(eb, token, clientName, start, len);
+        checkResponse();
+    } catch (const HdfsTimeoutException &e) {
+        NESTED_THROW(HdfsIOException,
+                     "RemoteBlockReader: Failed to setup remote block reader "
+                     "for block %s from node %s",
+                     eb.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+/**
+ * Close the socket that the constructor connected to the datanode.
+ */
+RemoteBlockReader::~RemoteBlockReader() {
+    sock->close();
+}
+
+/**
+ * Read and validate the datanode's BlockOpResponseProto for the read
+ * request sent by the constructor.
+ *
+ * On success this sets chunkSize and checksumSize, selects the checksum
+ * implementation, and may turn verification off when the datanode
+ * reports a null checksum. Any malformed or unsuccessful response is
+ * reported with HdfsIOException, or HdfsInvalidBlockToken when the
+ * datanode rejects the access token.
+ */
+void RemoteBlockReader::checkResponse() {
+    std::vector<char> respBuffer;
+    int32_t respSize = in->readVarint32(readTimeout);
+
+    /*
+     * the response is length prefixed; refuse absurd sizes before
+     * allocating the buffer.
+     */
+    if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
+        THROW(HdfsIOException, "RemoteBlockReader get a invalid response "
+              "size: %d, Block: %s, from Datanode: %s",
+              respSize, binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    respBuffer.resize(respSize);
+    in->readFully(&respBuffer[0], respSize, readTimeout);
+    BlockOpResponseProto resp;
+
+    if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
+        THROW(HdfsIOException, "RemoteBlockReader cannot parse "
+              "BlockOpResponseProto from Datanode response, "
+              "Block: %s, from Datanode: %s", binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    if (resp.status() != hadoop::hdfs::SUCCESS) {
+        std::string msg;
+
+        if (resp.has_message()) {
+            msg = resp.message();
+        }
+
+        if (resp.status() == hadoop::hdfs::ERROR_ACCESS_TOKEN) {
+            THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token "
+                  "is invalid. Datanode: %s, Block: %s",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str());
+        } else {
+            THROW(HdfsIOException,
+                  "RemoteBlockReader: Datanode return an error when sending "
+                  "read request to Datanode: %s, Block: %s, %s.",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str(),
+                  (msg.empty() ? "check Datanode's log for more information" :
+                     msg.c_str()));
+        }
+    }
+
+    const ReadOpChecksumInfoProto &checksumInfo = resp.readopchecksuminfo();
+    const ChecksumProto &cs = checksumInfo.checksum();
+    chunkSize = cs.bytesperchecksum();
+
+    /*
+     * chunkSize is used as a divisor when a packet is split into
+     * checksum chunks, so zero must be rejected as well as negative
+     * values.
+     */
+    if (chunkSize <= 0) {
+        THROW(HdfsIOException,
+              "RemoteBlockReader invalid chunk size: %d, expected range(0, %"
+              PRId64 "], Block: %s, from Datanode: %s",
+              chunkSize, binfo.getNumBytes(), binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    switch (cs.type()) {
+    case ChecksumTypeProto::CHECKSUM_NULL:
+        /* no checksum is sent, so there is nothing to verify */
+        verify = false;
+        checksumSize = 0;
+        break;
+
+    case ChecksumTypeProto::CHECKSUM_CRC32:
+        THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 "
+              "checksum, Block: %s, from Datanode: %s",
+              binfo.toString().c_str(), datanode.formatAddress().c_str());
+        break;
+
+    case ChecksumTypeProto::CHECKSUM_CRC32C:
+        /* prefer the hardware implementation when it is available */
+        if (HWCrc32c::available()) {
+            checksum = shared_ptr<Checksum>(new HWCrc32c());
+        } else {
+            checksum = shared_ptr<Checksum>(new SWCrc32c());
+        }
+
+        checksumSize = sizeof(int32_t);
+        break;
+
+    default:
+        THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum "
+              "type: %d, Block: %s, from Datanode: %s",
+              static_cast<int>(cs.type()), binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    /*
+     * The offset into the block at which the first packet
+     * will start. This is necessary since reads will align
+     * backwards to a checksum chunk boundary.
+     */
+    int64_t firstChunkOffset = checksumInfo.chunkoffset();
+
+    /*
+     * the first chunk must start at or before the requested offset,
+     * and less than one chunk before it.
+     */
+    if (firstChunkOffset < 0 || firstChunkOffset > cursor ||
+          firstChunkOffset <= cursor - chunkSize) {
+        THROW(HdfsIOException,
+              "RemoteBlockReader invalid first chunk offset: %" PRId64
+              ", expected range[0, %" PRId64 "], " "Block: %s, from Datanode: %s",
+              firstChunkOffset, cursor, binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+}
+
+/**
+ * Read the next packet header from the datanode.
+ * Throws when the previously read header was already marked as the last
+ * packet in the block. Any HdfsIOException raised here is re-thrown
+ * through NESTED_THROW with the block and datanode added.
+ * @return the header that was read.
+ */
+shared_ptr<PacketHeader> RemoteBlockReader::readPacketHeader() {
+    try {
+        static const int headerSize = PacketHeader::GetPkgHeaderSize();
+
+        if (lastHeader && lastHeader->isLastPacketInBlock()) {
+            THROW(HdfsIOException, "RemoteBlockReader: read over block end "
+                 "from Datanode: %s, Block: %s.",
+                 datanode.formatAddress().c_str(), binfo.toString().c_str());
+        }
+
+        std::vector<char> raw(headerSize);
+        in->readFully(&raw[0], headerSize, readTimeout);
+        shared_ptr<PacketHeader> header(new PacketHeader);
+        header->readFields(&raw[0], headerSize);
+        return header;
+    } catch (const HdfsIOException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                 "block header for Block: %s from Datanode: %s.",
+                 binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+/**
+ * Read the next packet (header, checksums and data) into buffer and
+ * position the in-buffer cursor on the first byte the caller asked for.
+ *
+ * The packet length announced in the header is checked against the data
+ * and checksum lengths before any payload is read, so an inconsistent
+ * header is rejected without changing size, buffer or lastSeqNo.
+ * When this packet reaches the end of the requested range and the
+ * datanode follows it with an empty last packet, the read status is
+ * sent back to the datanode.
+ */
+void RemoteBlockReader::readNextPacket() {
+    assert(position >= size);
+    lastHeader = readPacketHeader();
+    int dataSize = lastHeader->getDataLen();
+    int64_t pendingAhead = 0;
+
+    if (!lastHeader->sanityCheck(lastSeqNo)) {
+        THROW(HdfsIOException, "RemoteBlockReader: Packet failed on sanity "
+              "check for block %s from Datanode %s.",
+              binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+
+    assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t));
+
+    if (dataSize > 0) {
+        int chunks = (dataSize + chunkSize - 1) / chunkSize;
+        int checksumLen = chunks * checksumSize;
+
+        /*
+         * validate the header before trusting it: the packet length
+         * covers its own int32 field, the checksums and the data.
+         */
+        if (lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) +
+            dataSize + checksumLen) {
+            THROW(HdfsIOException, "Invalid Packet, packetLen is %d, "
+                  "dataSize is %d, checksum size is %d",
+                  lastHeader->getPacketLen(), dataSize, checksumLen);
+        }
+
+        size = checksumLen + dataSize;
+        buffer.resize(size);
+        in->readFully(&buffer[0], size, readTimeout);
+        lastSeqNo = lastHeader->getSeqno();
+
+        if (verify) {
+            verifyChecksum(chunks);
+        }
+
+        /*
+         * skip checksum
+         */
+        position = checksumLen;
+        /*
+         * the first packet we get may start at the position before we required
+         */
+        pendingAhead = cursor - lastHeader->getOffsetInBlock();
+        pendingAhead = pendingAhead > 0 ? pendingAhead : 0;
+        position += pendingAhead;
+    }
+
+    /*
+     * we reach the end of the range we required, send status to datanode
+     * if datanode do not sending data anymore.
+     */
+
+    if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) {
+        sendStatus();
+    }
+}
+
+/**
+ * Read one more packet header and report whether it is the empty
+ * packet that ends a block.
+ * @return true if the header is flagged as the last packet in the block
+ *  and carries no data.
+ */
+bool RemoteBlockReader::readTrailingEmptyPacket() {
+    shared_ptr<PacketHeader> header = readPacketHeader();
+    return header->isLastPacketInBlock() && header->getDataLen() == 0;
+}
+
+/**
+ * Send a ClientReadStatusProto to the datanode: CHECKSUM_OK when
+ * checksums were verified, SUCCESS otherwise. The message is written
+ * as a varint32 length followed by the serialized status.
+ */
+void RemoteBlockReader::sendStatus() {
+    ClientReadStatusProto status;
+    status.set_status(verify ? hadoop::hdfs::CHECKSUM_OK
+                             : hadoop::hdfs::SUCCESS);
+
+    WriteBuffer out;
+    const int payloadSize = status.ByteSize();
+    out.writeVarint32(payloadSize);
+    status.SerializeToArray(out.alloc(payloadSize), payloadSize);
+    sock->writeFully(out.getBuffer(0), out.getDataSize(0), writeTimeout);
+}
+
+/**
+ * Verify the checksums of the packet currently held in buffer.
+ * The buffer holds one checksum per chunk first, followed by the data.
+ * A mismatch is only reported for chunks of exactly chunkSize bytes; a
+ * shorter final chunk is computed but never rejected.
+ * NOTE(review): skipping the short chunk looks deliberate - confirm.
+ * @param chunks the number of checksum chunks in the packet.
+ */
+void RemoteBlockReader::verifyChecksum(int chunks) {
+    char * const checksumBase = &buffer[0];
+    char * const dataBase = checksumBase + (chunks * checksumSize);
+    int remaining = lastHeader->getDataLen();
+
+    for (int chunk = 0; chunk < chunks; ++chunk) {
+        const int length = remaining > chunkSize ? chunkSize : remaining;
+        remaining -= length;
+        checksum->reset();
+        checksum->update(dataBase + (chunk * chunkSize), length);
+        const uint32_t actual = checksum->getValue();
+        const uint32_t expected =
+          ReadBigEndian32FromArray(checksumBase + (chunk * checksumSize));
+        const bool fullChunk = (length == chunkSize);
+
+        if (fullChunk && actual != expected) {
+            THROW(ChecksumException, "RemoteBlockReader: checksum not match "
+                  "for Block: %s, on Datanode: %s",
+                  binfo.toString().c_str(), datanode.formatAddress().c_str());
+        }
+    }
+
+    assert(0 == remaining);
+}
+
+/**
+ * Get how many bytes are left unread in the current packet buffer.
+ * @return size - position, or 0 when that difference is not positive.
+ */
+int64_t RemoteBlockReader::available() {
+    const int unread = size - position;
+    return unread > 0 ? unread : 0;
+}
+
+/**
+ * To read data from block.
+ * Copies at most len bytes out of the current packet, fetching the next
+ * packet first when the buffer is exhausted.
+ * @param buf the buffer to be filled.
+ * @param len the number of bytes to be read.
+ * @return the number of bytes filled in the buffer, it may be less
+ *  than len. Returns 0 when no data is buffered after fetching the next
+ *  packet (for example the datanode sent its empty last packet).
+ */
+int32_t RemoteBlockReader::read(char * buf, int32_t len) {
+    assert(0 != len && NULL != buf);
+
+    if (cursor >= endOffset) {
+        THROW(HdfsIOException, "RemoteBlockReader: read over block end from "
+              "Datanode: %s, Block: %s.",
+              datanode.formatAddress().c_str(), binfo.toString().c_str());
+    }
+
+    try {
+        if (position >= size) {
+            readNextPacket();
+        }
+
+        const int32_t unread = size - position;
+        const int32_t todo = len < unread ? len : unread;
+
+        /*
+         * nothing to copy: do not index the buffer at or past its end
+         * and never hand a non-positive count to memcpy.
+         */
+        if (todo <= 0) {
+            return 0;
+        }
+
+        memcpy(buf, &buffer[position], todo);
+        position += todo;
+        cursor += todo;
+        return todo;
+    } catch (const HdfsTimeoutException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                  "Block: %s from Datanode: %s.",
+                  binfo.toString().c_str(), datanode.formatAddress().c_str());
+    } catch (const HdfsNetworkException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                  "Block: %s from Datanode: %s.",
+                  binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+/**
+ * Move the cursor forward len bytes.
+ * The skipped bytes are still received packet by packet; they are just
+ * not copied anywhere. Timeout and network exceptions are re-thrown
+ * through NESTED_THROW as HdfsIOException.
+ * @param len The number of bytes to skip.
+ */
+void RemoteBlockReader::skip(int64_t len) {
+    assert(cursor + len <= endOffset);
+
+    try {
+        for (int64_t remaining = len; remaining > 0;) {
+            if (cursor >= endOffset) {
+                THROW(HdfsIOException, "RemoteBlockReader: skip over block "
+                  "end from Datanode: %s, Block: %s.",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str());
+            }
+
+            if (position >= size) {
+                readNextPacket();
+            }
+
+            /*
+             * consume what is buffered, but no more than is left to skip
+             */
+            const int buffered = size - position;
+            const int step =
+                buffered < remaining ? buffered : static_cast<int>(remaining);
+            position += step;
+            cursor += step;
+            remaining -= step;
+        }
+    } catch (const HdfsTimeoutException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                 "Block: %s from Datanode: %s.",
+                 binfo.toString().c_str(), datanode.formatAddress().c_str());
+    } catch (const HdfsNetworkException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                 "Block: %s from Datanode: %s.",
+                 binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
new file mode 100644
index 0000000..548118b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_
+#define _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_
+
+#include "BlockReader.h"
+#include "Checksum.h"
+#include "DataTransferProtocol.h"
+#include "PacketHeader.h"
+#include "SessionConfig.h"
+#include "common/SharedPtr.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+#include "server/DatanodeInfo.h"
+#include "server/LocatedBlocks.h"
+
+#include <stdint.h>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A BlockReader that reads a byte range of a block from a datanode
+ * over a socket, one packet at a time.
+ */
+class RemoteBlockReader: public BlockReader {
+public:
+    /**
+     * Connect to the datanode and request len bytes of block eb
+     * starting at offset start.
+     * NOTE(review): eb and datanode are kept as references (see binfo
+     * and datanode below), so presumably both must outlive the reader.
+     */
+    RemoteBlockReader(const ExtendedBlock &eb, DatanodeInfo &datanode,
+                      int64_t start, int64_t len, const Token &token,
+                      const char *clientName, bool verify, SessionConfig &conf);
+
+    ~RemoteBlockReader();
+
+    /**
+     * Get how many bytes can be read without blocking.
+     * @return The number of bytes can be read without blocking.
+     */
+    virtual int64_t available();
+
+    /**
+     * To read data from block.
+     * @param buf the buffer to be filled.
+     * @param len the number of bytes to be read.
+     * @return return the number of bytes filled in the buffer,
+     *  it may be less than len. Return 0 if reach the end of block.
+     */
+    virtual int32_t read(char *buf, int32_t len);
+
+    /**
+     * Move the cursor forward len bytes.
+     * @param len The number of bytes to skip.
+     */
+    virtual void skip(int64_t len);
+
+private:
+    bool readTrailingEmptyPacket();
+    shared_ptr<PacketHeader> readPacketHeader();
+    void checkResponse();
+    void readNextPacket();
+    void sendStatus();
+    void verifyChecksum(int chunks);
+
+private:
+    bool verify; //verify checksum or not.
+    DatanodeInfo &datanode;
+    const ExtendedBlock &binfo;
+    int checksumSize;
+    int chunkSize;
+    int connTimeout;
+    int position; //point in buffer.
+    int readTimeout;
+    int size;  //data size in buffer.
+    int writeTimeout;
+    int64_t cursor; //point in block.
+    int64_t endOffset; //offset in block requested to read to.
+    int64_t lastSeqNo; //seqno of the last packet received
+    shared_ptr<BufferedSocketReader> in;
+    shared_ptr<Checksum> checksum;
+    shared_ptr<DataTransferProtocol> sender;
+    shared_ptr<PacketHeader> lastHeader;
+    shared_ptr<Socket> sock;
+    std::vector<char> buffer;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc
new file mode 100644
index 0000000..16ef93f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Hash.h"
+#include "Token.h"
+#include "WritableUtils.h"
+
+#include <gsasl.h>
+#include <string>
+#include <vector>
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Encode len bytes of input as URL safe base64: '+' becomes '-',
+ * '/' becomes '_', and the result is cut at the first '=' so the
+ * padding is dropped.
+ * Throws std::bad_alloc when gsasl_base64_to fails.
+ */
+static std::string Base64Encode(const char *input, size_t len) {
+    char * encoded = NULL;
+    size_t encodedLen;
+    const int rc = gsasl_base64_to(input, len, &encoded, &encodedLen);
+
+    if (GSASL_OK != rc) {
+        assert(GSASL_MALLOC_ERROR == rc);
+        throw std::bad_alloc();
+    }
+
+    assert(NULL != encoded);
+    std::string retval(encoded);
+    gsasl_free(encoded);
+
+    for (size_t i = 0 ; i < retval.length(); ++i) {
+        const char c = retval[i];
+
+        if ('+' == c) {
+            retval[i] = '-';
+        } else if ('/' == c) {
+            retval[i] = '_';
+        } else if ('=' == c) {
+            /* padding starts here; nothing after it is kept */
+            retval.resize(i);
+            break;
+        }
+    }
+
+    return retval;
+}
+
+/**
+ * Decode a URL safe base64 string (as produced by Base64Encode) into
+ * buffer. '-' and '_' are mapped back to '+' and '/', and up to two
+ * '=' padding characters are appended when the decoder rejects the
+ * input.
+ * @param urlSafe the string to decode.
+ * @param buffer receives the decoded bytes; it is left empty when the
+ *  decoded length is zero.
+ * Throws std::invalid_argument when the input cannot be decoded or the
+ * decoder returns an unexpected code, std::bad_alloc on allocation
+ * failure.
+ */
+static void Base64Decode(const std::string &urlSafe,
+                         std::vector<char> &buffer) {
+    int retval = 0, append = 0;
+    size_t outLen = 0;
+    char * output = NULL;
+    std::string input = urlSafe;
+
+    for (size_t i = 0; i < input.length(); ++i) {
+        switch (input[i]) {
+        case '-':
+            input[i] = '+';
+            break;
+
+        case '_':
+            input[i] = '/';
+            break;
+
+        default:
+            break;
+        }
+    }
+
+    while (true) {
+        retval = gsasl_base64_from(input.c_str(), input.length(), &output,
+                                   &outLen);
+
+        if (GSASL_OK == retval) {
+            break;
+        }
+
+        switch (retval) {
+        case GSASL_BASE64_ERROR:
+            /* the encoder strips padding, so retry with it restored */
+            if (append++ < 2) {
+                input.append("=");
+                continue;
+            }
+
+            throw std::invalid_argument(
+                "invalid input of gsasl_base64_from");
+
+        case GSASL_MALLOC_ERROR:
+            throw std::bad_alloc();
+
+        default:
+            assert(
+                false
+                && "unexpected return value from gsasl_base64_from");
+            /*
+             * with assertions disabled, do not carry on with an
+             * output pointer and length that were never set.
+             */
+            throw std::invalid_argument(
+                "unexpected return value from gsasl_base64_from");
+        }
+    }
+
+    try {
+        if (outLen > 0) {
+            buffer.assign(output, output + outLen);
+        } else {
+            buffer.clear();
+        }
+    } catch (...) {
+        /* do not leak the decoder's allocation if the copy fails */
+        gsasl_free(output);
+        throw;
+    }
+
+    gsasl_free(output);
+}
+
+/**
+ * Serialize the token (identifier, password, kind, service, in that
+ * order) and return it as a URL safe base64 string.
+ *
+ * The scratch buffer is sized from the four fields, and is never
+ * smaller than the 1024 bytes used before, so a large token is not
+ * limited by a fixed buffer.
+ * Any failure is re-thrown through NESTED_THROW as HdfsIOException.
+ */
+std::string Token::toString() const {
+    try {
+        /*
+         * NOTE(review): the 64 spare bytes are meant to cover the four
+         * length prefixes written by WritableUtils - confirm against
+         * its encoding.
+         */
+        const size_t payload = identifier.size() + password.size()
+                               + kind.size() + service.size();
+        const size_t capacity = payload + 64 > 1024 ? payload + 64 : 1024;
+        size_t len = 0;
+        std::vector<char> buffer(capacity);
+        WritableUtils out(&buffer[0], buffer.size());
+        len += out.WriteInt32(identifier.size());
+        len += out.WriteRaw(&identifier[0], identifier.size());
+        len += out.WriteInt32(password.size());
+        len += out.WriteRaw(&password[0], password.size());
+        len += out.WriteText(kind);
+        len += out.WriteText(service);
+        return Base64Encode(&buffer[0], len);
+    } catch (...) {
+        NESTED_THROW(HdfsIOException, "cannot convert token to string");
+    }
+}
+
+/**
+ * Rebuild the token from a string in the format toString() writes:
+ * base64 decode it, then read identifier, password, kind and service
+ * in that order.
+ * Any failure is re-thrown through NESTED_THROW as
+ * HdfsInvalidBlockToken.
+ * @param str the encoded token.
+ */
+void Token::fromString(const std::string &str) {
+    try {
+        std::vector<char> decoded;
+        Base64Decode(str, decoded);
+        WritableUtils in(&decoded[0], decoded.size());
+
+        const int32_t identifierLen = in.ReadInt32();
+        identifier.resize(identifierLen);
+        in.ReadRaw(&identifier[0], identifierLen);
+
+        const int32_t passwordLen = in.ReadInt32();
+        password.resize(passwordLen);
+        in.ReadRaw(&password[0], passwordLen);
+
+        kind = in.ReadText();
+        service = in.ReadText();
+    } catch (...) {
+        NESTED_THROW(HdfsInvalidBlockToken,
+                     "cannot construct a token from the string");
+    }
+}
+
+/**
+ * Hash the token by hashing each of its four fields with StringHasher
+ * and combining the results with CombineHasher.
+ */
+size_t Token::hash_value() const {
+    size_t parts[4];
+    parts[0] = StringHasher(identifier);
+    parts[1] = StringHasher(password);
+    parts[2] = StringHasher(kind);
+    parts[3] = StringHasher(service);
+    return CombineHasher(parts, sizeof(parts) / sizeof(parts[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h
new file mode 100644
index 0000000..c72cd86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_TOKEN_H_
+#define _HDFS_LIBHDFS3_CLIENT_TOKEN_H_
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Token holds the four string fields of a token: identifier,
+ * password, kind and service.
+ */
+class Token {
+public:
+    /** @return a copy of the identifier. */
+    std::string getIdentifier() const {
+        return this->identifier;
+    }
+
+    /** @return a copy of the password. */
+    std::string getPassword() const {
+        return this->password;
+    }
+
+    /** @return a copy of the kind. */
+    std::string getKind() const {
+        return this->kind;
+    }
+
+    /** @return a copy of the service. */
+    std::string getService() const {
+        return this->service;
+    }
+
+    void setIdentifier(const std::string &identifier) {
+        this->identifier = identifier;
+    }
+
+    void setPassword(const std::string &password) {
+        this->password = password;
+    }
+
+    void setKind(const std::string &kind) {
+        this->kind = kind;
+    }
+
+    void setService(const std::string &service) {
+        this->service = service;
+    }
+
+    /**
+     * Two tokens are equal when all four fields are equal.
+     */
+    bool operator ==(const Token &other) const {
+        if (identifier != other.identifier) {
+            return false;
+        }
+
+        if (password != other.password) {
+            return false;
+        }
+
+        return kind == other.kind && service == other.service;
+    }
+
+    /** Convert the token to a string. */
+    std::string toString() const;
+
+    /** Rebuild the token from a string. */
+    void fromString(const std::string &str);
+
+    /** @return a hash value of the token. */
+    size_t hash_value() const;
+
+private:
+    std::string identifier;
+    std::string password;
+    std::string kind;
+    std::string service;
+};
+
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc
new file mode 100644
index 0000000..a68bca0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UserInfo.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+#include <pwd.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Build a UserInfo for the user running the current process.
+ *
+ * The effective user is the password-database name of geteuid() and the
+ * real user is the password-database name of getuid().
+ *
+ * @return The populated UserInfo.
+ * @throw InvalidParameter if the lookup of either UID fails or finds no entry.
+ */
+UserInfo UserInfo::LocalUser() {
+    // Buffer size used when the system does not report a fixed limit.
+    static const long DEFAULT_PW_BUFFER_SIZE = 16384;
+    UserInfo retval;
+    struct passwd pwd, epwd, *result = NULL;
+    uid_t euid = geteuid();
+    uid_t uid = getuid();
+
+    // sysconf() returns -1 when the limit is indeterminate; that is not a
+    // failure, so fall back to a default size instead of giving up.
+    long bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
+
+    if (bufsize <= 0) {
+        bufsize = DEFAULT_PW_BUFFER_SIZE;
+    }
+
+    std::vector<char> buffer(static_cast<size_t>(bufsize));
+
+    if (getpwuid_r(euid, &epwd, &buffer[0], buffer.size(), &result) != 0
+            || !result) {
+        THROW(InvalidParameter,
+              "Invalid input: effective user name cannot be found with UID %u.",
+              euid);
+    }
+
+    // The name is copied here, so the scratch buffer can be reused below.
+    retval.setEffectiveUser(epwd.pw_name);
+
+    if (getpwuid_r(uid, &pwd, &buffer[0], buffer.size(), &result) != 0
+            || !result) {
+        THROW(InvalidParameter,
+              "Invalid input: real user name cannot be found with UID %u.",
+              uid);
+    }
+
+    retval.setRealUser(pwd.pw_name);
+    return retval;
+}
+
+/**
+ * Hash of this UserInfo, combined from the real user name and the
+ * effective user. The token map does not take part in the hash.
+ */
+size_t UserInfo::hash_value() const {
+    size_t parts[2];
+    parts[0] = StringHasher(realUser);
+    parts[1] = effectiveUser.hash_value();
+    return CombineHasher(parts, sizeof(parts) / sizeof(parts[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h
new file mode 100644
index 0000000..efc2c60
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_USERINFO_H_
+#define _HDFS_LIBHDFS3_CLIENT_USERINFO_H_
+
+#include "Hash.h"
+#include "KerberosName.h"
+#include "Logger.h"
+#include "Token.h"
+
+#include <map>
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * UserInfo bundles a real user name, an effective user (stored as a
+ * KerberosName) and a set of tokens indexed by (kind, service).
+ */
+class UserInfo {
+public:
+    UserInfo() {
+    }
+
+    /** @param u The name used to initialize the effective user. */
+    explicit UserInfo(const std::string &u) :
+        effectiveUser(u) {
+    }
+
+    /** @return The real user name. */
+    const std::string &getRealUser() const {
+        return realUser;
+    }
+
+    /** @param user The new real user name. */
+    void setRealUser(const std::string &user) {
+        realUser = user;
+    }
+
+    /** @return The name part of the effective user. */
+    const std::string &getEffectiveUser() const {
+        return effectiveUser.getName();
+    }
+
+    /** @param effectiveUser The string the effective user is rebuilt from. */
+    void setEffectiveUser(const std::string &effectiveUser) {
+        KerberosName parsed(effectiveUser);
+        this->effectiveUser = parsed;
+    }
+
+    /** @return The principal reported by the effective user. */
+    std::string getPrincipal() const {
+        return effectiveUser.getPrincipal();
+    }
+
+    /**
+     * Compare real and effective user; tokens are not compared.
+     */
+    bool operator ==(const UserInfo &other) const {
+        if (!(realUser == other.realUser)) {
+            return false;
+        }
+
+        return effectiveUser == other.effectiveUser;
+    }
+
+    /**
+     * Store a token under its (kind, service) pair, replacing any token
+     * already stored under the same pair.
+     * @param token The token to store (copied).
+     */
+    void addToken(const Token &token) {
+        const std::pair<std::string, std::string> key(token.getKind(),
+                token.getService());
+        tokens[key] = token;
+    }
+
+    /**
+     * Look up the token stored for a (kind, service) pair.
+     * @param kind The token kind.
+     * @param service The token service.
+     * @return Pointer to the stored token, or NULL if there is none.
+     */
+    const Token * selectToken(const std::string &kind,
+                              const std::string &service) const {
+        typedef std::map<std::pair<std::string, std::string>, Token> TokenMap;
+        TokenMap::const_iterator pos = tokens.find(std::make_pair(kind, service));
+        return pos != tokens.end() ? &pos->second : NULL;
+    }
+
+    size_t hash_value() const;
+
+public:
+    static UserInfo LocalUser();
+
+private:
+    KerberosName effectiveUser;
+    std::map<std::pair<std::string, std::string>, Token> tokens;
+    std::string realUser;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::UserInfo);
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_USERINFO_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
index a434aaf..fd62d5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
@@ -20,7 +20,7 @@
 #define _HDFS_LIBHDFS3_COMMON_LRUMAP_H_
 
 #include "Thread.h"
-#include "Unordered.h"
+#include "UnorderedMap.h"
 
 #include <list>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
index 8e0a40e..76ab1d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
@@ -19,6 +19,18 @@
 #ifndef _HDFS_LIBHDFS3_COMMON_SHARED_PTR_H_
 #define _HDFS_LIBHDFS3_COMMON_SHARED_PTR_H_
 
+#ifdef _LIBCPP_VERSION
+#include <memory>
+
+namespace hdfs {
+namespace internal {
+
+using std::shared_ptr;
+
+}
+}
+
+#else
 #include <tr1/memory>
 
 namespace hdfs {
@@ -30,3 +42,4 @@ using std::tr1::shared_ptr;
 }
 
 #endif
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
index 4dff889..00e3a2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
@@ -23,7 +23,7 @@
 
 #include <string>
 
-#ifndef DEFAULT_STACK_PREFIX 
+#ifndef DEFAULT_STACK_PREFIX
 #define DEFAULT_STACK_PREFIX "\t@\t"
 #endif
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
index 3bb08af..8c2c549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
@@ -19,7 +19,21 @@
 #ifndef _HDFS_LIBHDFS3_COMMON_UNORDERED_MAP_H_
 #define _HDFS_LIBHDFS3_COMMON_UNORDERED_MAP_H_
 
-#include <tr1/unordred_map>
+#ifdef _LIBCPP_VERSION
+
+#include <unordered_map>
+
+namespace hdfs {
+namespace internal {
+
+using std::unordered_map;
+
+}
+}
+
+#else
+
+#include <tr1/unordered_map>
 
 namespace hdfs {
 namespace internal {
@@ -30,3 +44,4 @@ using std::tr1::unordered_map;
 }
 
 #endif
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc
new file mode 100644
index 0000000..fe30d68
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BufferedSocketReader.h"
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+using namespace google::protobuf::io;
+
+namespace hdfs {
+namespace internal {
+
+// Wraps the socket s; the internal buffer is allocated with
+// sizeof(int64_t) bytes and starts out with no buffered data.
+BufferedSocketReaderImpl::BufferedSocketReaderImpl(Socket & s) :
+    cursor(0),
+    size(0),
+    sock(s),
+    buffer(sizeof(int64_t)) {
+}
+
+// Returns up to s bytes. Buffered bytes are handed out first; only when
+// the buffer is empty is the request forwarded to the socket.
+int32_t BufferedSocketReaderImpl::read(char * b, int32_t s) {
+    assert(s > 0 && NULL != b);
+    const int32_t buffered = size - cursor;
+    const int32_t count = s < buffered ? s : buffered;
+
+    if (count <= 0) {
+        // Nothing buffered: reset the buffer and read from the socket.
+        assert(size == cursor);
+        cursor = 0;
+        size = 0;
+        return sock.read(b, s);
+    }
+
+    memcpy(b, &buffer[cursor], count);
+    cursor += count;
+    return count;
+}
+
+// Reads exactly s bytes into b. Buffered bytes are consumed first and the
+// remainder, if any, is read from the socket with the given timeout.
+void BufferedSocketReaderImpl::readFully(char * b, int32_t s, int timeout) {
+    assert(s > 0 && NULL != b);
+    int32_t done = s < size - cursor ? s : size - cursor;
+
+    // Only touch the buffer when there is something to copy: with nothing
+    // buffered, cursor may equal buffer.size() and buffer[cursor] would be
+    // an out-of-range access.
+    if (done > 0) {
+        memcpy(b, &buffer[cursor], done);
+        cursor += done;
+    }
+
+    if (done < s) {
+        assert(size == cursor);
+        size = cursor = 0;
+        sock.readFully(b + done, s - done, timeout);
+    }
+}
+
+// Reads four bytes and converts them from network (big endian) byte order
+// to host byte order.
+int32_t BufferedSocketReaderImpl::readBigEndianInt32(int timeout) {
+    // Read straight into an integer object instead of casting a char array
+    // to int32_t*, which would be a misaligned / aliasing-unsafe access.
+    uint32_t wire = 0;
+    readFully(reinterpret_cast<char *>(&wire), sizeof(wire), timeout);
+    return static_cast<int32_t>(ntohl(wire));
+}
+
+// Reads one varint-encoded 32 bit integer.
+//
+// Unread bytes are first moved to the front of the buffer. The loop then
+// tries to decode a varint from the buffered bytes and, if that fails,
+// reads more bytes from the socket and tries again.
+//
+// timeout handling as implemented below:
+//   timeout < 0  : HdfsTimeoutException is never thrown here.
+//   timeout == 0 : HdfsTimeoutException is thrown after the first attempt
+//                  that does not yield a complete varint.
+//   timeout > 0  : elapsed time is subtracted from the remaining deadline
+//                  after every attempt.
+//
+// Throws HdfsNetworkException when the buffer is full and still does not
+// contain a decodable varint.
+int32_t BufferedSocketReaderImpl::readVarint32(int timeout) {
+    int32_t value;
+    bool rc = false;
+    int deadline = timeout;
+    // Compact: move the unread tail to index 0 to free space at the end.
+    // NOTE(review): &buffer[cursor] is evaluated even when cursor equals
+    // buffer.size(); confirm this cannot happen or guard it.
+    memmove(&buffer[0], &buffer[cursor], size - cursor);
+    size -= cursor;
+    cursor = 0;
+
+    while (!rc) {
+        CodedInputStream in(reinterpret_cast<uint8_t *>(&buffer[cursor]),
+                            size - cursor);
+        // The limit is what BytesUntilLimit() is measured against below.
+        in.PushLimit(size - cursor);
+        rc = in.ReadVarint32(reinterpret_cast<uint32_t *>(&value));
+
+        if (rc) {
+            // Advance by the number of bytes the decoder consumed.
+            cursor += size - cursor - in.BytesUntilLimit();
+            return value;
+        }
+
+        steady_clock::time_point s = steady_clock::now();
+        // Defined elsewhere; presumably throws if the operation was canceled.
+        CheckOperationCanceled();
+
+        // No room left to read more bytes, yet no varint could be decoded.
+        if (size == static_cast<int32_t>(buffer.size())) {
+            THROW(HdfsNetworkException,
+                  "Invalid varint type or buffer is too small, buffer size = %d.",
+                  static_cast<int>(buffer.size()));
+        }
+
+        // Wait for readability, then append whatever is available.
+        if (sock.poll(true, false, deadline)) {
+            size += sock.read(&buffer[size], buffer.size() - size);
+        }
+
+        steady_clock::time_point e = steady_clock::now();
+
+        if (timeout > 0) {
+            deadline -= ToMilliSeconds(s, e);
+        }
+
+        if (timeout >= 0 && deadline <= 0) {
+            THROW(HdfsTimeoutException, "Read %d bytes timeout", size);
+        }
+    }
+
+    // Not reached: the loop is only left through the return or a throw.
+    return 0;
+}
+
+// Reports whether a read can be served right away: either unread bytes
+// are still buffered, or the socket becomes readable within timeout.
+bool BufferedSocketReaderImpl::poll(int timeout) {
+    const bool hasBuffered = cursor < size;
+    return hasBuffered || sock.poll(true, false, timeout);
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h
new file mode 100644
index 0000000..efe7826
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_
+#define _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_
+
+#include <vector>
+#include <stdint.h>
+#include <cstdlib>
+
+#include "Socket.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A wrapper of Socket that reads big endian integers and varints from a socket.
+ */
+class BufferedSocketReader {
+public:
+    virtual ~BufferedSocketReader() {
+    }
+
+    /**
+     * Read data from the socket; if there is data buffered, read from the buffer first.
+     * If there is nothing to read, the caller will be blocked.
+     * @param b The buffer used to receive data.
+     * @param s The number of bytes to be read.
+     * @return The number of bytes actually read.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     */
+    virtual int32_t read(char * b, int32_t s) = 0;
+
+    /**
+     * Read data from the socket; if there is data buffered, read from the buffer first.
+     * If there is not enough data to read, the caller will be blocked.
+     * @param b The buffer used to receive data.
+     * @param s The number of bytes to read.
+     * @param timeout The timeout interval of this read operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     * @throw HdfsTimeout
+     */
+    virtual void readFully(char * b, int32_t s, int timeout) = 0;
+
+    /**
+     * Read a 32 bit big endian integer from the socket.
+     * If there is not enough data to read, the caller will be blocked.
+     * @param timeout The timeout interval of this read operation, negative means infinite.
+     * @return A 32 bit integer.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     * @throw HdfsTimeout
+     */
+    virtual int32_t readBigEndianInt32(int timeout) = 0;
+
+    /**
+     * Read a variable length encoded 32 bit integer from the socket.
+     * If there is not enough data to read, the caller will be blocked.
+     * @param timeout The timeout interval of this read operation, negative means infinite.
+     * @return A 32 bit integer.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     * @throw HdfsTimeout
+     */
+    virtual int32_t readVarint32(int timeout) = 0;
+
+    /**
+     * Test if the socket can be read without blocking.
+     * @param timeout The timeout interval of this operation, negative means infinite.
+     * @return Return true if the socket can be read without blocking, false on timeout.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    virtual bool poll(int timeout) = 0;
+
+};
+
+/**
+ * An implementation of BufferedSocketReader that keeps a small byte buffer
+ * in front of a Socket: buffered bytes are served first, the socket is
+ * used once the buffer is drained.
+ */
+class BufferedSocketReaderImpl: public BufferedSocketReader {
+public:
+    /** @param s The socket to read from; it is held by reference. */
+    BufferedSocketReaderImpl(Socket & s);
+
+    // Implementation of the BufferedSocketReader interface.
+    int32_t read(char * b, int32_t s);
+    void readFully(char * b, int32_t s, int timeout);
+    int32_t readBigEndianInt32(int timeout);
+    int32_t readVarint32(int timeout);
+    bool poll(int timeout);
+
+private:
+    // for test: start with the given bytes already buffered and unread.
+    BufferedSocketReaderImpl(Socket & s, const std::vector<char> & buffer) :
+        cursor(0),
+        size(buffer.size()),
+        sock(s),
+        buffer(buffer) {
+    }
+
+    int32_t cursor;            // index of the next unread byte in buffer
+    int32_t size;              // number of valid bytes held in buffer
+    Socket & sock;             // underlying socket
+    std::vector<char> buffer;  // storage for bytes read ahead
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h
new file mode 100644
index 0000000..43968dc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_SOCKET_H_
+#define _HDFS_LIBHDFS3_NETWORK_SOCKET_H_
+
+#include <netdb.h>
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Abstract interface of a stream socket: connect, read, write, poll,
+ * option setters and close. All operations are pure virtual.
+ */
+class Socket {
+public:
+
+    virtual ~Socket() {
+    }
+
+    /**
+     * Read data from the socket.
+     * If there is nothing to read, the caller will be blocked.
+     * @param buffer The buffer to store the data.
+     * @param size The number of bytes to be read.
+     * @return The number of bytes actually read.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     */
+    virtual int32_t read(char * buffer, int32_t size) = 0;
+
+    /**
+     * Read data from the socket until enough data has been received.
+     * If there is not enough data to read, the caller will be blocked.
+     * @param buffer The buffer to store the data.
+     * @param size The number of bytes to be read.
+     * @param timeout The timeout interval of this read operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     * @throw HdfsTimeout
+     */
+    virtual void readFully(char * buffer, int32_t size, int timeout) = 0;
+
+    /**
+     * Send data to the socket.
+     * The caller will be blocked until the send operation has finished,
+     *      but it is not guaranteed that all data has been sent.
+     * @param buffer The data to be sent.
+     * @param size The number of bytes to be sent.
+     * @return The number of bytes actually sent.
+     * @throw HdfsNetworkException
+     */
+    virtual int32_t write(const char * buffer, int32_t size) = 0;
+
+    /**
+     * Send all data to the socket.
+     * The caller will be blocked until all data has been sent.
+     * @param buffer The data to be sent.
+     * @param size The number of bytes to be sent.
+     * @param timeout The timeout interval of this write operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    virtual void writeFully(const char * buffer, int32_t size, int timeout) = 0;
+
+    /**
+     * Connect to a TCP server.
+     * @param host The host of the server.
+     * @param port The port of the server.
+     * @param timeout The timeout interval of this connect operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    virtual void connect(const char * host, int port, int timeout) = 0;
+
+    /**
+     * Connect to a TCP server.
+     * @param host The host of the server.
+     * @param port The port of the server, given as a string.
+     * @param timeout The timeout interval of this connect operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    virtual void connect(const char * host, const char * port, int timeout) = 0;
+
+    /**
+     * Connect to a TCP server.
+     * @param paddr The address of the server.
+     * @param host The host of the server, used in error messages.
+     * @param port The port of the server, used in error messages.
+     * @param timeout The timeout interval of this connect operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    virtual void connect(struct addrinfo * paddr, const char * host,
+                         const char * port, int timeout) = 0;
+
+    /**
+     * Test if the socket can be read or written without blocking.
+     * @param read Test whether the socket can be read.
+     * @param write Test whether the socket can be written.
+     * @param timeout The timeout interval of this operation, negative means infinite.
+     * @return Return true if the socket can be read or written without blocking, false on timeout.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    virtual bool poll(bool read, bool write, int timeout) = 0;
+
+    /**
+     * Set the socket's no-delay mode.
+     * @param enable If true, put the socket into no-delay mode, otherwise into delay mode.
+     * @throw HdfsNetworkException
+     */
+    virtual void setNoDelay(bool enable) = 0;
+
+    /**
+     * Set the socket's blocking mode.
+     * @param enable If true, put the socket into blocking mode, otherwise into non-blocking mode.
+     * @throw HdfsNetworkException
+     */
+    virtual void setBlockMode(bool enable) = 0;
+
+    /**
+     * Set the socket's linger timeout.
+     * @param timeout Linger timeout of the socket in milliseconds; linger is disabled if it is less than 0.
+     * @throw HdfsNetworkException
+     */
+    virtual void setLingerTimeout(int timeout) = 0;
+
+    /**
+     * Shut down and close the socket.
+     * @throw nothrow
+     */
+    virtual void close() = 0;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_SOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h
new file mode 100644
index 0000000..5dab57c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_SYSCALL_H_
+#define _HDFS_LIBHDFS3_NETWORK_SYSCALL_H_
+
+#include <fcntl.h>
+#include <netdb.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+// Pulls the global system functions listed below into one namespace so
+// that the "syscalls" alias defined further down can point at them.
+namespace real_syscalls {
+
+using ::recv;
+using ::send;
+using ::getaddrinfo;
+using ::freeaddrinfo;
+using ::socket;
+using ::connect;
+using ::getpeername;
+using ::fcntl;
+using ::setsockopt;
+using ::poll;
+using ::shutdown;
+using ::close;
+
+}
+
+// "syscalls" is the name call sites use. When MOCK is defined it refers to
+// the mock_systems namespace from MockSystem.h, otherwise to the real
+// functions above.
+#ifdef MOCK
+
+#include "MockSystem.h"
+namespace syscalls = mock_systems;
+
+#else
+
+namespace syscalls = real_syscalls;
+
+#endif
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_SYSCALL_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc
new file mode 100644
index 0000000..de2db9d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "platform.h"
+
+#include <arpa/inet.h>
+#include <cassert>
+#include <climits>
+#include <cstring>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <sstream>
+
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "TcpSocket.h"
+#include "Syscall.h"
+
+// Linux defines a constant that you can use inside send() to prevent SIGPIPE
+// from being raised.  When this constant is present, we want to use it.  When
+// it is not present, we just pass 0 (no flag).
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+namespace hdfs {
+namespace internal {
+
+// Some BSD-derived systems, MacOS among them, offer the SO_NOSIGPIPE socket
+// option, which keeps writes to the socket from raising SIGPIPE.  Where the
+// option is not defined this function does nothing.
+void TcpSocketImpl::setNoSigPipe() {
+#ifdef SO_NOSIGPIPE
+    int enabled = 1;
+    const int rc = syscalls::setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE,
+                                        (char *) &enabled, sizeof(enabled));
+
+    if (rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+#endif
+}
+
+// Start with no open socket (sock == -1) and no linger timeout configured
+// (lingerTimeout == -1).
+TcpSocketImpl::TcpSocketImpl() :
+    sock(-1), lingerTimeout(-1) {
+}
+
+// Shut down and close the socket if one is still open.
+TcpSocketImpl::~TcpSocketImpl() {
+    close();
+}
+
+/**
+ * Receive up to size bytes from the socket into buffer.
+ * recv is retried for as long as it fails with EINTR and
+ * CheckOperationCanceled() evaluates to false.
+ * @param buffer The buffer to store the data.
+ * @param size The maximum number of bytes to receive.
+ * @return The number of bytes received.
+ * @throw HdfsNetworkException if recv fails.
+ * @throw HdfsEndOfStream if recv returns 0.
+ */
+int32_t TcpSocketImpl::read(char * buffer, int32_t size) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t received;
+
+    for (;;) {
+        received = syscalls::recv(sock, buffer, size, 0);
+
+        if (-1 != received || EINTR != errno || CheckOperationCanceled()) {
+            break;
+        }
+    }
+
+    if (-1 == received) {
+        THROW(HdfsNetworkException, "Read %d bytes failed from %s: %s",
+              size, remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    if (0 == received) {
+        THROW(HdfsEndOfStream, "Read %d bytes failed from %s: End of the stream", size, remoteAddr.c_str());
+    }
+
+    return received;
+}
+
+/**
+ * Read exactly size bytes into buffer, waiting with poll() before each read.
+ * @param buffer The buffer to fill.
+ * @param size The number of bytes to read.
+ * @param timeout Time budget in milliseconds for the whole call; a negative
+ *                value disables the timeout check.
+ * @throw HdfsTimeoutException if bytes are still missing when the budget is
+ *        used up.
+ */
+void TcpSocketImpl::readFully(char * buffer, int32_t size, int timeout) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t todo = size, rc;
+    // Remaining time budget; handed to poll() and reduced after every pass.
+    int deadline = timeout;
+
+    while (todo > 0) {
+        steady_clock::time_point s = steady_clock::now();
+        CheckOperationCanceled();
+
+        if (poll(true, false, deadline)) {
+            // Continue right after the bytes that have already arrived.
+            rc = read(buffer + (size - todo), todo);
+            todo -= rc;
+        }
+
+        steady_clock::time_point e = steady_clock::now();
+
+        // Only a positive timeout is charged for the time this pass took.
+        if (timeout > 0) {
+            deadline -= ToMilliSeconds(s, e);
+        }
+
+        // With timeout == 0 the budget is already 0, so a single pass that
+        // leaves bytes outstanding ends in a timeout.
+        if (todo > 0 && timeout >= 0 && deadline <= 0) {
+            THROW(HdfsTimeoutException, "Read %d bytes timeout from %s", size, remoteAddr.c_str());
+        }
+    }
+}
+
+/**
+ * Send up to size bytes from buffer, passing MSG_NOSIGNAL to send.
+ * send is retried for as long as it fails with EINTR and
+ * CheckOperationCanceled() evaluates to false.
+ * @param buffer The data to be sent.
+ * @param size The maximum number of bytes to send.
+ * @return The number of bytes send reported as sent.
+ * @throw HdfsNetworkException if send fails.
+ */
+int32_t TcpSocketImpl::write(const char * buffer, int32_t size) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t sent;
+
+    for (;;) {
+        sent = syscalls::send(sock, buffer, size, MSG_NOSIGNAL);
+
+        if (-1 != sent || EINTR != errno || CheckOperationCanceled()) {
+            break;
+        }
+    }
+
+    if (-1 == sent) {
+        THROW(HdfsNetworkException, "Write %d bytes failed to %s: %s",
+              size, remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    return sent;
+}
+
+/**
+ * Send all size bytes of buffer, waiting with poll() before each write.
+ * @param buffer The data to be sent.
+ * @param size The number of bytes to send.
+ * @param timeout Time budget in milliseconds for the whole call; a negative
+ *                value disables the timeout check.
+ * @throw HdfsTimeoutException if bytes are still unsent when the budget is
+ *        used up.
+ */
+void TcpSocketImpl::writeFully(const char * buffer, int32_t size, int timeout) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t todo = size, rc;
+    // Remaining time budget; handed to poll() and reduced after every pass.
+    int deadline = timeout;
+
+    while (todo > 0) {
+        steady_clock::time_point s = steady_clock::now();
+        CheckOperationCanceled();
+
+        if (poll(false, true, deadline)) {
+            // Continue right after the bytes that have already been sent.
+            rc = write(buffer + (size - todo), todo);
+            todo -= rc;
+        }
+
+        steady_clock::time_point e = steady_clock::now();
+
+        // Only a positive timeout is charged for the time this pass took.
+        if (timeout > 0) {
+            deadline -= ToMilliSeconds(s, e);
+        }
+
+        // With timeout == 0 the budget is already 0, so a single pass that
+        // leaves bytes unsent ends in a timeout.
+        if (todo > 0 && timeout >= 0 && deadline <= 0) {
+            THROW(HdfsTimeoutException, "Write %d bytes timeout to %s",
+                  size, remoteAddr.c_str());
+        }
+    }
+}
+
+/**
+ * Convenience overload: formats the numeric port as text and forwards to
+ * the overload that takes the port as a string.
+ */
+void TcpSocketImpl::connect(const char * host, int port, int timeout) {
+    std::ostringstream portText;
+    portText << port;
+    const std::string service = portText.str();
+    connect(host, service.c_str(), timeout);
+}
+
+/**
+ * Resolve host and port with getaddrinfo and try the returned addresses in
+ * order until one of them connects.
+ * @param host The host of server.
+ * @param port The port of server, as text.
+ * @param timeout Time budget in milliseconds shared by all attempts; a
+ *                negative value disables the timeout check.
+ * @throw HdfsNetworkConnectException if resolution fails or the last
+ *        address cannot be connected.
+ * @throw HdfsTimeoutException if the budget is used up or the last address
+ *        times out.
+ */
+void TcpSocketImpl::connect(const char * host, const char * port, int timeout) {
+    assert(-1 == sock);
+    struct addrinfo hints, *addrs, *paddr;
+    memset(&hints, 0, sizeof(hints));
+    // Any address family, stream sockets only.
+    hints.ai_family = PF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+    int retval = syscalls::getaddrinfo(host, port, &hints, &addrs);
+
+    if (0 != retval) {
+        THROW(HdfsNetworkConnectException, "Failed to resolve address \"%s:%s\" %s",
+              host, port, gai_strerror(retval));
+    }
+
+    // Remaining time budget; reduced after every failed attempt.
+    int deadline = timeout;
+    // Remember the quoted "host:port" text for later error messages.
+    std::stringstream ss;
+    ss << "\"" << host << ":" << port << "\"";
+    remoteAddr = ss.str();
+
+    try {
+        for (paddr = addrs; NULL != paddr; paddr = paddr->ai_next) {
+            steady_clock::time_point s = steady_clock::now();
+            CheckOperationCanceled();
+
+            // A connect or timeout failure is swallowed so that the next
+            // address gets a chance; it is rethrown only for the last one.
+            try {
+                connect(paddr, host, port, deadline);
+            } catch (HdfsNetworkConnectException & e) {
+                if (NULL == paddr->ai_next) {
+                    throw;
+                }
+            } catch (HdfsTimeoutException & e) {
+                if (NULL == paddr->ai_next) {
+                    throw;
+                }
+            }
+
+            // sock is still set here only when the attempt succeeded.
+            if (-1 != sock) {
+                syscalls::freeaddrinfo(addrs);
+                return;
+            }
+
+            steady_clock::time_point e = steady_clock::now();
+
+            // Only a positive timeout is charged for the time this attempt took.
+            if (timeout > 0) {
+                deadline -= ToMilliSeconds(s, e);
+            }
+
+            if (-1 == sock && timeout >= 0 && deadline <= 0) {
+                THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout", host, port);
+            }
+        }
+    } catch (...) {
+        // Release the address list on every exceptional exit, then propagate.
+        syscalls::freeaddrinfo(addrs);
+        throw;
+    }
+}
+
+/**
+ * Create a socket for one resolved address and connect it.
+ * The connect is issued in non-blocking mode; when it does not complete
+ * immediately, poll() waits for the socket to become writable and
+ * getpeername() decides whether the connection was established.  On success
+ * the socket is switched back to blocking mode.  Any failure after the
+ * socket has been created closes it again, so sock is -1 when this function
+ * throws.
+ * @param paddr The address of server.
+ * @param host The host of server used in error message.
+ * @param port The port of server used in error message.
+ * @param timeout How long poll() may wait for the connect to complete, in
+ *                milliseconds; negative means infinite.
+ * @throw HdfsNetworkException if the socket cannot be created or configured.
+ * @throw HdfsNetworkConnectException if the connect fails.
+ * @throw HdfsTimeoutException if the connect times out.
+ */
+void TcpSocketImpl::connect(struct addrinfo * paddr, const char * host,
+                            const char * port, int timeout) {
+    assert(-1 == sock);
+    sock = syscalls::socket(paddr->ai_family, paddr->ai_socktype,
+                              paddr->ai_protocol);
+
+    if (-1 == sock) {
+        THROW(HdfsNetworkException,
+              "Create socket failed when connect to %s: %s",
+              remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    try {
+        // The option setup lives inside the try block so that a failing
+        // setsockopt closes the freshly created socket instead of leaving
+        // it open with sock != -1.
+        if (lingerTimeout >= 0) {
+            setLingerTimeoutInternal(lingerTimeout);
+        }
+
+#ifdef __linux__
+        /*
+         * On Linux some kernels use SO_SNDTIMEO as the connect timeout.
+         * It is OK to set a very large value here since the user has its
+         * own timeout mechanism.
+         */
+        setSendTimeout(3600000);
+#endif
+
+        setBlockMode(false);
+        setNoSigPipe();
+
+        int rc = 0;
+        do {
+            rc = syscalls::connect(sock, paddr->ai_addr, paddr->ai_addrlen);
+        } while (rc < 0 && EINTR == errno && !CheckOperationCanceled());
+
+        if (rc < 0) {
+            // Anything other than "still in progress" is a final failure.
+            if (EINPROGRESS != errno && EWOULDBLOCK != errno) {
+                if (ETIMEDOUT == errno) {
+                    THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout",
+                          host, port);
+                } else {
+                    THROW(HdfsNetworkConnectException,
+                          "Connect to \"%s:%s\" failed: %s",
+                          host, port, GetSystemErrorInfo(errno));
+                }
+            }
+
+            // Wait until the pending connect makes the socket writable.
+            if (!poll(false, true, timeout)) {
+                THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout", host, port);
+            }
+
+            struct sockaddr peer;
+
+            // getpeername takes a socklen_t length.
+            socklen_t len = sizeof(peer);
+
+            memset(&peer, 0, sizeof(peer));
+
+            if (syscalls::getpeername(sock, &peer, &len)) {
+                /*
+                 * connect failed, find out the error info.
+                 */
+                char c;
+                rc = syscalls::recv(sock, &c, 1, 0);
+                assert(rc < 0);
+
+                if (ETIMEDOUT == errno) {
+                    THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout",
+                          host, port);
+                }
+
+                THROW(HdfsNetworkConnectException, "Connect to \"%s:%s\" failed: %s",
+                      host, port, GetSystemErrorInfo(errno));
+            }
+        }
+
+        setBlockMode(true);
+    } catch (...) {
+        close();
+        throw;
+    }
+}
+
+/**
+ * Switch the socket between blocking and non-blocking mode by clearing or
+ * setting O_NONBLOCK in its file status flags.
+ * @param enable If true the socket becomes blocking, otherwise non-blocking.
+ * @throw HdfsNetworkException if reading or writing the flags fails.
+ */
+void TcpSocketImpl::setBlockMode(bool enable) {
+    const int current = syscalls::fcntl(sock, F_GETFL, 0);
+
+    if (-1 == current) {
+        THROW(HdfsNetworkException, "Get socket flag failed for remote node %s: %s",
+              remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    int updated;
+
+    if (enable) {
+        updated = current & ~O_NONBLOCK;
+    } else {
+        updated = current | O_NONBLOCK;
+    }
+
+    const int rc = syscalls::fcntl(sock, F_SETFL, updated);
+
+    if (-1 == rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Wait until the socket is reported ready for the requested operations.
+ * The poll call is retried for as long as it fails with EINTR and
+ * CheckOperationCanceled() evaluates to false.
+ * @param read Ask for POLLIN.
+ * @param write Ask for POLLOUT.
+ * @param timeout The timeout handed to poll.
+ * @return false if poll returned 0 (timeout), true otherwise.
+ * @throw HdfsNetworkException if poll fails.
+ */
+bool TcpSocketImpl::poll(bool read, bool write, int timeout) {
+    assert(-1 != sock);
+    short wanted = 0;
+
+    if (read) {
+        wanted |= POLLIN;
+    }
+
+    if (write) {
+        wanted |= POLLOUT;
+    }
+
+    int ready;
+    struct pollfd pfd;
+
+    do {
+        // Start from a clean descriptor on every attempt.
+        memset(&pfd, 0, sizeof(pfd));
+        pfd.fd = sock;
+        pfd.events = wanted;
+        ready = syscalls::poll(&pfd, 1, timeout);
+    } while (-1 == ready && EINTR == errno && !CheckOperationCanceled());
+
+    if (-1 == ready) {
+        THROW(HdfsNetworkException, "Poll failed for remote node %s: %s",
+              remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    return 0 != ready;
+}
+
+/**
+ * Set or clear the TCP_NODELAY option on the socket.
+ * @param enable If true the option is set to 1, otherwise to 0.
+ * @throw HdfsNetworkException if setsockopt fails.
+ */
+void TcpSocketImpl::setNoDelay(bool enable) {
+    assert(-1 != sock);
+    int noDelay = 0;
+
+    if (enable) {
+        noDelay = 1;
+    }
+
+    const int rc = syscalls::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
+                                        (char *) &noDelay, sizeof(noDelay));
+
+    if (rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+// Only records the value; no socket option is touched here.  The stored
+// value is read when connect() creates a socket.
+void TcpSocketImpl::setLingerTimeout(int timeout) {
+    lingerTimeout = timeout;
+}
+
+/**
+ * Apply SO_LINGER to the open socket.
+ * A positive timeout switches linger on with that value; zero or a negative
+ * value switches it off.
+ * NOTE(review): the value is passed unchanged into l_linger, which POSIX
+ * defines in seconds, while the class header describes the linger timeout
+ * in milliseconds -- confirm the intended unit.
+ * @throw HdfsNetworkException if setsockopt fails.
+ */
+void TcpSocketImpl::setLingerTimeoutInternal(int timeout) {
+    assert(-1 != sock);
+    struct linger l;
+    l.l_onoff = timeout > 0 ? true : false;
+    l.l_linger = timeout > 0 ? timeout : 0;
+
+    if (syscalls::setsockopt(sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l))) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Set SO_SNDTIMEO on the open socket.
+ * @param timeout The send timeout in milliseconds; it is split into whole
+ *                seconds and the remaining microseconds.
+ * @throw HdfsNetworkException if setsockopt fails.
+ */
+void TcpSocketImpl::setSendTimeout(int timeout) {
+    assert(-1 != sock);
+    const int millisPerSecond = 1000;
+    const int microsPerMilli = 1000;
+    struct timeval tv;
+    tv.tv_sec = timeout / millisPerSecond;
+    tv.tv_usec = (timeout % millisPerSecond) * microsPerMilli;
+    const int rc = syscalls::setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO,
+                                        &tv, sizeof(tv));
+
+    if (rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Shut down both directions of the socket and close it.  Does nothing when
+ * no socket is open; afterwards sock is -1.
+ */
+void TcpSocketImpl::close() {
+    if (-1 == sock) {
+        return;
+    }
+
+    syscalls::shutdown(sock, SHUT_RDWR);
+    syscalls::close(sock);
+    sock = -1;
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h
new file mode 100644
index 0000000..ff90b20
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_TCPSOCKET_H_
+#define _HDFS_LIBHDFS3_NETWORK_TCPSOCKET_H_
+
+#include "Socket.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A TCP client socket.
+ */
+class TcpSocketImpl: public Socket {
+public:
+    /**
+     * Construct a TcpSocketImpl object that has no open socket yet.
+     * @throw nothrow
+     */
+    TcpSocketImpl();
+
+    /**
+     * Destroy a TcpSocketImpl instance, closing the socket if it is open.
+     */
+    ~TcpSocketImpl();
+
+    /**
+     * Read data from socket.
+     * If nothing can be read, the caller will be blocked.
+     * @param buffer The buffer to store the data.
+     * @param size The maximum number of bytes to be read.
+     * @return The number of bytes actually read.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     */
+    int32_t read(char * buffer, int32_t size);
+
+    /**
+     * Read data from socket until the requested amount has been received.
+     * If not enough data can be read, the caller will be blocked.
+     * @param buffer The buffer to store the data.
+     * @param size The number of bytes to be read.
+     * @param timeout The timeout interval of this read operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsEndOfStream
+     * @throw HdfsTimeoutException
+     */
+    void readFully(char * buffer, int32_t size, int timeout);
+
+    /**
+     * Send data to socket.
+     * The caller will be blocked until the send operation finishes,
+     *      but there is no guarantee that all data has been sent.
+     * @param buffer The data to be sent.
+     * @param size The number of bytes to be sent.
+     * @return The number of bytes actually sent.
+     * @throw HdfsNetworkException
+     */
+    int32_t write(const char * buffer, int32_t size);
+
+    /**
+     * Send all data to socket.
+     * The caller will be blocked until all data has been sent.
+     * @param buffer The data to be sent.
+     * @param size The number of bytes to be sent.
+     * @param timeout The timeout interval of this write operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeoutException
+     */
+    void writeFully(const char * buffer, int32_t size, int timeout);
+
+    /**
+     * Connect to a tcp server.
+     * @param host The host of server.
+     * @param port The port of server.
+     * @param timeout The timeout interval of this connect operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsNetworkConnectException
+     * @throw HdfsTimeoutException
+     */
+    void connect(const char * host, int port, int timeout);
+
+    /**
+     * Connect to a tcp server.
+     * @param host The host of server.
+     * @param port The port of server, as text.
+     * @param timeout The timeout interval of this connect operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsNetworkConnectException
+     * @throw HdfsTimeoutException
+     */
+    void connect(const char * host, const char * port, int timeout);
+
+    /**
+     * Connect to a tcp server at an already resolved address.
+     * NOTE(review): struct addrinfo is not declared by this header itself;
+     * unless Socket.h provides it, <netdb.h> has to be included first.
+     * @param paddr The address of server.
+     * @param host The host of server used in error message.
+     * @param port The port of server used in error message.
+     * @param timeout The timeout interval of this connect operation, negative means infinite.
+     * @throw HdfsNetworkException
+     * @throw HdfsNetworkConnectException
+     * @throw HdfsTimeoutException
+     */
+    void connect(struct addrinfo * paddr, const char * host, const char * port,
+                 int timeout);
+
+    /**
+     * Test if the socket can be read or written without blocking.
+     * @param read Test socket if it can be read.
+     * @param write Test socket if it can be written.
+     * @param timeout The timeout interval of this operation, negative means infinite.
+     * @return Return true if the socket is reported ready, false on timeout.
+     * @throw HdfsNetworkException
+     */
+    bool poll(bool read, bool write, int timeout);
+
+    /**
+     * Set socket no delay mode.
+     * @param enable If true, set socket into no delay mode, else delay mode.
+     * @throw HdfsNetworkException
+     */
+    void setNoDelay(bool enable);
+
+    /**
+     * Set socket blocking mode.
+     * @param enable If true, set socket into blocking mode, else non-block mode.
+     * @throw HdfsNetworkException
+     */
+    void setBlockMode(bool enable);
+
+    /**
+     * Record the linger timeout to apply when connect creates a socket.
+     * @param timeout A negative value leaves the linger option untouched,
+     *                zero switches linger off, a positive value switches it
+     *                on with that value.
+     *                NOTE(review): this was documented as milliseconds, but
+     *                the value is handed unchanged to SO_LINGER, whose
+     *                l_linger field POSIX defines in seconds -- confirm.
+     */
+    void setLingerTimeout(int timeout);
+
+    /**
+     * Shutdown and close the socket.
+     * @throw nothrow
+     */
+    void close();
+
+private:
+    void setNoSigPipe();
+    void setLingerTimeoutInternal(int timeout);
+    void setSendTimeout(int timeout);
+
+private:
+    int sock;  // socket descriptor, -1 while no socket is open
+    int lingerTimeout;  // value recorded by setLingerTimeout, -1 initially
+    std::string remoteAddr;  // quoted "host:port" text used for error messages
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_TCPSOCKET_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc
new file mode 100644
index 0000000..3f63ff6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcAuth.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Map an authentication method name to its AuthMethod value.
+ * The comparison ignores case; "SIMPLE", "KERBEROS" and "TOKEN" are accepted.
+ * @param str The method name.
+ * @throw InvalidParameter if the name matches none of the accepted values.
+ */
+AuthMethod RpcAuth::ParseMethod(const std::string &str) {
+    const char *text = str.c_str();
+
+    if (0 == strcasecmp(text, "SIMPLE")) {
+        return AuthMethod::SIMPLE;
+    }
+
+    if (0 == strcasecmp(text, "KERBEROS")) {
+        return AuthMethod::KERBEROS;
+    }
+
+    if (0 == strcasecmp(text, "TOKEN")) {
+        return AuthMethod::TOKEN;
+    }
+
+    THROW(InvalidParameter, "RpcAuth: Unknown auth mechanism type: %s",
+          str.c_str());
+}
+
+// Combine the hash of the authentication method with the hash of the user.
+size_t RpcAuth::hash_value() const {
+    size_t parts[2];
+    parts[0] = Int32Hasher(method);
+    parts[1] = user.hash_value();
+    const size_t count = sizeof(parts) / sizeof(parts[0]);
+    return CombineHasher(parts, count);
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h
new file mode 100644
index 0000000..e82c28d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCAUTH_H_
+#define _HDFS_LIBHDFS3_RPC_RPCAUTH_H_
+
+#include "client/UserInfo.h"
+#include "Hash.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+// Authentication methods and their numeric codes.  The quoted names are
+// carried over from the original comments -- presumably the matching SASL
+// mechanism names; confirm.
+enum AuthMethod {
+    SIMPLE = 80,
+    KERBEROS = 81,  // "GSSAPI"
+    TOKEN = 82,     // "DIGEST-MD5"
+    UNKNOWN = 255
+};
+
+// Authentication protocols and their numeric codes.
+enum AuthProtocol {
+    NONE = 0,
+    SASL = -33
+};
+
+/**
+ * Pairs an authentication method with the user it applies to.
+ */
+class RpcAuth {
+public:
+    /** Create an instance that uses the SIMPLE method. */
+    RpcAuth() : method(SIMPLE) {
+    }
+
+    /** Create an instance that uses the given method. */
+    explicit RpcAuth(AuthMethod mech) : method(mech) {
+    }
+
+    /** Create an instance for the given user and method. */
+    RpcAuth(const UserInfo &ui, AuthMethod mech) : method(mech), user(ui) {
+    }
+
+    /** @return NONE for the SIMPLE method, SASL for every other method. */
+    AuthProtocol getProtocol() const {
+        if (method == SIMPLE) {
+            return AuthProtocol::NONE;
+        }
+
+        return AuthProtocol::SASL;
+    }
+
+    /** @return The stored user, read-only. */
+    const UserInfo &getUser() const {
+        return user;
+    }
+
+    /** @return The stored user, modifiable in place. */
+    UserInfo &getUser() {
+        return user;
+    }
+
+    /** Replace the stored user. */
+    void setUser(const UserInfo &user) {
+        this->user = user;
+    }
+
+    /** @return The authentication method. */
+    AuthMethod getMethod() const {
+        return method;
+    }
+
+    size_t hash_value() const;
+
+    /** Two instances are equal when both method and user are equal. */
+    bool operator ==(const RpcAuth &other) const {
+        if (method != other.method) {
+            return false;
+        }
+
+        return user == other.user;
+    }
+
+public:
+    static AuthMethod ParseMethod(const std::string &str);
+
+private:
+    AuthMethod method;
+    UserInfo user;
+};
+
+}
+}
+
+// NOTE(review): HDFS_HASH_DEFINE presumably comes from Hash.h and makes
+// RpcAuth usable as a hashed key via RpcAuth::hash_value() -- confirm.
+HDFS_HASH_DEFINE(::hdfs::internal::RpcAuth);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCAUTH_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d28f73b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h
new file mode 100644
index 0000000..7c6e316
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCALL_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCALL_H_
+
+#include "google/protobuf/message.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Describes a single RPC invocation: its name, whether it is flagged as
+ * idempotent, and the protobuf messages used for the request and the
+ * response.  The message pointers are stored as given; nothing in this
+ * class deletes them.
+ */
+class RpcCall {
+public:
+    /**
+     * @param idemp Whether the call is flagged as idempotent.
+     * @param n The name of the call; taken by const reference, like
+     *          setName, so it is copied only once.
+     * @param req The request message.
+     * @param resp The response message.
+     */
+    RpcCall(bool idemp, const std::string &n, google::protobuf::Message *req,
+            google::protobuf::Message *resp) :
+        idempotent(idemp), name(n), request(req), response(resp) {
+    }
+
+    /** @return Whether the call is flagged as idempotent. */
+    bool isIdempotent() const {
+        return idempotent;
+    }
+
+    /**
+     * @return The name as a C string, pointing into the string stored in
+     *         this object.
+     */
+    const char *getName() const {
+        return name.c_str();
+    }
+
+    void setIdempotent(bool idempotent) {
+        this->idempotent = idempotent;
+    }
+
+    void setName(const std::string &name) {
+        this->name = name;
+    }
+
+    /** @return The stored request pointer; callable on const objects too. */
+    google::protobuf::Message *getRequest() const {
+        return request;
+    }
+
+    void setRequest(google::protobuf::Message *request) {
+        this->request = request;
+    }
+
+    /** @return The stored response pointer; callable on const objects too. */
+    google::protobuf::Message *getResponse() const {
+        return response;
+    }
+
+    void setResponse(google::protobuf::Message *response) {
+        this->response = response;
+    }
+
+private:
+    bool idempotent;
+    std::string name;
+    google::protobuf::Message *request;
+    google::protobuf::Message *response;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCALL_H_ */


Mime
View raw message