orc-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From majetideepak <...@git.apache.org>
Subject [GitHub] orc pull request #149: ORC-224: Implement column writers of primitive types
Date Mon, 21 Aug 2017 19:31:25 GMT
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/149#discussion_r134308863
  
    --- Diff: c++/src/ColumnWriter.cc ---
    @@ -468,25 +472,1099 @@ namespace orc {
         rleEncoder->recordPosition(rowIndexPosition.get());
       }
     
    -  std::unique_ptr<ColumnWriter> buildWriter(
    -                                            const Type& type,
    -                                            const StreamsFactory& factory,
    -                                            const WriterOptions& options) {
    -    switch (static_cast<int64_t>(type.getKind())) {
    -      case STRUCT:
    -        return std::unique_ptr<ColumnWriter>(
    -          new StructColumnWriter(
    -                                 type,
    -                                 factory,
    -                                 options));
    -      case INT:
    -      case LONG:
    -      case SHORT:
    -        return std::unique_ptr<ColumnWriter>(
    -          new IntegerColumnWriter(
    -                                  type,
    -                                  factory,
    -                                  options));
    +  class ByteColumnWriter : public ColumnWriter {
    +  public:
    +    ByteColumnWriter(const Type& type,
    +                     const StreamsFactory& factory,
    +                     const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +            std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
    +  };
    +
    +  ByteColumnWriter::ByteColumnWriter(
    +                        const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options) :
    +                             ColumnWriter(type, factory, options) {
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +                                  factory.createStream(proto::Stream_Kind_DATA);
    +    byteRleEncoder = createByteRleEncoder(std::move(dataStream));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                             uint64_t offset,
    +                             uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    LongVectorBatch& byteBatch =
    +                               dynamic_cast<LongVectorBatch&>(rowBatch);
    +
    +    int64_t* data = byteBatch.data.data() + offset;
    +    const char* notNull = byteBatch.hasNulls ?
    +                          byteBatch.notNull.data() + offset : nullptr;
    +
    +    char* byteData = reinterpret_cast<char*>(data);
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      byteData[i] = static_cast<char>(data[i]);
    +    }
    +    byteRleEncoder->add(byteData, numValues, notNull);
    +
    +    IntegerColumnStatisticsImpl* intStats =
    +        dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        intStats->increase(1);
    +        intStats->update(static_cast<int64_t>(byteData[i]), 1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    intStats->setHasNull(hasNull);
    +  }
    +
    +  void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(byteRleEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t ByteColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += byteRleEncoder->getBufferSize();
    +    return size;
    +  }
    +
    +  void ByteColumnWriter::getColumnEncoding(
    +    std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void ByteColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    byteRleEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class BooleanColumnWriter : public ColumnWriter {
    +  public:
    +    BooleanColumnWriter(const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    std::unique_ptr<ByteRleEncoder> rleEncoder;
    +  };
    +
    +  BooleanColumnWriter::BooleanColumnWriter(
    +                           const Type& type,
    +                           const StreamsFactory& factory,
    +                           const WriterOptions& options) :
    +                               ColumnWriter(type, factory, options) {
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +      factory.createStream(proto::Stream_Kind_DATA);
    +    rleEncoder = createBooleanRleEncoder(std::move(dataStream));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                uint64_t offset,
    +                                uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
    +    int64_t* data = byteBatch.data.data() + offset;
    +    const char* notNull = byteBatch.hasNulls ?
    +                          byteBatch.notNull.data() + offset : nullptr;
    +
    +    char* byteData = reinterpret_cast<char*>(data);
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      byteData[i] = static_cast<char>(data[i]);
    +    }
    +    rleEncoder->add(byteData, numValues, notNull);
    +
    +    BooleanColumnStatisticsImpl* boolStats =
    +        dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        boolStats->increase(1);
    +        boolStats->update(byteData[i], 1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    boolStats->setHasNull(hasNull);
    +  }
    +
    +  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(rleEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t BooleanColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += rleEncoder->getBufferSize();
    +    return size;
    +  }
    +
    +  void BooleanColumnWriter::getColumnEncoding(
    +                       std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void BooleanColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    rleEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class DoubleColumnWriter : public ColumnWriter {
    +  public:
    +    DoubleColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options,
    +                       bool isFloat);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    bool isFloat;
    +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
    +    DataBuffer<char> buffer;
    +  };
    +
    +  DoubleColumnWriter::DoubleColumnWriter(
    +                          const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options,
    +                          bool isFloatType) :
    +                              ColumnWriter(type, factory, options),
    +                              isFloat(isFloatType),
    +                              buffer(*options.getMemoryPool()) {
    +    dataStream.reset(new AppendOnlyBufferedStream(
    +                             factory.createStream(proto::Stream_Kind_DATA)));
    +    buffer.resize(isFloat ? 4 : 8);
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    const DoubleVectorBatch& dblBatch =
    +                             dynamic_cast<const DoubleVectorBatch&>(rowBatch);
    +
    +    const double* doubleData = dblBatch.data.data() + offset;
    +    const char* notNull = dblBatch.hasNulls ?
    +                          dblBatch.notNull.data() + offset : nullptr;
    +
    +    size_t bytes = isFloat ? 4 : 8;
    +    char* data = buffer.data();
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        if (isFloat) {
    +          // to avoid float-double cast
    +          const int32_t* intBits =
    +            reinterpret_cast<const int32_t*>(&static_cast<const float&>(
    +              doubleData[i]));
    +          for (size_t j = 0; j < bytes; ++j) {
    +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
    +          }
    +        } else {
    +          const int64_t* intBits =
    +            reinterpret_cast<const int64_t*>(&(doubleData[i]));
    +          for (size_t j = 0; j < bytes; ++j) {
    +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
    +          }
    +        }
    +        dataStream->write(data, bytes);
    +      }
    +    }
    +
    +    DoubleColumnStatisticsImpl* doubleStats =
    +        dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        doubleStats->increase(1);
    +        doubleStats->update(doubleData[i]);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    doubleStats->setHasNull(hasNull);
    +  }
    +
    +  void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(dataStream->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t DoubleColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += dataStream->getSize();
    +    return size;
    +  }
    +
    +  void DoubleColumnWriter::getColumnEncoding(
    +                      std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void DoubleColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    dataStream->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class StringColumnWriter : public ColumnWriter {
    +  public:
    +    StringColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  protected:
    +    std::unique_ptr<RleEncoder> lengthEncoder;
    +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
    +    RleVersion rleVersion;
    +  };
    +
    +  StringColumnWriter::StringColumnWriter(
    +                          const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options) :
    +                              ColumnWriter(type, factory, options),
    +                              rleVersion(RleVersion_1) {
    +    std::unique_ptr<BufferedOutputStream> lengthStream =
    +        factory.createStream(proto::Stream_Kind_LENGTH);
    +    lengthEncoder = createRleEncoder(std::move(lengthStream),
    +                                     false,
    +                                     rleVersion,
    +                                     memPool);
    +    dataStream.reset(new AppendOnlyBufferedStream(
    +        factory.createStream(proto::Stream_Kind_DATA)));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    const StringVectorBatch & stringBatch =
    +      dynamic_cast<const StringVectorBatch &>(rowBatch);
    +
    +    char *const * data = stringBatch.data.data() + offset;
    +    const int64_t* length = stringBatch.length.data() + offset;
    +    const char* notNull = stringBatch.hasNulls ?
    +                          stringBatch.notNull.data() + offset : nullptr;
    +
    +    lengthEncoder->add(length, numValues, notNull);
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        dataStream->write(data[i], static_cast<size_t>(length[i]));
    +      }
    +    }
    +
    +    StringColumnStatisticsImpl* strStats =
    +        dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    --- End diff --
    
    Can this loop be fused with the above loop?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message