hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhuvnesh2...@apache.org
Subject [18/48] incubator-hawq git commit: HAWQ-618. Import libhdfs3 library for internal management and LICENSE modified
Date Mon, 04 Apr 2016 05:09:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStreamImpl.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.cpp b/depends/libhdfs3/src/client/OutputStreamImpl.cpp
new file mode 100644
index 0000000..0c9f813
--- /dev/null
+++ b/depends/libhdfs3/src/client/OutputStreamImpl.cpp
@@ -0,0 +1,642 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Atomic.h"
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "FileSystemInter.h"
+#include "HWCrc32c.h"
+#include "LeaseRenewer.h"
+#include "Logger.h"
+#include "OutputStream.h"
+#include "OutputStreamImpl.h"
+#include "Packet.h"
+#include "PacketHeader.h"
+#include "SWCrc32c.h"
+
+#include <cassert>
+#include <inttypes.h>
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * Build a stream in the closed state; no file is attached until open()
+ * succeeds. The CRC32C implementation is chosen once here: the hardware
+ * one when HWCrc32c::available() says so, otherwise the software fallback.
+ */
+OutputStreamImpl::OutputStreamImpl()
+    : /*heartBeatStop(true),*/
+      closed(true),
+      isAppend(false),
+      syncBlock(false),
+      checksumSize(0),
+      chunkSize(0),
+      chunksPerPacket(0),
+      closeTimeout(0),
+      heartBeatInterval(0),
+      packetSize(0),
+      position(0),
+      replication(0),
+      blockSize(0),
+      bytesWritten(0),
+      cursor(0),
+      lastFlushed(0),
+      nextSeqNo(0),
+      packets(0) {
+    checksum = HWCrc32c::available()
+               ? shared_ptr<Checksum>(new HWCrc32c())
+               : shared_ptr<Checksum>(new SWCrc32c());
+    // One 32-bit checksum value per chunk.
+    checksumSize = sizeof(int32_t);
+    lastSend = steady_clock::now();
+#ifdef MOCK
+    stub = NULL;
+#endif
+}
+
+/*
+ * Destroying a stream that is still open closes it implicitly. Any
+ * exception from close() is swallowed because a destructor must not throw.
+ */
+OutputStreamImpl::~OutputStreamImpl() {
+    if (closed) {
+        return;
+    }
+
+    try {
+        close();
+    } catch (...) {
+        // Intentionally ignored: there is no caller to report it to.
+    }
+}
+
+/*
+ * Guard used before every public operation: throws HdfsIOException when the
+ * stream is not open, and rethrows the error stored by setError() if any.
+ */
+void OutputStreamImpl::checkStatus() {
+    if (closed) {
+        THROW(HdfsIOException, "OutputStreamImpl: stream is not opened.");
+    }
+
+    lock_guard<mutex> guard(mut);
+    const bool hasError = (lastError != exception_ptr());
+
+    if (hasError) {
+        rethrow_exception(lastError);
+    }
+}
+
+/*
+ * Remember an error so that later calls through checkStatus() rethrow it.
+ * Best effort: any exception raised while storing it is discarded.
+ */
+void OutputStreamImpl::setError(const exception_ptr & error) {
+    try {
+        lock_guard<mutex> guard(mut);
+        lastError = error;
+    } catch (...) {
+        // Deliberately ignored; recording the error is best effort.
+    }
+}
+
+/**
+ * Create, overwrite or append to a file.
+ * @param fs hdfs file system.
+ * @param path the file path; must be non-NULL and non-empty.
+ * @param flag one of Create, Overwrite, Append, Create|Overwrite or
+ *        Create|Append, each optionally combined with SyncBlock.
+ * @param permission permission given to a newly created file.
+ * @param createParent create missing parent directories when true.
+ * @param replication replication factor; 0 selects the configured default.
+ * @param blockSize block size in bytes; 0 selects the configured default.
+ * @throw InvalidParameter on a bad argument or flag combination.
+ * If opening fails after validation, the stream is reset to its closed
+ * state before the exception propagates.
+ */
+void OutputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path, int flag,
+                            const Permission & permission, bool createParent, int replication,
+                            int64_t blockSize) {
+    const bool badArgs = (NULL == path) || (0 == strlen(path))
+                         || (replication < 0) || (blockSize < 0);
+
+    if (badArgs) {
+        THROW(InvalidParameter, "Invalid parameter.");
+    }
+
+    static const int acceptedFlags[] = {
+        Create, Create | SyncBlock,
+        Overwrite, Overwrite | SyncBlock,
+        Append, Append | SyncBlock,
+        Create | Overwrite, Create | Overwrite | SyncBlock,
+        Create | Append, Create | Append | SyncBlock
+    };
+    const size_t flagCount = sizeof(acceptedFlags) / sizeof(acceptedFlags[0]);
+    bool flagAccepted = false;
+
+    for (size_t i = 0; i < flagCount && !flagAccepted; ++i) {
+        flagAccepted = (acceptedFlags[i] == flag);
+    }
+
+    if (!flagAccepted) {
+        THROW(InvalidParameter, "Invalid flag.");
+    }
+
+    try {
+        openInternal(fs, path, flag, permission, createParent, replication, blockSize);
+    } catch (...) {
+        reset();
+        throw;
+    }
+}
+
+/*
+ * Derive chunksPerPacket and the final packetSize from chunkSize,
+ * checksumSize and the requested packetSize, then size the chunk buffer.
+ * The payload is rounded up to a whole number of (data + checksum) chunks,
+ * with at least one chunk per packet, plus the packet header.
+ */
+void OutputStreamImpl::computePacketChunkSize() {
+    static const int packetHeaderSize = PacketHeader::GetPkgHeaderSize();
+    const int bytesPerChunk = chunkSize + checksumSize;
+    const int payloadBytes = packetSize - packetHeaderSize;
+
+    chunksPerPacket = (payloadBytes + bytesPerChunk - 1) / bytesPerChunk;
+
+    if (chunksPerPacket <= 1) {
+        chunksPerPacket = 1;
+    }
+
+    packetSize = packetHeaderSize + chunksPerPacket * bytesPerChunk;
+    buffer.resize(chunkSize);
+}
+
+/*
+ * Prepare this stream for appending to an existing file.
+ *
+ * filesystem->append() returns the file's last block and, optionally, its
+ * status; when no status comes back it is fetched with getFileStatus().
+ * The cursor is placed at the current file length and the block size is
+ * taken from the file. If a last block is returned, the stream enters
+ * append mode and the packet/chunk sizes are shrunk. The first packet then
+ * either fills exactly the partial checksum chunk, or is no larger than the
+ * space left in the last block.
+ *
+ * If anything fails after the stream is marked open, completeFile(false)
+ * is attempted, the stream is reset, and the exception is rethrown.
+ */
+void OutputStreamImpl::initAppend() {
+    FileStatus fileInfo;
+    std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
+    lastBlockWithStatus = filesystem->append(this->path);
+    lastBlock = lastBlockWithStatus.first;
+
+    // Prefer the status returned by append(); otherwise ask for it.
+    if (lastBlockWithStatus.second) {
+        fileInfo = *lastBlockWithStatus.second;
+    } else {
+        fileInfo = filesystem->getFileStatus(this->path.c_str());
+    }
+
+    // The stream counts as open from here on; the catch block below undoes
+    // this through reset().
+    closed = false;
+
+    try {
+        this->blockSize = fileInfo.getBlockSize();
+        cursor = fileInfo.getLength();
+
+        if (lastBlock) {
+            isAppend = true;
+            // New data continues after the bytes already in the last block.
+            bytesWritten = lastBlock->getNumBytes();
+            int64_t usedInLastBlock = fileInfo.getLength() % blockSize;
+            int64_t freeInLastBlock = blockSize - usedInLastBlock;
+
+            // usedInLastBlock == 0: the file length is a multiple of the block
+            // size, which is treated here as a full last block.
+            if (freeInLastBlock == this->blockSize) {
+                THROW(HdfsIOException,
+                      "OutputStreamImpl: the last block for file %s is full.",
+                      this->path.c_str());
+            }
+
+            // Bytes already present in the trailing checksum chunk.
+            int usedInCksum = cursor % chunkSize;
+            int freeInCksum = chunkSize - usedInCksum;
+
+            if (usedInCksum > 0 && freeInCksum > 0) {
+                /*
+                 * if there is space in the last partial chunk, then
+                 * setup in such a way that the next packet will have only
+                 * one chunk that fills up the partial chunk.
+                 */
+                packetSize = 0;
+                chunkSize = freeInCksum;
+            } else {
+                /*
+                 * if the remaining space in the block is smaller than
+                 * the expected size of a packet, then create a
+                 * smaller packet.
+                 */
+                packetSize =
+                    packetSize < freeInLastBlock ?
+                    packetSize : static_cast<int>(freeInLastBlock);
+            }
+        }
+    } catch (...) {
+        completeFile(false);
+        reset();
+        throw;
+    }
+
+    // Recompute chunksPerPacket/packetSize and resize the chunk buffer from
+    // the (possibly shrunk) chunkSize and packetSize.
+    computePacketChunkSize();
+}
+
+/*
+ * Does the work of open() after argument validation.
+ *
+ * Stores the file system, the path as returned by fs->getStandardPath() and
+ * a private copy of the session configuration. It then resolves replication
+ * and block size (0 means "use the configured default") and checks that the
+ * packet size is at least the chunk size and that the block size is a
+ * multiple of the chunk size.
+ *
+ * With Append, the existing file is prepared by initAppend(). If the file
+ * does not exist and Create is also set, control falls through and the file
+ * is created instead. Otherwise the file is created (or overwritten)
+ * through fs->create(). Both successful paths start lease renewal for the
+ * file system.
+ */
+void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path,
+                                    int flag, const Permission & permission, bool createParent,
+                                    int replication, int64_t blockSize) {
+    filesystem = fs;
+    this->path = fs->getStandardPath(path);
+    // Provisional values; both are resolved against the defaults below.
+    this->replication = replication;
+    this->blockSize = blockSize;
+    syncBlock = flag & SyncBlock;
+    conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf()));
+    LOG(DEBUG2, "open file %s for %s", this->path.c_str(), (flag & Append ? "append" : "write"));
+    packets.setMaxSize(conf->getPacketPoolSize());
+
+    if (0 == replication) {
+        this->replication = conf->getDefaultReplica();
+    } else {
+        this->replication = replication;
+    }
+
+    if (0 == blockSize) {
+        this->blockSize = conf->getDefaultBlockSize();
+    } else {
+        this->blockSize = blockSize;
+    }
+
+    chunkSize = conf->getDefaultChunkSize();
+    packetSize = conf->getDefaultPacketSize();
+    heartBeatInterval = conf->getHeartBeatInterval();
+    closeTimeout = conf->getCloseFileTimeout();
+
+    if (packetSize < chunkSize) {
+        THROW(InvalidParameter,
+              "OutputStreamImpl: packet size %d is less than the chunk size %d.",
+              packetSize, chunkSize);
+    }
+
+    if (0 != this->blockSize % chunkSize) {
+        THROW(InvalidParameter,
+              "OutputStreamImpl: block size %" PRId64 " is not the multiply of chunk size %d.",
+              this->blockSize, chunkSize);
+    }
+
+    try {
+        if (flag & Append) {
+            // initAppend() replaces blockSize with the existing file's value.
+            initAppend();
+            LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem);
+            return;
+        }
+    } catch (const FileNotFoundException & e) {
+        // A missing file is only an error without Create; with Create,
+        // fall through and create it below.
+        if (!(flag & Create)) {
+            throw;
+        }
+    }
+
+    assert((flag & Create) || (flag & Overwrite));
+    fs->create(this->path, permission, flag, createParent, this->replication,
+               this->blockSize);
+    closed = false;
+    computePacketChunkSize();
+    LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem);
+}
+
+/**
+ * Append data to the file.
+ * @param buf the data to append; must not be NULL.
+ * @param size number of bytes in buf; must be non-negative.
+ * @throw InvalidParameter on a bad argument. A failure while writing is
+ *        stored with setError() before being rethrown, so later operations
+ *        on this stream fail as well.
+ */
+void OutputStreamImpl::append(const char * buf, int64_t size) {
+    LOG(DEBUG3, "append file %s size is %" PRId64 ", offset %" PRId64 " next pos %" PRId64, path.c_str(), size, cursor, size + cursor);
+
+    const bool validArgs = (buf != NULL) && (size >= 0);
+
+    if (!validArgs) {
+        THROW(InvalidParameter, "Invalid parameter.");
+    }
+
+    checkStatus();
+
+    try {
+        appendInternal(buf, size);
+    } catch (...) {
+        // Poison the stream: subsequent calls rethrow this error.
+        setError(current_exception());
+        throw;
+    }
+}
+
+/*
+ * Split the caller's data into checksum chunks and pack them into packets.
+ *
+ * Bytes accumulate in `buffer` until a full chunk is available. When the
+ * buffer is empty and at least a whole chunk remains, the chunk is taken
+ * straight from `buf` without copying. The running checksum is reset after
+ * every completed chunk.
+ *
+ * A packet is sent once it is full or the block is full. After the first
+ * send in append mode, chunk and packet sizes go back to the configured
+ * defaults. When the block is full the pipeline is closed, which resets
+ * bytesWritten to 0 so later data goes through a fresh pipeline.
+ */
+void OutputStreamImpl::appendInternal(const char * buf, int64_t size) {
+    int64_t todo = size;
+
+    while (todo > 0) {
+        // Room left in the current chunk, capped by what remains to write.
+        int batch = buffer.size() - position;
+        batch = batch < todo ? batch : static_cast<int>(todo);
+
+        /*
+         * bypass buffer: nothing is buffered and at least one whole chunk
+         * remains, so hand the chunk straight from buf.
+         */
+        if (0 == position && todo >= static_cast<int64_t>(buffer.size())) {
+            checksum->update(buf + size - todo, batch);
+            appendChunkToPacket(buf + size - todo, batch);
+            bytesWritten += batch;
+            checksum->reset();
+        } else {
+            checksum->update(buf + size - todo, batch);
+            memcpy(&buffer[position], buf + size - todo, batch);
+            position += batch;
+
+            // Chunk complete: move it into the packet and start a new one.
+            if (position == static_cast<int>(buffer.size())) {
+                appendChunkToPacket(&buffer[0], buffer.size());
+                bytesWritten += buffer.size();
+                checksum->reset();
+                position = 0;
+            }
+        }
+
+        todo -= batch;
+
+        if (currentPacket
+                && (currentPacket->isFull() || bytesWritten == blockSize)) {
+            sendPacket(currentPacket);
+
+            // The first packet of an append was sized to fit the partial
+            // chunk / remaining block space; restore the defaults now.
+            if (isAppend) {
+                isAppend = false;
+                chunkSize = conf->getDefaultChunkSize();
+                packetSize = conf->getDefaultPacketSize();
+                computePacketChunkSize();
+            }
+
+            if (bytesWritten == blockSize) {
+                closePipeline();
+            }
+        }
+    }
+
+    // The file offset includes data still held in the chunk buffer.
+    cursor += size;
+}
+
+/*
+ * Add one chunk (its data plus the value currently held by `checksum`) to
+ * the packet under construction. If no packet is in progress, a new one is
+ * taken from the pool with bytesWritten as its block offset and the next
+ * sequence number.
+ */
+void OutputStreamImpl::appendChunkToPacket(const char * buf, int size) {
+    assert(NULL != buf && size > 0);
+
+    if (!currentPacket) {
+        const int64_t seqno = nextSeqNo++;
+        currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten,
+                                          seqno, checksumSize);
+    }
+
+    Packet & packet = *currentPacket;
+    packet.addChecksum(checksum->getValue());
+    packet.addData(buf, size);
+    packet.increaseNumChunks();
+}
+
+/*
+ * Hand a packet to the write pipeline, creating the pipeline lazily on the
+ * first send. Callers must pass currentPacket: setupPipeline() reads the
+ * block offset from currentPacket, and currentPacket is cleared once the
+ * packet has been handed off.
+ */
+void OutputStreamImpl::sendPacket(shared_ptr<Packet> packet) {
+    assert(packet && packet == currentPacket);
+
+    if (!pipeline) {
+        setupPipeline();
+    }
+
+    // Send the argument itself rather than silently re-reading the member.
+    pipeline->send(packet);
+    currentPacket.reset();
+    lastSend = steady_clock::now();
+}
+
+/*
+ * Create the write pipeline for the current block. It starts at the block
+ * offset of currentPacket, so a packet must be pending. MOCK builds take
+ * the pipeline from the test stub instead.
+ */
+void OutputStreamImpl::setupPipeline() {
+    assert(currentPacket);
+#ifdef MOCK
+    pipeline = stub->getPipeline();
+#else
+    // The configured chunk size is passed, not `chunkSize`, which may be
+    // temporarily shrunk for the first packet of an append.
+    pipeline = shared_ptr<Pipeline>(new PipelineImpl(isAppend, path.c_str(), *conf, filesystem,
+                                    CHECKSUM_TYPE_CRC32C, conf->getDefaultChunkSize(), replication,
+                                    currentPacket->getOffsetInBlock(), packets, lastBlock));
+#endif
+    lastSend = steady_clock::now();
+    /*
+     * start heart beat thread (currently disabled; kept for reference)
+     */
+    /*if (heartBeatStop) {
+        if (heartBeatSender.joinable()) {
+            heartBeatSender.join();
+        }
+
+        heartBeatStop = false;
+        heartBeatSender = thread(&OutputStreamImpl::heartBeatSenderRoutine, this);
+    }*/
+}
+
+/**
+ * Flush all buffered data to the pipeline and wait for the acks; blocks
+ * until all acks are received.
+ * A failure is stored with setError() before being rethrown, so the stream
+ * rejects further operations.
+ */
+void OutputStreamImpl::flush() {
+    LOG(DEBUG3, "flush file %s at offset %" PRId64, path.c_str(), cursor);
+    checkStatus();
+
+    try {
+        const bool requestSync = false;
+        flushInternal(requestSync);
+    } catch (...) {
+        setError(current_exception());
+        throw;
+    }
+}
+
+/*
+ * Push all buffered data to the pipeline and call pipeline->flush().
+ *
+ * @param needSync when true, the packet sent carries the sync flag. A packet
+ *        is sent even if nothing was written since the last flush (an empty
+ *        one when needed, as long as a pipeline is open).
+ *
+ * A partially filled chunk is added to the packet, but `position`,
+ * `bytesWritten` and the running checksum are not advanced. Later writes
+ * therefore keep extending the same chunk, and it is sent again in full, at
+ * the same block offset, once it fills up.
+ * NOTE(review): this presumably relies on the datanode replacing the
+ * previously sent partial chunk; confirm against the pipeline code.
+ */
+void OutputStreamImpl::flushInternal(bool needSync) {
+    if (lastFlushed == cursor && !needSync) {
+        return;
+    } else {
+        lastFlushed = cursor;
+    }
+
+    if (position > 0) {
+        appendChunkToPacket(&buffer[0], position);
+    }
+
+    /*
+     * if the pipeline and currentPacket are both NULL,
+     * that means the pipeline has been closed and no more data in buffer/packet.
+     * already synced when closing pipeline.
+     */
+    if (!currentPacket && needSync && pipeline) {
+        currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten,
+                                          nextSeqNo++, checksumSize);
+    }
+
+    lock_guard < mutex > lock(mut);
+
+    if (currentPacket) {
+        currentPacket->setSyncFlag(needSync);
+        sendPacket(currentPacket);
+    }
+
+    if (pipeline) {
+        pipeline->flush();
+    }
+}
+
+/**
+ * Current length of the file as seen by this stream, i.e. the offset at
+ * which the next append() writes.
+ * @return file length in bytes, including data still buffered locally.
+ */
+int64_t OutputStreamImpl::tell() {
+    checkStatus();
+    const int64_t fileLength = cursor;
+    return fileLength;
+}
+
+/**
+ * @ref OutputStream::sync
+ * Same as flush(), except that the packet sent carries the sync flag and is
+ * sent even when nothing new was written since the last flush. A failure is
+ * stored with setError() before being rethrown.
+ */
+void OutputStreamImpl::sync() {
+    LOG(DEBUG3, "sync file %s at offset %" PRId64, path.c_str(), cursor);
+    checkStatus();
+
+    try {
+        const bool requestSync = true;
+        flushInternal(requestSync);
+    } catch (...) {
+        setError(current_exception());
+        throw;
+    }
+}
+
+/*
+ * Ask the file system to complete the file (filesystem->complete with the
+ * last written block), retrying every 400 ms until it reports success.
+ *
+ * @param throwError when true, an HdfsIOException from complete() is
+ *        rethrown wrapped via NESTED_THROW, and a timeout raises
+ *        HdfsIOException. When false, both cases simply return.
+ * The retries are bounded by closeTimeout (milliseconds) only when it is
+ * positive; otherwise the loop continues until complete() succeeds or throws.
+ */
+void OutputStreamImpl::completeFile(bool throwError) {
+    steady_clock::time_point start = steady_clock::now();
+
+    while (true) {
+        try {
+            bool success;
+            success = filesystem->complete(path, lastBlock.get());
+
+            if (success) {
+                return;
+            }
+        } catch (HdfsIOException & e) {
+            // NESTED_THROW must run inside this handler to capture the cause.
+            if (throwError) {
+                NESTED_THROW(HdfsIOException,
+                             "OutputStreamImpl: failed to complete file %s.",
+                             path.c_str());
+            } else {
+                return;
+            }
+        }
+
+        if (closeTimeout > 0) {
+            steady_clock::time_point end = steady_clock::now();
+
+            if (ToMilliSeconds(start, end) >= closeTimeout) {
+                if (throwError) {
+                    THROW(HdfsIOException,
+                          "OutputStreamImpl: timeout when complete file %s, timeout interval %d ms.",
+                          path.c_str(), closeTimeout);
+                } else {
+                    return;
+                }
+            }
+        }
+
+        // Back off before retrying; an interrupted sleep is not an error.
+        try {
+            sleep_for(milliseconds(400));
+        } catch (...) {
+        }
+    }
+}
+
+/**
+ * Finish the current block. Any pending packet is sent first. The pipeline
+ * is then closed with a final packet that has no chunks added here; it is
+ * flagged for sync when the stream was opened with SyncBlock. The block
+ * returned by the pipeline becomes lastBlock, filesystem->fsync() is
+ * called, and bytesWritten is reset to 0 for the next block.
+ * Does nothing when no pipeline is open. Runs while holding `mut`.
+ */
+void OutputStreamImpl::closePipeline() {
+    lock_guard < mutex > lock(mut);
+
+    if (!pipeline) {
+        return;
+    }
+
+    if (currentPacket) {
+        sendPacket(currentPacket);
+    }
+
+    currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten, nextSeqNo++,
+                                      checksumSize);
+
+    if (syncBlock) {
+        currentPacket->setSyncFlag(syncBlock);
+    }
+
+    lastBlock = pipeline->close(currentPacket);
+    assert(lastBlock);
+    currentPacket.reset();
+    pipeline.reset();
+    filesystem->fsync(path);
+    bytesWritten = 0;
+}
+
+/*
+ * Close the stream; a no-op when it is already closed.
+ *
+ * If no error was recorded earlier, unflushed data is sent, the pipeline is
+ * closed and the file is completed. When an error was recorded, these steps
+ * are skipped because the pipeline may be broken, and that recorded error is
+ * not rethrown. In every case lease renewal is stopped and the stream is
+ * reset. An exception raised while closing is rethrown only after that
+ * cleanup.
+ */
+void OutputStreamImpl::close() {
+    exception_ptr e;
+
+    if (closed) {
+        return;
+    }
+
+    try {
+        //pipeline may be broken
+        if (!lastError) {
+            if (lastFlushed != cursor && position > 0) {
+                appendChunkToPacket(&buffer[0], position);
+            }
+
+            if (lastFlushed != cursor && currentPacket) {
+                sendPacket(currentPacket);
+            }
+
+            closePipeline();
+            /*heartBeatStop = true;
+             condHeartBeatSender.notify_all();
+
+             if (heartBeatSender.joinable()) {
+             heartBeatSender.join();
+             }*/
+            completeFile(true);
+        }
+    } catch (...) {
+        // Defer the error until the stream has been cleaned up below.
+        e = current_exception();
+    }
+
+    LeaseRenewer::GetLeaseRenewer().StopRenew(filesystem);
+    LOG(DEBUG3, "close file %s for write with length %" PRId64, path.c_str(), cursor);
+    reset();
+
+    if (e) {
+        rethrow_exception(e);
+    }
+}
+
+/*
+ * Return all per-file state to its initial values and mark the stream
+ * closed. The checksum implementation and the packet pool are kept; only
+ * the running checksum value is cleared.
+ */
+void OutputStreamImpl::reset() {
+    // Plain scalar state: flags, sizes, offsets and counters.
+    closed = true;
+    isAppend = false;
+    syncBlock = false;
+    blockSize = 0;
+    chunkSize = 0;
+    chunksPerPacket = 0;
+    packetSize = 0;
+    replication = 0;
+    closeTimeout = 0;
+    heartBeatInterval = 0;
+    bytesWritten = 0;
+    cursor = 0;
+    lastFlushed = 0;
+    nextSeqNo = 0;
+    position = 0;
+    checksum->reset();
+
+    // Owned objects, released in a fixed order.
+    conf.reset();
+    currentPacket.reset();
+    filesystem.reset();
+    lastBlock.reset();
+    lastError = exception_ptr();
+    path.clear();
+    pipeline.reset();
+}
+
+/*
+ * Human-readable description of the stream. A non-empty path means
+ * openInternal() has attached a file (reset() clears it), so the path is
+ * reported; otherwise the stream is reported as not opened.
+ */
+std::string OutputStreamImpl::toString() {
+    if (!path.empty()) {
+        return std::string("OutputStream for path ") + path;
+    }
+
+    return std::string("OutputStream (not opened)");
+}
+
+/*void OutputStreamImpl::heartBeatSenderRoutine() {
+    assert(heartBeatStop == false);
+
+    while (!heartBeatStop) {
+        try {
+            unique_lock < mutex > lock(mut);
+            condHeartBeatSender.wait_for(lock, milliseconds(1000));
+
+            try {
+                try {
+                    if (pipeline
+                            && ToMilliSeconds(lastSend, steady_clock::now())
+                            >= heartBeatInterval) {
+                        pipeline->send(shared_ptr < Packet > (new Packet()));
+                        lastSend = steady_clock::now();
+                    }
+                } catch (...) {
+                    NESTED_THROW(Hdfs::HdfsIOException, "Failed to send heart beat, path: %s",
+                                 path.c_str());
+                }
+            } catch (...) {
+                lastError = current_exception();
+                throw;
+            }
+        } catch (const std::bad_alloc & e) {
+
+             * keep quiet if we run out of memory, since writing log need memory,
+             * that may cause the process terminated.
+
+            break;
+        } catch (const Hdfs::HdfsException & e) {
+            LOG(LOG_ERROR, "Heart beat thread exit since %s",
+                GetExceptionDetail(e));
+        } catch (const std::exception & e) {
+            LOG(LOG_ERROR, "Heart beat thread exit since %s",
+                e.what());
+        }
+    }
+
+    heartBeatStop = true;
+}*/
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStreamImpl.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.h b/depends/libhdfs3/src/client/OutputStreamImpl.h
new file mode 100644
index 0000000..4967e0f
--- /dev/null
+++ b/depends/libhdfs3/src/client/OutputStreamImpl.h
@@ -0,0 +1,173 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
+#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_
+
+#include "Atomic.h"
+#include "Checksum.h"
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "FileSystem.h"
+#include "Memory.h"
+#include "OutputStreamInter.h"
+#include "PacketPool.h"
+#include "Permission.h"
+#include "Pipeline.h"
+#include "server/LocatedBlock.h"
+#include "SessionConfig.h"
+#include "Thread.h"
+#ifdef MOCK
+#include "PipelineStub.h"
+#endif
+
+namespace Hdfs {
+namespace Internal {
+/**
+ * An output stream used to write data to hdfs.
+ * Data is cut into checksummed chunks, grouped into packets and sent
+ * through a per-block write pipeline.
+ */
+class OutputStreamImpl: public OutputStreamInter {
+public:
+    OutputStreamImpl();
+
+    ~OutputStreamImpl();
+
+    /**
+     * To create or append a file.
+     * @param fs hdfs file system.
+     * @param path the file path.
+     * @param flag creation flag: Create, Overwrite, Append, Create|Overwrite
+     *        or Create|Append, each optionally combined with SyncBlock.
+     * @param permission create a new file with given permission.
+     * @param createParent if the parent does not exist, create it.
+     * @param replication create a file with given number of replication
+     *        (0 selects the configured default).
+     * @param blockSize  create a file with given block size
+     *        (0 selects the configured default).
+     */
+    void open(shared_ptr<FileSystemInter> fs, const char * path, int flag,
+              const Permission & permission, bool createParent, int replication,
+              int64_t blockSize);
+
+    /**
+     * To append data to file.
+     * @param buf the data used to append.
+     * @param size the data size.
+     */
+    void append(const char * buf, int64_t size);
+
+    /**
+     * Flush all data in buffer and wait for acks.
+     * Blocks until all acks are received.
+     */
+    void flush();
+
+    /**
+     * return the current file length.
+     * @return current file length, including data still buffered locally.
+     */
+    int64_t tell();
+
+    /**
+     * @ref OutputStream::sync
+     */
+    void sync();
+
+    /**
+     * close the stream.
+     */
+    void close();
+
+    /**
+     * Output a readable string of this output stream.
+     */
+    std::string toString();
+
+    /**
+     * Keep the last error of this stream; later operations rethrow it.
+     * @param error the error to be kept.
+     */
+    void setError(const exception_ptr & error);
+
+private:
+    void appendChunkToPacket(const char * buf, int size);
+    void appendInternal(const char * buf, int64_t size);
+    void checkStatus();
+    void closePipeline();
+    void completeFile(bool throwError);
+    void computePacketChunkSize();
+    void flushInternal(bool needSync);
+    //void heartBeatSenderRoutine();
+    void initAppend();
+    void openInternal(shared_ptr<FileSystemInter> fs, const char * path, int flag,
+                      const Permission & permission, bool createParent, int replication,
+                      int64_t blockSize);
+    void reset();
+    void sendPacket(shared_ptr<Packet> packet);
+    void setupPipeline();
+
+private:
+    // NOTE: declaration order fixes member initialization order; the
+    // constructor's initializer list follows it.
+    //atomic<bool> heartBeatStop;
+    bool closed; //true until open() succeeds and again after reset()
+    bool isAppend; //set while the first packet of an append is pending
+    bool syncBlock; //opened with SyncBlock: flag the last packet of each block
+    //condition_variable condHeartBeatSender;
+    exception_ptr lastError; //error stored by setError(), rethrown by checkStatus()
+    int checksumSize; //bytes of checksum per chunk
+    int chunkSize; //data bytes per chunk (shrunk for the first packet of an append)
+    int chunksPerPacket;
+    int closeTimeout; //ms; bounds completeFile() retries when > 0
+    int heartBeatInterval; //from config; heartbeat sending is currently disabled
+    int packetSize; //bytes per packet including the header
+    int position; //number of bytes held in buffer (current partial chunk)
+    int replication;
+    int64_t blockSize; //max size of block
+    int64_t bytesWritten; //bytes of the current block already added to packets (excludes data still in the chunk buffer).
+    int64_t cursor; //offset in the file where the next append writes.
+    int64_t lastFlushed; //value of cursor at the last flush/sync
+    int64_t nextSeqNo; //sequence number for the next packet
+    mutex mut; //guards lastError; also held while flushing/closing the pipeline
+    PacketPool packets;
+    shared_ptr<Checksum> checksum;
+    shared_ptr<FileSystemInter> filesystem;
+    shared_ptr<LocatedBlock> lastBlock;
+    shared_ptr<Packet> currentPacket; //packet being filled, not yet sent
+    shared_ptr<Pipeline> pipeline; //created lazily on the first send of a block
+    shared_ptr<SessionConfig> conf;
+    std::string path;
+    std::vector<char> buffer; //chunk buffer, sized to chunkSize
+    steady_clock::time_point lastSend;
+    //thread heartBeatSender;
+
+    friend class Pipeline;
+#ifdef MOCK
+private:
+    Hdfs::Mock::PipelineStub * stub;
+#endif
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMIMPL_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/OutputStreamInter.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStreamInter.h b/depends/libhdfs3/src/client/OutputStreamInter.h
new file mode 100644
index 0000000..9477a0d
--- /dev/null
+++ b/depends/libhdfs3/src/client/OutputStreamInter.h
@@ -0,0 +1,98 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMINTER_H_
+#define _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMINTER_H_
+
+#include "ExceptionInternal.h"
+#include "FileSystemInter.h"
+#include "Memory.h"
+#include "Permission.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * An output stream used to write data to hdfs.
+ * Abstract interface; OutputStreamImpl is one implementation.
+ */
+class OutputStreamInter {
+public:
+    virtual ~OutputStreamInter() {
+    }
+
+    /**
+     * To create or append a file.
+     * @param fs hdfs file system.
+     * @param path the file path.
+     * @param flag creation flag, can be Create, Append or Create|Overwrite.
+     * @param permission create a new file with given permission.
+     * @param createParent if the parent does not exist, create it.
+     * @param replication create a file with given number of replication.
+     * @param blockSize  create a file with given block size.
+     */
+    virtual void open(shared_ptr<FileSystemInter> fs, const char * path, int flag,
+                      const Permission & permission, bool createParent, int replication,
+                      int64_t blockSize) = 0;
+
+    /**
+     * To append data to file.
+     * @param buf the data used to append.
+     * @param size the data size.
+     */
+    virtual void append(const char * buf, int64_t size) = 0;
+
+    /**
+     * Flush all data in buffer and wait for acks.
+     * Blocks until all acks are received.
+     */
+    virtual void flush() = 0;
+
+    /**
+     * return the current file length.
+     * @return current file length.
+     */
+    virtual int64_t tell() = 0;
+
+    /**
+     * @ref OutputStream::sync
+     */
+    virtual void sync() = 0;
+
+    /**
+     * close the stream.
+     */
+    virtual void close() = 0;
+
+    /**
+     * @return a human-readable description of this stream.
+     */
+    virtual std::string toString() = 0;
+
+    /**
+     * Record an error for this stream.
+     * @param error the error to be kept.
+     */
+    virtual void setError(const exception_ptr & error) = 0;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_OUTPUTSTREAMINTER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Packet.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Packet.cpp b/depends/libhdfs3/src/client/Packet.cpp
new file mode 100644
index 0000000..894e6bf
--- /dev/null
+++ b/depends/libhdfs3/src/client/Packet.cpp
@@ -0,0 +1,156 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "BigEndian.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Packet.h"
+#include "PacketHeader.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * Heartbeat packet: carries no checksums or payload, and its buffer only
+ * reserves room for the packet header. seqno is HEART_BEAT_SEQNO.
+ */
+Packet::Packet() :
+    lastPacketInBlock(false),
+    syncBlock(false),
+    checksumPos(0),
+    checksumSize(0),
+    checksumStart(0),
+    dataPos(0),
+    dataStart(0),
+    headerStart(0),
+    maxChunks(0),
+    numChunks(0),
+    offsetInBlock(0),
+    seqno(HEART_BEAT_SEQNO),
+    buffer(PacketHeader::GetPkgHeaderSize()) {
+}
+
+/*
+ * Data packet. The buffer starts with header space, followed by room for
+ * chunksPerPkt checksums of checksumSize bytes each; payload follows that.
+ */
+Packet::Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
+               int64_t seqno, int checksumSize) :
+    lastPacketInBlock(false),
+    syncBlock(false),
+    checksumSize(checksumSize),
+    headerStart(0),
+    maxChunks(chunksPerPkt),
+    numChunks(0),
+    offsetInBlock(offsetInBlock),
+    seqno(seqno),
+    buffer(pktSize) {
+    const int headerLen = PacketHeader::GetPkgHeaderSize();
+    const int checksumAreaLen = chunksPerPkt * checksumSize;
+
+    checksumStart = headerLen;
+    checksumPos = headerLen;
+    dataStart = headerLen + checksumAreaLen;
+    dataPos = dataStart;
+    assert(dataPos >= 0);
+}
+
+/*
+ * Re-initialize this packet for reuse with new parameters. Per-packet flags
+ * and counters are cleared and the checksum/data layout is recomputed.
+ * The buffer is grown when pktSize exceeds its size, but never shrunk.
+ */
+void Packet::reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
+                   int64_t seqno, int checksumSize) {
+    // Clear per-packet state.
+    lastPacketInBlock = false;
+    syncBlock = false;
+    headerStart = 0;
+    numChunks = 0;
+
+    // Adopt the new packet's parameters.
+    maxChunks = chunksPerPkt;
+    this->checksumSize = checksumSize;
+    this->offsetInBlock = offsetInBlock;
+    this->seqno = seqno;
+
+    // Header space first, then the checksum area, then payload.
+    const int headerLen = PacketHeader::GetPkgHeaderSize();
+    checksumStart = headerLen;
+    checksumPos = headerLen;
+    dataStart = headerLen + chunksPerPkt * checksumSize;
+    dataPos = dataStart;
+
+    if (static_cast<int>(buffer.size()) < pktSize) {
+        buffer.resize(pktSize);
+    }
+
+    assert(dataPos >= 0);
+}
+
+/*
+ * Append one checksum to the checksum area. The value is written as 4
+ * big-endian bytes, after which checksumPos advances by checksumSize.
+ * @throws HdfsIOException if the 4 bytes would run past dataStart.
+ */
+void Packet::addChecksum(uint32_t checksum) {
+    const int writeEnd = checksumPos + static_cast<int>(sizeof(uint32_t));
+
+    if (writeEnd > dataStart) {
+        THROW(HdfsIOException,
+              "Packet: failed to add checksum into packet, checksum is too large");
+    }
+
+    char * const dest = &buffer[checksumPos];
+    WriteBigEndian32ToArray(checksum, dest);
+    checksumPos += checksumSize;
+}
+
+/*
+ * Copy size payload bytes from buf to the end of the packet's data area.
+ * @throws HdfsIOException if the data would run past the end of the buffer.
+ */
+void Packet::addData(const char * buf, int size) {
+    const int capacity = static_cast<int>(buffer.size());
+
+    if (dataPos + size > capacity) {
+        THROW(HdfsIOException,
+              "Packet: failed add data to packet, packet size is too small");
+    }
+
+    char * const dest = &buffer[dataPos];
+    memcpy(dest, buf, size);
+    dataPos += size;
+    assert(dataPos >= 0);
+}
+
+/* Record whether the block should be synced; see getBuffer for how it is used. */
+void Packet::setSyncFlag(bool sync) {
+    this->syncBlock = sync;
+}
+
+/* Count one more chunk towards maxChunks. */
+void Packet::increaseNumChunks() {
+    numChunks += 1;
+}
+
+/* A packet is full once it holds maxChunks chunks. */
+bool Packet::isFull() {
+    return !(numChunks < maxChunks);
+}
+
+/* Heartbeat packets are identified solely by their sequence number. */
+bool Packet::isHeartbeat() {
+    return seqno == HEART_BEAT_SEQNO;
+}
+
+void Packet::setLastPacketInBlock(bool lastPacket) {
+    this->lastPacketInBlock = lastPacket;
+}
+
+/* Number of payload bytes added so far. */
+int Packet::getDataSize() {
+    const int payloadLen = dataPos - dataStart;
+    return payloadLen;
+}
+
+/*
+ * Offset within the block just past the last payload byte currently in this
+ * packet, i.e. offsetInBlock plus the current payload length.
+ */
+int64_t Packet::getLastByteOffsetBlock() {
+    assert(offsetInBlock >= 0 && dataPos >= dataStart);
+    // The payload can never extend past the end of the buffer; addData
+    // enforces this, so check that invariant rather than an unrelated product.
+    assert(dataPos <= static_cast<int>(buffer.size()));
+    return offsetInBlock + dataPos - dataStart;
+}
+
+/*
+ * Finalize the packet and return a view of its wire representation:
+ * [packet length (4 bytes)][proto length (2 bytes)][PacketHeaderProto]
+ * [checksums][payload].
+ *
+ * If fewer checksums were added than the area reserves room for, the checksums
+ * are shifted right so that they end exactly at dataStart. The header is then
+ * written immediately before them, starting at headerStart.
+ *
+ * The returned view points into this packet's buffer. It may be invalidated by
+ * reset() if reset grows the buffer.
+ *
+ * NOTE(review): syncBlock (set by setSyncFlag) is not passed to PacketHeader
+ * here, so it is not reflected in the header written by this function.
+ */
+const ConstPacketBuffer Packet::getBuffer() {
+    /*
+     * Once this is called, no more data can be added to the packet.
+     * It is only called when the packet is ready to be sent.
+     */
+    int dataLen = dataPos - dataStart;
+    int checksumLen = checksumPos - checksumStart;
+
+    if (checksumPos != dataStart) {
+        /*
+         * The checksum area is not full (this can happen for the last packet).
+         * Close the gap by moving the checksums right so they end at
+         * dataStart. The header then begins "gap" bytes into the buffer.
+         */
+        memmove(&buffer[dataStart - checksumLen], &buffer[checksumStart],
+                checksumLen);
+        headerStart = dataStart - checksumPos;
+        checksumStart += dataStart - checksumPos;
+        checksumPos = dataStart;
+    }
+
+    assert(dataPos >= 0);
+    int pktLen = dataLen + checksumLen;
+    PacketHeader header(pktLen + sizeof(int32_t)
+                        /* the length field counts its own 4 bytes; the server subtracts them */
+                        , offsetInBlock, seqno, lastPacketInBlock, dataLen);
+    header.writeInBuffer(&buffer[headerStart],
+                         PacketHeader::GetPkgHeaderSize());
+    return ConstPacketBuffer(&buffer[headerStart],
+                             PacketHeader::GetPkgHeaderSize() + pktLen);
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Packet.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Packet.h b/depends/libhdfs3/src/client/Packet.h
new file mode 100644
index 0000000..d51bd1b
--- /dev/null
+++ b/depends/libhdfs3/src/client/Packet.h
@@ -0,0 +1,131 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PACKET_H_
+#define _HDFS_LIBHDFS3_CLIENT_PACKET_H_
+
+#include <stdint.h>
+#include <vector>
+
+#define HEART_BEAT_SEQNO -1
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * Non-owning, read-only view of a contiguous byte range (pointer + length).
+ * It neither copies nor frees the bytes; the owner must keep them alive.
+ */
+class ConstPacketBuffer {
+public:
+    ConstPacketBuffer(const char * buf, int size) :
+        buffer(buf), size(size) {
+    }
+
+    /** @return pointer to the first byte of the view. */
+    const char * getBuffer() const {
+        return buffer;
+    }
+
+    /**
+     * @return number of bytes in the view.
+     * Returned as plain int: a top-level const on a by-value return type is
+     * ignored by the compiler (-Wignored-qualifiers) and only adds noise.
+     */
+    int getSize() const {
+        return size;
+    }
+
+private:
+    const char * buffer;
+    const int size;
+};
+
+/**
+ * The packet buffer is laid out as follows
+ * (H is reserved header space, C is checksum data, D is payload data):
+ *
+ * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
+ *       ^    ^               ^               ^
+ *       |    checksumPos     dataStart       dataPos
+ *   checksumStart
+ */
+class Packet {
+public:
+    /**
+     * Create a heartbeat packet: seqno is HEART_BEAT_SEQNO and the buffer
+     * only holds room for the packet header.
+     */
+    Packet();
+
+    /**
+     * Create a data packet.
+     * @param pktSize initial buffer size in bytes (header + checksums + payload).
+     * @param chunksPerPkt maximum number of chunks, and of checksums, in the packet.
+     * @param offsetInBlock offset within the block at which this packet's data starts.
+     * @param seqno sequence number of the packet.
+     * @param checksumSize bytes reserved per chunk checksum.
+     */
+    Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize);
+
+    /**
+     * Re-initialize for reuse (PacketPool::getPacket calls this) with new
+     * parameters. The buffer is grown if needed but never shrunk.
+     */
+    void reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock, int64_t seqno, int checksumSize);
+
+    /**
+     * Append one checksum (written as 4 big-endian bytes).
+     * Throws HdfsIOException if the checksum area is full.
+     */
+    void addChecksum(uint32_t checksum);
+
+    /**
+     * Copy size bytes of payload into the packet.
+     * Throws HdfsIOException if the buffer is too small.
+     */
+    void addData(const char * buf, int size);
+
+    /**
+     * Record whether the block should be synced to disk.
+     * NOTE(review): getBuffer() does not currently pass this flag to the header.
+     */
+    void setSyncFlag(bool sync);
+
+    /** Count one more chunk towards maxChunks. */
+    void increaseNumChunks();
+
+    /** @return true once the number of chunks has reached maxChunks. */
+    bool isFull();
+
+    /** @return true if seqno equals HEART_BEAT_SEQNO. */
+    bool isHeartbeat();
+
+    /** Mark whether this is the last packet of its block. */
+    void setLastPacketInBlock(bool lastPacket);
+
+    /** @return number of payload bytes added so far. */
+    int getDataSize();
+
+    /**
+     * Write the header and return a view of header + checksums + payload,
+     * ready to send. No more data may be added afterwards.
+     */
+    const ConstPacketBuffer getBuffer();
+
+    /** @return offsetInBlock plus the current payload length. */
+    int64_t getLastByteOffsetBlock();
+
+    int64_t getSeqno() const {
+        return seqno;
+    }
+
+    bool isLastPacketInBlock() const {
+        return lastPacketInBlock;
+    }
+
+    int64_t getOffsetInBlock() const {
+        return offsetInBlock;
+    }
+
+private:
+    bool lastPacketInBlock; // is this the last packet in block
+    bool syncBlock; // sync block to disk?
+    int checksumPos; // write position of the next checksum
+    int checksumSize; // bytes reserved per checksum
+    int checksumStart; // start of the checksum area
+    int dataPos; // write position of the next payload byte
+    int dataStart; // start of the payload area
+    int headerStart; // start of the header; nonzero only after getBuffer() compacts a short packet
+    int maxChunks; // max chunks in packet
+    int numChunks; // number of chunks currently in packet
+    int64_t offsetInBlock; // offset in block
+    int64_t seqno; // sequence number of packet in block
+    std::vector<char> buffer; // header space + checksums + payload
+};
+
+}
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_PACKET_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketHeader.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PacketHeader.cpp b/depends/libhdfs3/src/client/PacketHeader.cpp
new file mode 100644
index 0000000..b0bd687
--- /dev/null
+++ b/depends/libhdfs3/src/client/PacketHeader.cpp
@@ -0,0 +1,126 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "BigEndian.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "PacketHeader.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * Cached header size. It is kept because PacketHeader.h declares it, but
+ * GetPkgHeaderSize() no longer depends on it: a namespace-scope dynamic
+ * initializer runs in an unspecified order relative to other translation
+ * units, so it could still read 0 when used from their static initializers.
+ */
+int PacketHeader::PkgHeaderSize = PacketHeader::GetPkgHeaderSize();
+
+/*
+ * Compute the packet header size: a 4-byte packet length, a 2-byte proto
+ * length, plus a serialized PacketHeaderProto with all required fields set.
+ * NOTE(review): this assumes the serialized proto size does not depend on the
+ * field values (e.g. fixed-width encodings) — confirm against datatransfer.proto.
+ */
+int PacketHeader::CalcPkgHeaderSize() {
+    PacketHeaderProto header;
+    header.set_offsetinblock(0);
+    header.set_datalen(0);
+    header.set_lastpacketinblock(false);
+    header.set_seqno(0);
+    return header.ByteSize() + sizeof(int32_t) /*packet length*/ + sizeof(int16_t)/* proto length */;
+}
+
+int PacketHeader::GetPkgHeaderSize() {
+    // Construct-on-first-use: computed the first time it is needed, so the
+    // result is correct regardless of static initialization order.
+    static const int size = CalcPkgHeaderSize();
+    return size;
+}
+
+PacketHeader::PacketHeader() : packetLen(0) {
+}
+
+/*
+ * Build a header for an outgoing packet.
+ * @param packetLen value of the 4-byte length field.
+ * The remaining arguments are stored in the PacketHeaderProto.
+ */
+PacketHeader::PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno,
+                           bool lastPacketInBlock, int dataLen)
+    : packetLen(packetLen) {
+    proto.set_offsetinblock(offsetInBlock);
+    proto.set_datalen(dataLen);
+    proto.set_lastpacketinblock(lastPacketInBlock);
+    proto.set_seqno(seqno);
+}
+
+/* Payload length recorded in the proto. */
+int PacketHeader::getDataLen() {
+    const int dataLen = this->proto.datalen();
+    return dataLen;
+}
+
+/* Whether the proto marks this as the last packet of the block. */
+bool PacketHeader::isLastPacketInBlock() {
+    return this->proto.lastpacketinblock();
+}
+
+/*
+ * Validate a received header against the previous sequence number.
+ * @return true if the data length is consistent with lastPacketInBlock
+ *         (a non-last packet carries data, the last packet carries none)
+ *         and seqno immediately follows lastSeqNo.
+ */
+bool PacketHeader::sanityCheck(int64_t lastSeqNo) {
+    const bool last = proto.lastpacketinblock();
+    const int dataLen = proto.datalen();
+
+    if (last) {
+        // The last packet should not contain data.
+        if (dataLen != 0) {
+            return false;
+        }
+    } else if (dataLen <= 0) {
+        // Only the last packet may have a non-positive data length.
+        return false;
+    }
+
+    // Seqnos should always increase by 1 with each packet received.
+    return proto.seqno() == lastSeqNo + 1;
+}
+
+/* Sequence number recorded in the proto. */
+int64_t PacketHeader::getSeqno() {
+    return this->proto.seqno();
+}
+
+/* Block offset recorded in the proto. */
+int64_t PacketHeader::getOffsetInBlock() {
+    return this->proto.offsetinblock();
+}
+
+/* Raw value of the 4-byte packet length field. */
+int PacketHeader::getPacketLen() {
+    return this->packetLen;
+}
+
+/*
+ * Parse a header from buf: a 4-byte big-endian packet length, a 2-byte
+ * big-endian proto length, then that many bytes of PacketHeaderProto.
+ *
+ * @param buf  bytes received from the datanode.
+ * @param size number of valid bytes in buf.
+ * @throws HdfsIOException if buf is too short, the lengths are inconsistent,
+ *         or the proto cannot be parsed.
+ */
+void PacketHeader::readFields(const char * buf, size_t size) {
+    int16_t protoLen;
+    const size_t prefixLen = sizeof(packetLen) + sizeof(protoLen);
+
+    // Validate at runtime rather than with assert(): buf comes from the
+    // datanode, and in release builds an assert compiles away and would let
+    // the reads below run past the end of buf.
+    if (size <= prefixLen) {
+        THROW(HdfsIOException, "Invalid PacketHeader, buf size is %zu", size);
+    }
+
+    packetLen = ReadBigEndian32FromArray(buf);
+    protoLen = ReadBigEndian16FromArray(buf + sizeof(packetLen));
+
+    // packetLen counts its own 4-byte field, so anything smaller is invalid.
+    // Lengths are compared as size_t so a large buf size cannot overflow an
+    // int conversion; protoLen < 0 is rejected before it is converted.
+    if (packetLen < static_cast<int>(sizeof(int32_t)) || protoLen < 0
+            || prefixLen + static_cast<size_t>(protoLen) > size) {
+        THROW(HdfsIOException, "Invalid PacketHeader, packetLen is %d, protoLen is %hd, buf size is %zu", packetLen,
+              protoLen, size);
+    }
+
+    if (!proto.ParseFromArray(buf + prefixLen, protoLen)) {
+        THROW(HdfsIOException,
+              "PacketHeader cannot parse PacketHeaderProto from datanode response.");
+    }
+}
+
+/*
+ * Serialize the header into buf: a 4-byte big-endian packetLen, a 2-byte
+ * big-endian proto length, then the serialized PacketHeaderProto.
+ *
+ * @param buf  destination; must hold at least 6 + proto.ByteSize() bytes
+ *             (Packet::getBuffer passes PacketHeader::GetPkgHeaderSize()).
+ * @param size number of bytes available at buf.
+ * @throws HdfsIOException if buf is too small or serialization fails.
+ */
+void PacketHeader::writeInBuffer(char * buf, size_t size) {
+    const size_t prefixLen = sizeof(int32_t) + sizeof(int16_t);
+    const int protoLen = proto.ByteSize();
+
+    // Check before writing anything, so an undersized buffer is reported
+    // instead of leaving a length prefix followed by no serialized proto.
+    if (size < prefixLen + static_cast<size_t>(protoLen)) {
+        THROW(HdfsIOException,
+              "PacketHeader: buffer size %zu is too small for a %zu-byte header",
+              size, prefixLen + static_cast<size_t>(protoLen));
+    }
+
+    buf = WriteBigEndian32ToArray(packetLen, buf);
+    buf = WriteBigEndian16ToArray(protoLen, buf);
+
+    if (!proto.SerializeToArray(buf, protoLen)) {
+        THROW(HdfsIOException,
+              "PacketHeader: failed to serialize PacketHeaderProto.");
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketHeader.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PacketHeader.h b/depends/libhdfs3/src/client/PacketHeader.h
new file mode 100644
index 0000000..940c0c4
--- /dev/null
+++ b/depends/libhdfs3/src/client/PacketHeader.h
@@ -0,0 +1,68 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PACKETHEADER_H_
+#define _HDFS_LIBHDFS3_CLIENT_PACKETHEADER_H_
+
+#include "datatransfer.pb.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/**
+ * Header of a data-transfer packet: a 4-byte packet length followed by a
+ * length-prefixed PacketHeaderProto (offset, seqno, last-packet flag, data length).
+ */
+class PacketHeader {
+public:
+    PacketHeader();
+    PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno,
+                 bool lastPacketInBlock, int dataLen);
+    bool isLastPacketInBlock();
+    /**
+     * @return false if the data length is inconsistent with lastPacketInBlock
+     * (non-last packets need dataLen > 0, the last packet dataLen == 0) or if
+     * seqno != lastSeqNo + 1.
+     */
+    bool sanityCheck(int64_t lastSeqNo);
+    int getDataLen();
+    /** @return the packet length field, which counts its own 4 bytes. */
+    int getPacketLen();
+    int64_t getOffsetInBlock();
+    int64_t getSeqno();
+    /**
+     * Parse a header received from a datanode.
+     * Throws HdfsIOException on invalid lengths or an unparsable proto.
+     */
+    void readFields(const char * buf, size_t size);
+    /**
+     * Write the header (4-byte packet length, 2-byte proto length, serialized
+     * proto) into buf. Callers should provide at least GetPkgHeaderSize()
+     * bytes, as Packet::getBuffer does.
+     */
+    void writeInBuffer(char * buf, size_t size);
+
+public:
+    /** Size in bytes of a serialized header; Packet reserves this much up front. */
+    static int GetPkgHeaderSize();
+    /** Compute that size; GetPkgHeaderSize() returns a cached result. */
+    static int CalcPkgHeaderSize();
+
+private:
+    static int PkgHeaderSize; // cached result of CalcPkgHeaderSize()
+private:
+    int32_t packetLen; // length field on the wire; counts its own 4 bytes
+    PacketHeaderProto proto; // offset, seqno, last-packet flag, data length
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PACKETHEADER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketPool.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PacketPool.cpp b/depends/libhdfs3/src/client/PacketPool.cpp
new file mode 100644
index 0000000..828c1ec
--- /dev/null
+++ b/depends/libhdfs3/src/client/PacketPool.cpp
@@ -0,0 +1,63 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Logger.h"
+#include "Packet.h"
+#include "PacketPool.h"
+
+namespace Hdfs {
+namespace Internal {
+
+/*
+ * @param size maximum number of idle packets retained for reuse.
+ */
+PacketPool::PacketPool(int size) : maxSize(size) {
+}
+
+/*
+ * Hand out a packet initialized with the given parameters, reusing the
+ * oldest pooled packet when one is available.
+ */
+shared_ptr<Packet> PacketPool::getPacket(int pktSize, int chunksPerPkt,
+        int64_t offsetInBlock, int64_t seqno, int checksumSize) {
+    // Nothing to recycle: allocate a fresh packet.
+    if (packets.empty()) {
+        return shared_ptr<Packet>(new Packet(pktSize, chunksPerPkt,
+                                             offsetInBlock, seqno, checksumSize));
+    }
+
+    // Take the packet at the front (released earliest) and re-initialize it.
+    shared_ptr<Packet> recycled = packets.front();
+    packets.pop_front();
+    recycled->reset(pktSize, chunksPerPkt, offsetInBlock, seqno, checksumSize);
+    return recycled;
+}
+
+/*
+ * Return a packet to the pool. It is kept only while the pool holds fewer
+ * than maxSize packets; otherwise this reference is simply dropped.
+ */
+void PacketPool::relesePacket(shared_ptr<Packet> packet) {
+    if (static_cast<int>(packets.size()) < maxSize) {
+        packets.push_back(packet);
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PacketPool.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PacketPool.h b/depends/libhdfs3/src/client/PacketPool.h
new file mode 100644
index 0000000..6194911
--- /dev/null
+++ b/depends/libhdfs3/src/client/PacketPool.h
@@ -0,0 +1,71 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PACKETPOOL_H_
+#define _HDFS_LIBHDFS3_CLIENT_PACKETPOOL_H_
+#include "Memory.h"
+
+#include <deque>
+
+namespace Hdfs {
+namespace Internal {
+
+class Packet;
+
+/*
+ * A simple packet pool implementation.
+ *
+ * A packet is created here if no pooled packet is available,
+ * and is then added to the Pipeline's packet queue to wait for its ack.
+ * The Pipeline's packet queue is kept no larger than the PacketPool's max size;
+ * once it is full, the write operation waits for acks.
+ * Once its ack is received, the packet is returned to the PacketPool for reuse.
+ */
+class PacketPool {
+public:
+    /** @param size maximum number of idle packets kept for reuse. */
+    PacketPool(int size);
+    /**
+     * Return a packet initialized with the given parameters. A pooled packet
+     * is reused (via Packet::reset) when available; otherwise a new one is allocated.
+     */
+    shared_ptr<Packet> getPacket(int pktSize, int chunksPerPkt,
+                                 int64_t offsetInBlock, int64_t seqno, int checksumSize);
+    /**
+     * Give a packet back for reuse. It is dropped if the pool already holds
+     * maxSize packets. (Spelling "relese" is kept for API compatibility.)
+     */
+    void relesePacket(shared_ptr<Packet> packet);
+
+    /** Change the cap. Packets already pooled beyond the new cap are not evicted. */
+    void setMaxSize(int size) {
+        this->maxSize = size;
+    }
+
+    int getMaxSize() const {
+        return maxSize;
+    }
+
+private:
+    int maxSize; // upper bound on idle packets retained
+    std::deque<shared_ptr<Packet> > packets; // idle packets, reused front-first
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PACKETPOOL_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PeerCache.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PeerCache.cpp b/depends/libhdfs3/src/client/PeerCache.cpp
new file mode 100644
index 0000000..98884fe
--- /dev/null
+++ b/depends/libhdfs3/src/client/PeerCache.cpp
@@ -0,0 +1,82 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <inttypes.h>
+
+#include "client/PeerCache.h"
+
+namespace Hdfs {
+namespace Internal {
+
+// Static member: a single LRU map shared by every PeerCache instance.
+LruMap<std::string, PeerCache::value_type> PeerCache::Map;
+
+PeerCache::PeerCache(const SessionConfig& conf)
+    : cacheSize(conf.getSocketCacheCapacity()),
+      expireTimeInterval(conf.getSocketCacheExpiry()) {
+  // Map is static, so each construction re-applies the configured capacity
+  // to the one cache shared by all instances.
+  const int capacity = this->cacheSize;
+  Map.setMaxSize(capacity);
+}
+
+// Build the cache key for a datanode from its IP address, transfer port and
+// datanode ID. The fields are separated by '|' (which cannot appear in an IP
+// address or a port number) so that distinct (ip, port) pairs cannot run
+// together into the same string: without a separator, "10.0.0.1" + 5010 and
+// "10.0.0.150" + 10 both produce "10.0.0.15010".
+std::string PeerCache::buildKey(const DatanodeInfo& datanode) {
+  std::stringstream ss;
+  // Classic locale: no digit grouping in the port number.
+  ss.imbue(std::locale::classic());
+  ss << datanode.getIpAddr() << '|' << datanode.getXferPort() << '|'
+     << datanode.getDatanodeId();
+  return ss.str();
+}
+
+// Take a cached socket for datanode out of the cache. Returns an empty
+// pointer on a miss, or when the entry was added more than
+// expireTimeInterval milliseconds ago.
+shared_ptr<Socket> PeerCache::getConnection(const DatanodeInfo& datanode) {
+  const std::string key = buildKey(datanode);
+  value_type entry;
+
+  // findAndErase removes the entry, so a cached socket is handed out once.
+  if (!Map.findAndErase(key, &entry)) {
+    LOG(DEBUG1, "PeerCache miss for datanode %s uuid(%s).",
+        datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str());
+    return shared_ptr<Socket>();
+  }
+
+  const int64_t elapsed = ToMilliSeconds(entry.second, steady_clock::now());
+
+  if (elapsed > expireTimeInterval) {
+    LOG(DEBUG1, "PeerCache expire for datanode %s uuid(%s).",
+        datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str());
+    return shared_ptr<Socket>();
+  }
+
+  LOG(DEBUG1, "PeerCache hit for datanode %s uuid(%s), elipsed %" PRId64,
+      datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str(),
+      elapsed);
+  return entry.first;
+}
+
+// Cache peer for later reuse, stamped with the current steady_clock time so
+// that getConnection can expire it.
+void PeerCache::addConnection(shared_ptr<Socket> peer,
+                              const DatanodeInfo& datanode) {
+  std::string cacheKey = buildKey(datanode);
+  value_type entry(peer, steady_clock::now());
+  Map.insert(cacheKey, entry);
+  LOG(DEBUG1, "PeerCache add for datanode %s uuid(%s).",
+      datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str());
+}
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/PeerCache.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/PeerCache.h b/depends/libhdfs3/src/client/PeerCache.h
new file mode 100644
index 0000000..6c2352c
--- /dev/null
+++ b/depends/libhdfs3/src/client/PeerCache.h
@@ -0,0 +1,65 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PEERCACHE_H_
+#define _HDFS_LIBHDFS3_CLIENT_PEERCACHE_H_
+
+#include <string>
+#include <utility>
+
+#include "common/DateTime.h"
+#include "common/LruMap.h"
+#include "common/Memory.h"
+#include "common/SessionConfig.h"
+#include "network/Socket.h"
+#include "server/DatanodeInfo.h"
+
+namespace Hdfs {
+namespace Internal {
+
+// Cache of datanode sockets for reuse, keyed by datanode.
+// NOTE(review): Map is a static member shared across instances; thread-safety
+// depends on LruMap's own synchronization — confirm before concurrent use.
+class PeerCache {
+ public:
+  explicit PeerCache(const SessionConfig& conf);
+
+  // Remove and return a cached socket for datanode. Returns an empty pointer
+  // on a miss or if the entry has outlived expireTimeInterval.
+  shared_ptr<Socket> getConnection(const DatanodeInfo& datanode);
+
+  // Cache peer for datanode, stamped with the current steady_clock time.
+  void addConnection(shared_ptr<Socket> peer, const DatanodeInfo& datanode);
+
+  // (socket, time it was added to the cache)
+  typedef std::pair<shared_ptr<Socket>, steady_clock::time_point> value_type;
+
+ private:
+  // Cache key derived from the datanode's IP, transfer port and datanode ID.
+  std::string buildKey(const DatanodeInfo& datanode);
+
+ private:
+  const int cacheSize;  // capacity applied to the shared Map
+  int64_t expireTimeInterval;  // milliseconds
+  static LruMap<std::string, value_type> Map;  // shared by all instances
+};
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_PEERCACHE_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Permission.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Permission.cpp b/depends/libhdfs3/src/client/Permission.cpp
new file mode 100644
index 0000000..09db3ee
--- /dev/null
+++ b/depends/libhdfs3/src/client/Permission.cpp
@@ -0,0 +1,48 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Permission.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+namespace Hdfs {
+
+/*
+ * Decode a POSIX-style mode. Only the low 10 bits are accepted: the sticky
+ * bit (bit 9) and the user/group/other rwx triplets.
+ * Throws InvalidParameter if any higher bit is set.
+ */
+Permission::Permission(uint16_t mode) {
+    if ((mode >> 10) != 0) {
+        THROW(InvalidParameter,
+              "Invalid parameter: cannot convert %u to \"Permission\"",
+              static_cast<unsigned int>(mode));
+    }
+
+    stickyBit = (mode & (1u << 9)) != 0;
+    userAction = static_cast<Action>((mode >> 6) & 7);
+    groupAction = static_cast<Action>((mode >> 3) & 7);
+    otherAction = static_cast<Action>(mode & 7);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/src/client/Permission.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Permission.h b/depends/libhdfs3/src/client/Permission.h
new file mode 100644
index 0000000..895868d
--- /dev/null
+++ b/depends/libhdfs3/src/client/Permission.h
@@ -0,0 +1,224 @@
+/********************************************************************
+ * Copyright (c) 2013 - 2014, Pivotal Inc.
+ * All rights reserved.
+ *
+ * Author: Zhanwei Wang
+ ********************************************************************/
+/********************************************************************
+ * 2014 -
+ * open source under Apache License Version 2.0
+ ********************************************************************/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_
+#define _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace Hdfs {
+
+/**
+ * Action describes an action a user is permitted to apply on a file.
+ * The values form a three-bit mask: READ = 4, WRITE = 2, EXECUTE = 1.
+ */
+enum Action {
+    NONE = 0,          // ---
+    EXECUTE = 1,       // --x
+    WRITE = 2,         // -w-
+    WRITE_EXECUTE = 3, // -wx
+    READ = 4,          // r--
+    READ_EXECUTE = 5,  // r-x
+    READ_WRITE = 6,    // rw-
+    ALL = 7            // rwx
+};
+
+/**
+ * Test whether Action a implies Action b, i.e. every permission bit set in
+ * b is also set in a.
+ * @param a the Action to be tested.
+ * @param b the target Action.
+ * @return true if a implies b.
+ */
+static inline bool implies(const Action & a, const Action & b) {
+    const unsigned int required = static_cast<unsigned int>(b);
+    return (static_cast<unsigned int>(a) & required) == required;
+}
+
+/**
+ * Bitwise intersection of two Actions: the permissions granted by both.
+ * @param a first Action.
+ * @param b second Action.
+ * @return an Action holding only the bits set in both a and b.
+ */
+static inline Action operator &(const Action & a, const Action & b) {
+    const unsigned int common = static_cast<unsigned int>(a) & static_cast<unsigned int>(b);
+    return static_cast<Action>(common);
+}
+/**
+ * Bitwise union of two Actions: every permission granted by either one.
+ * @param a first Action.
+ * @param b second Action.
+ * @return an Action holding the bits set in a or in b.
+ */
+static inline Action operator |(const Action & a, const Action & b) {
+    const unsigned int merged = static_cast<unsigned int>(a) | static_cast<unsigned int>(b);
+    return static_cast<Action>(merged);
+}
+/**
+ * Complement of an Action within the three rwx bits: every permission that
+ * a does not grant (e.g. ~READ == WRITE_EXECUTE, ~ALL == NONE).
+ * @param a the Action to complement.
+ * @return an Action with exactly the rwx bits that are clear in a.
+ */
+static inline Action operator ~(const Action & a) {
+    // Masking (instead of computing 7 - a) gives the same result for every
+    // valid Action but cannot wrap around to a value outside 0..7 if a
+    // carries bits above the rwx range.
+    return static_cast<Action>(~static_cast<unsigned int>(a) & 7u);
+}
+
+/**
+ * Convert an Action to its readable "rwx" symbol, e.g. READ_EXECUTE -> "r-x".
+ * @param a the Action to convert.
+ * @return a three-character string; "???" if a is not one of the Action
+ *         enumerators.
+ */
+static inline std::string toString(const Action & a) {
+    switch (a) {
+    case NONE:
+        return "---";
+
+    case EXECUTE:
+        return "--x";
+
+    case WRITE:
+        return "-w-";
+
+    case WRITE_EXECUTE:
+        return "-wx";
+
+    case READ:
+        return "r--";
+
+    case READ_EXECUTE:
+        return "r-x";
+
+    case READ_WRITE:
+        return "rw-";
+
+    case ALL:
+        return "rwx";
+    }
+
+    // Not reached for valid enumerators. Without this return, control would
+    // flow off the end of a non-void function (undefined behavior, and GCC
+    // reports it under -Wreturn-type).
+    return "???";
+}
+
+/**
+ * Permission describes a file permission: one Action each for the owner,
+ * the group and all other users, plus the sticky bit.
+ */
+class Permission {
+public:
+    /**
+     * To construct a Permission from its three Actions. The sticky bit is
+     * cleared.
+     * @param u owner permission.
+     * @param g group permission.
+     * @param o other user permission.
+     */
+    Permission(const Action & u, const Action & g, const Action & o) :
+        userAction(u), groupAction(g), otherAction(o), stickyBit(false) {
+    }
+
+    /**
+     * To construct a Permission from a uint16 mode, using the same layout
+     * toShort() produces: bits 6-8 owner, bits 3-5 group, bits 0-2 other,
+     * bit 9 sticky bit. Defined out of line; it throws InvalidParameter
+     * when any bit above bit 9 is set.
+     * NOTE(review): not explicit, so integers convert to Permission
+     * implicitly; kept that way for source compatibility with callers.
+     * @param mode permission flag.
+     */
+    Permission(uint16_t mode);
+
+public:
+    /**
+     * To get group permission
+     * @return the group permission
+     */
+    Action getGroupAction() const {
+        return groupAction;
+    }
+
+    /**
+     * To set group permission
+     * @param groupAction the group permission
+     */
+    void setGroupAction(Action groupAction) {
+        this->groupAction = groupAction;
+    }
+
+    /**
+     * To get other user permission
+     * @return other user permission
+     */
+    Action getOtherAction() const {
+        return otherAction;
+    }
+
+    /**
+     * To set other user permission
+     * @param otherAction other user permission
+     */
+    void setOtherAction(Action otherAction) {
+        this->otherAction = otherAction;
+    }
+
+    /**
+     * To get owner permission
+     * @return the owner permission
+     */
+    Action getUserAction() const {
+        return userAction;
+    }
+
+    /**
+     * To set owner permission
+     * @param userAction the owner permission
+     */
+    void setUserAction(Action userAction) {
+        this->userAction = userAction;
+    }
+
+    /**
+     * To get the sticky bit
+     * @return true if the sticky bit is set
+     */
+    bool getStickyBit() const {
+        return stickyBit;
+    }
+
+    /**
+     * To set the sticky bit
+     * @param stickyBit the new sticky bit value
+     */
+    void setStickyBit(bool stickyBit) {
+        this->stickyBit = stickyBit;
+    }
+
+    /**
+     * To convert a Permission to a readable string such as "rwxr-x---".
+     * The sticky bit is not represented in the output.
+     * @return a readable string
+     */
+    std::string toString() const {
+        return Hdfs::toString(userAction) + Hdfs::toString(groupAction)
+               + Hdfs::toString(otherAction);
+    }
+
+    /**
+     * To convert a Permission to a uint16 flag: owner in bits 6-8, group in
+     * bits 3-5, other in bits 0-2, sticky bit in bit 9.
+     * @return a uint16 flag
+     */
+    uint16_t toShort() const {
+        const unsigned int mode = (static_cast<unsigned int>(userAction) << 6)
+                                  | (static_cast<unsigned int>(groupAction) << 3)
+                                  | static_cast<unsigned int>(otherAction)
+                                  | (stickyBit ? (1u << 9) : 0u);
+        return static_cast<uint16_t>(mode);
+    }
+
+    /**
+     * Two Permissions are equal when all three Actions and the sticky bit
+     * match.
+     */
+    bool operator ==(const Permission & other) const {
+        return userAction == other.userAction
+               && groupAction == other.groupAction
+               && otherAction == other.otherAction
+               && stickyBit == other.stickyBit;
+    }
+
+    /**
+     * Negation of operator==.
+     */
+    bool operator !=(const Permission & other) const {
+        return !(*this == other);
+    }
+
+private:
+    Action userAction;   // owner permission
+    Action groupAction;  // group permission
+    Action otherAction;  // permission for all other users
+
+    bool stickyBit;      // bit 9 of the packed mode
+};
+
+}
+#endif /* _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_ */


Mime
View raw message