orc-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From majetideepak <...@git.apache.org>
Subject [GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream
Date Thu, 18 May 2017 21:51:01 GMT
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117354986
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1
: 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    --- End diff --
    
    Use `ParseError` here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message