orc-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From majetideepak <...@git.apache.org>
Subject [GitHub] orc pull request #128: ORC-178 Implement Basic C++ Writer and Writer Option
Date Tue, 06 Jun 2017 21:20:22 GMT
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/128#discussion_r120476831
  
    --- Diff: c++/src/Writer.cc ---
    @@ -0,0 +1,659 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +#include "orc/Common.hh"
    +#include "orc/OrcFile.hh"
    +
    +#include "ColumnWriter.hh"
    +#include "Timezone.hh"
    +
    +#include <memory>
    +
    +namespace orc {
    +
    +  struct WriterOptionsPrivate {
    +    uint64_t stripeSize;
    +    uint64_t blockSize;
    +    uint64_t rowIndexStride;
    +    uint64_t bufferSize;
    +    bool blockPadding;
    +    CompressionKind compression;
    +    EncodingStrategy encodingStrategy;
    +    CompressionStrategy compressionStrategy;
    +    MemoryPool *memoryPool;
    +    WriterVersion version;
    +    double paddingTolerance;
    +    std::ostream* errorStream;
    +    RleVersion rleVersion;
    +    double dictionaryKeySizeThreshold;
    +    bool enableStats;
    +    bool enableStrStatsCmp;
    +    bool enableIndex;
    +    const Timezone* timezone;
    +
    +    WriterOptionsPrivate() {
    +      stripeSize = 64 * 1024 * 1024; // 64M
    +      blockSize = 256 * 1024; // 256K
    +      rowIndexStride = 10000;
    +      bufferSize = 4 * 1024 * 1024; // 4M
    +      blockPadding = false;
    +      compression = CompressionKind_ZLIB;
    +      encodingStrategy = EncodingStrategy_SPEED;
    +      compressionStrategy = CompressionStrategy_SPEED;
    +      memoryPool = getDefaultPool();
    +      version = WriterVersion_ORC_135;
    +      paddingTolerance = 0.0;
    +      errorStream = &std::cerr;
    +      rleVersion = RleVersion_1;
    +      dictionaryKeySizeThreshold = 0.0;
    +      enableStats = true;
    +      enableStrStatsCmp = false;
    +      enableIndex = true;
    +      timezone = &getLocalTimezone();
    +    }
    +  };
    +
    +  WriterOptions::WriterOptions():
    +    privateBits(std::unique_ptr<WriterOptionsPrivate>
    +                (new WriterOptionsPrivate())) {
    +    // PASS
    +  }
    +
    +  WriterOptions::WriterOptions(const WriterOptions& rhs):
    +    privateBits(std::unique_ptr<WriterOptionsPrivate>
    +                (new WriterOptionsPrivate(*(rhs.privateBits.get())))) {
    +    // PASS
    +  }
    +
    +  WriterOptions::WriterOptions(WriterOptions& rhs) {
    +    // swap privateBits with rhs
    +    WriterOptionsPrivate* l = privateBits.release();
    +    privateBits.reset(rhs.privateBits.release());
    +    rhs.privateBits.reset(l);
    +  }
    +
    +  WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
    +    if (this != &rhs) {
    +      privateBits.reset(new WriterOptionsPrivate(*(rhs.privateBits.get())));
    +    }
    +    return *this;
    +  }
    +
    +  WriterOptions::~WriterOptions() {
    +    // PASS
    +  }
    +
    +  WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
    +    privateBits->stripeSize = size;
    +    return *this;
    +  }
    +
    +  uint64_t WriterOptions::getStripeSize() const {
    +    return privateBits->stripeSize;
    +  }
    +
    +  WriterOptions& WriterOptions::setBlockSize(uint64_t size) {
    +    privateBits->blockSize = size;
    +    return *this;
    +  }
    +
    +  uint64_t WriterOptions::getBlockSize() const {
    +    return privateBits->blockSize;
    +  }
    +
    +  WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) {
    +    privateBits->rowIndexStride = stride;
    +    return *this;
    +  }
    +
    +  uint64_t WriterOptions::getRowIndexStride() const {
    +    return privateBits->rowIndexStride;
    +  }
    +
    +  WriterOptions& WriterOptions::setBufferSize(uint64_t size) {
    +    privateBits->bufferSize = size;
    +    return *this;
    +  }
    +
    +  uint64_t WriterOptions::getBufferSize() const {
    +    return privateBits->bufferSize;
    +  }
    +
    +  WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) {
    +    privateBits->dictionaryKeySizeThreshold = val;
    +    return *this;
    +  }
    +
    +  double WriterOptions::getDictionaryKeySizeThreshold() const {
    +    return privateBits->dictionaryKeySizeThreshold;
    +  }
    +
    +  WriterOptions& WriterOptions::setBlockPadding(bool padding) {
    +    privateBits->blockPadding = padding;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getBlockPadding() const {
    +    return privateBits->blockPadding;
    +  }
    +
    +  WriterOptions& WriterOptions::setRleVersion(RleVersion version) {
    +    privateBits->rleVersion = version;
    +    return *this;
    +  }
    +
    +  RleVersion WriterOptions::getRleVersion() const {
    +    return privateBits->rleVersion;
    +  }
    +
    +  WriterOptions& WriterOptions::setCompression(CompressionKind comp) {
    +    privateBits->compression = comp;
    +    return *this;
    +  }
    +
    +  CompressionKind WriterOptions::getCompression() const {
    +    return privateBits->compression;
    +  }
    +
    +  WriterOptions& WriterOptions::setEncodingStrategy(EncodingStrategy strategy) {
    +    privateBits->encodingStrategy = strategy;
    +    return *this;
    +  }
    +
    +  EncodingStrategy WriterOptions::getEncodingStrategy() const {
    +    return privateBits->encodingStrategy;
    +  }
    +
    +  WriterOptions& WriterOptions::setCompressionStrategy(
    +    CompressionStrategy strategy) {
    +    privateBits->compressionStrategy = strategy;
    +    return *this;
    +  }
    +
    +  CompressionStrategy WriterOptions::getCompressionStrategy() const {
    +    return privateBits->compressionStrategy;
    +  }
    +
    +  WriterOptions& WriterOptions::setWriterVersion(WriterVersion version) {
    +    privateBits->version = version;
    +    return *this;
    +  }
    +
    +  WriterVersion WriterOptions::getWriterVersion() const {
    +    return privateBits->version;
    +  }
    +
    +  WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
    +    privateBits->paddingTolerance = tolerance;
    +    return *this;
    +  }
    +
    +  double WriterOptions::getPaddingTolerance() const {
    +    return privateBits->paddingTolerance;
    +  }
    +
    +  WriterOptions& WriterOptions::setMemoryPool(MemoryPool * memoryPool) {
    +    privateBits->memoryPool = memoryPool;
    +    return *this;
    +  }
    +
    +  MemoryPool * WriterOptions::getMemoryPool() const {
    +    return privateBits->memoryPool;
    +  }
    +
    +  WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) {
    +    privateBits->errorStream = &errStream;
    +    return *this;
    +  }
    +
    +  std::ostream * WriterOptions::getErrorStream() const {
    +    return privateBits->errorStream;
    +  }
    +
    +  WriterOptions& WriterOptions::setEnableStats(bool enable) {
    +    privateBits->enableStats = enable;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getEnableStats() const {
    +    return privateBits->enableStats;
    +  }
    +
    +  WriterOptions& WriterOptions::setEnableStrStatsCmp(bool enable) {
    +    privateBits->enableStrStatsCmp = enable;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getEnableStrStatsCmp() const {
    +    return privateBits->enableStrStatsCmp;
    +  }
    +
    +  WriterOptions& WriterOptions::setEnableIndex(bool enable) {
    +    privateBits->enableIndex = enable;
    +    return *this;
    +  }
    +
    +  bool WriterOptions::getEnableIndex() const {
    +    return privateBits->enableIndex;
    +  }
    +
    +  const Timezone* WriterOptions::getTimezone() const {
    +    return privateBits->timezone;
    +  }
    +
    +  WriterOptions&  WriterOptions::setTimezone(const std::string& zone) {
    +    privateBits->timezone = &getTimezoneByName(zone);
    +    return *this;
    +  }
    +
    +  Writer::~Writer() {
    +    // PASS
    +  }
    +
    +  class WriterImpl : public Writer {
    +  private:
    +    std::unique_ptr<ColumnWriter> columnWriter;
    +    std::unique_ptr<BufferedOutputStream> compressionStream;
    +    std::unique_ptr<BufferedOutputStream> bufferedStream;
    +    std::unique_ptr<StreamsFactory> streamsFactory;
    +    OutputStream * outStream;
    +    WriterOptions options;
    +    const Type& type;
    +    uint64_t stripeRows, totalRows, indexRows;
    +    uint64_t currentOffset;
    +    proto::Footer fileFooter;
    +    proto::PostScript postScript;
    +    proto::StripeInformation stripeInfo;
    +    proto::Metadata metadata;
    +
    +    static const char * magicId;
    +
    +  public:
    +    WriterImpl(
    +               const Type& type,
    +               OutputStream* stream,
    +               const WriterOptions& options);
    +
    +    std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size)
    +                                                            const override;
    +
    +    void add(ColumnVectorBatch& rowsToAdd) override;
    +
    +    void close() override;
    +
    +  private:
    +    void Init();
    +    void InitStripe();
    +    void WriteStripe();
    +    void WriteMetadata();
    +    void WriteFileFooter();
    +    void WritePostscript();
    +    void BuildFooterType(const Type& t, proto::Footer& footer, uint32_t&
index);
    +    static proto::CompressionKind convertCompressionKind(
    +                                                  const CompressionKind& kind);
    +  };
    +
    +  const char * WriterImpl::magicId = "ORC";
    +
    +  WriterImpl::WriterImpl(
    +                         const Type& t,
    +                         OutputStream * stream,
    +                         const WriterOptions& opts) :
    +                         outStream(stream),
    +                         options(opts),
    +                         type(t) {
    +    streamsFactory = createStreamsFactory(options, outStream);
    +    columnWriter = buildWriter(type, *streamsFactory, options);
    +    stripeRows = totalRows = indexRows = 0;
    +    currentOffset = 0;
    +
    +    uint64_t bufferCapacity = 4 * 1024 * 1024; // 4M
    +    compressionStream = createCompressor(
    +                                  options.getCompression(),
    +                                  outStream,
    +                                  options.getCompressionStrategy(),
    +                                  bufferCapacity,
    +                                  options.getBlockSize(),
    +                                  *options.getMemoryPool());
    +    bufferedStream.reset(new BufferedOutputStream(
    +                                            *options.getMemoryPool(),
    +                                            outStream,
    +                                            bufferCapacity,
    +                                            options.getBlockSize()));
    +
    +    Init();
    +  }
    +
    +  std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size)
    +                                                                         const {
    +    return type.createRowBatch(size, *options.getMemoryPool());
    +  }
    +
    +  void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
    +    if (options.getEnableIndex()) {
    +      uint64_t pos = 0;
    +      uint64_t chunkSize = 0;
    +      uint64_t rowIndexStride = options.getRowIndexStride();
    +      while (pos < rowsToAdd.numElements) {
    +        chunkSize = std::min(rowsToAdd.numElements - pos,
    +                             rowIndexStride - indexRows);
    +        columnWriter->add(rowsToAdd, pos, chunkSize);
    +
    +        pos += chunkSize;
    +        indexRows += chunkSize;
    +        stripeRows += chunkSize;
    +
    +        if (indexRows >= rowIndexStride) {
    +          columnWriter->createRowIndexEntry();
    +          indexRows = 0;
    +        }
    +      }
    +    } else {
    +      stripeRows += rowsToAdd.numElements;
    +      columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements);
    +    }
    +
    +    if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
    +      WriteStripe();
    +    }
    --- End diff --
    
    throw an exception() otherwise indicating insufficient StripeSize


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message