orc-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From majetideepak <...@git.apache.org>
Subject [GitHub] orc pull request #149: ORC-224: Implement column writers of primitive types
Date Mon, 21 Aug 2017 19:31:25 GMT
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/149#discussion_r134308560
  
    --- Diff: c++/src/ColumnWriter.cc ---
    @@ -468,25 +472,1099 @@ namespace orc {
         rleEncoder->recordPosition(rowIndexPosition.get());
       }
     
    -  std::unique_ptr<ColumnWriter> buildWriter(
    -                                            const Type& type,
    -                                            const StreamsFactory& factory,
    -                                            const WriterOptions& options) {
    -    switch (static_cast<int64_t>(type.getKind())) {
    -      case STRUCT:
    -        return std::unique_ptr<ColumnWriter>(
    -          new StructColumnWriter(
    -                                 type,
    -                                 factory,
    -                                 options));
    -      case INT:
    -      case LONG:
    -      case SHORT:
    -        return std::unique_ptr<ColumnWriter>(
    -          new IntegerColumnWriter(
    -                                  type,
    -                                  factory,
    -                                  options));
    +  class ByteColumnWriter : public ColumnWriter {
    +  public:
    +    ByteColumnWriter(const Type& type,
    +                     const StreamsFactory& factory,
    +                     const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +            std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    std::unique_ptr<ByteRleEncoder> byteRleEncoder;
    +  };
    +
    +  ByteColumnWriter::ByteColumnWriter(
    +                        const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options) :
    +                             ColumnWriter(type, factory, options) {
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +                                  factory.createStream(proto::Stream_Kind_DATA);
    +    byteRleEncoder = createByteRleEncoder(std::move(dataStream));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                             uint64_t offset,
    +                             uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    LongVectorBatch& byteBatch =
    +                               dynamic_cast<LongVectorBatch&>(rowBatch);
    +
    +    int64_t* data = byteBatch.data.data() + offset;
    +    const char* notNull = byteBatch.hasNulls ?
    +                          byteBatch.notNull.data() + offset : nullptr;
    +
    +    char* byteData = reinterpret_cast<char*>(data);
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      byteData[i] = static_cast<char>(data[i]);
    +    }
    +    byteRleEncoder->add(byteData, numValues, notNull);
    +
    +    IntegerColumnStatisticsImpl* intStats =
    +        dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        intStats->increase(1);
    +        intStats->update(static_cast<int64_t>(byteData[i]), 1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    intStats->setHasNull(hasNull);
    +  }
    +
    +  void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(byteRleEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t ByteColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += byteRleEncoder->getBufferSize();
    +    return size;
    +  }
    +
    +  void ByteColumnWriter::getColumnEncoding(
    +    std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void ByteColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    byteRleEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class BooleanColumnWriter : public ColumnWriter {
    +  public:
    +    BooleanColumnWriter(const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    std::unique_ptr<ByteRleEncoder> rleEncoder;
    +  };
    +
    +  BooleanColumnWriter::BooleanColumnWriter(
    +                           const Type& type,
    +                           const StreamsFactory& factory,
    +                           const WriterOptions& options) :
    +                               ColumnWriter(type, factory, options) {
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +      factory.createStream(proto::Stream_Kind_DATA);
    +    rleEncoder = createBooleanRleEncoder(std::move(dataStream));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                uint64_t offset,
    +                                uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    LongVectorBatch& byteBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
    +    int64_t* data = byteBatch.data.data() + offset;
    +    const char* notNull = byteBatch.hasNulls ?
    +                          byteBatch.notNull.data() + offset : nullptr;
    +
    +    char* byteData = reinterpret_cast<char*>(data);
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      byteData[i] = static_cast<char>(data[i]);
    +    }
    +    rleEncoder->add(byteData, numValues, notNull);
    +
    +    BooleanColumnStatisticsImpl* boolStats =
    +        dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        boolStats->increase(1);
    +        boolStats->update(byteData[i], 1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    boolStats->setHasNull(hasNull);
    +  }
    +
    +  void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(rleEncoder->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t BooleanColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += rleEncoder->getBufferSize();
    +    return size;
    +  }
    +
    +  void BooleanColumnWriter::getColumnEncoding(
    +                       std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void BooleanColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    rleEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class DoubleColumnWriter : public ColumnWriter {
    +  public:
    +    DoubleColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options,
    +                       bool isFloat);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  private:
    +    bool isFloat;
    +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
    +    DataBuffer<char> buffer;
    +  };
    +
    +  DoubleColumnWriter::DoubleColumnWriter(
    +                          const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options,
    +                          bool isFloatType) :
    +                              ColumnWriter(type, factory, options),
    +                              isFloat(isFloatType),
    +                              buffer(*options.getMemoryPool()) {
    +    dataStream.reset(new AppendOnlyBufferedStream(
    +                             factory.createStream(proto::Stream_Kind_DATA)));
    +    buffer.resize(isFloat ? 4 : 8);
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    const DoubleVectorBatch& dblBatch =
    +                             dynamic_cast<const DoubleVectorBatch&>(rowBatch);
    +
    +    const double* doubleData = dblBatch.data.data() + offset;
    +    const char* notNull = dblBatch.hasNulls ?
    +                          dblBatch.notNull.data() + offset : nullptr;
    +
    +    size_t bytes = isFloat ? 4 : 8;
    +    char* data = buffer.data();
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        if (isFloat) {
    +          // to avoid float-double cast
    +          const int32_t* intBits =
    +            reinterpret_cast<const int32_t*>(&static_cast<const float&>(
    +              doubleData[i]));
    +          for (size_t j = 0; j < bytes; ++j) {
    +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
    +          }
    +        } else {
    +          const int64_t* intBits =
    +            reinterpret_cast<const int64_t*>(&(doubleData[i]));
    +          for (size_t j = 0; j < bytes; ++j) {
    +            data[j] = static_cast<char>(((*intBits) >> (8 * j)) & 0xff);
    +          }
    +        }
    +        dataStream->write(data, bytes);
    +      }
    +    }
    +
    +    DoubleColumnStatisticsImpl* doubleStats =
    +        dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        doubleStats->increase(1);
    +        doubleStats->update(doubleData[i]);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    doubleStats->setHasNull(hasNull);
    +  }
    +
    +  void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream stream;
    +    stream.set_kind(proto::Stream_Kind_DATA);
    +    stream.set_column(static_cast<uint32_t>(columnId));
    +    stream.set_length(dataStream->flush());
    +    streams.push_back(stream);
    +  }
    +
    +  uint64_t DoubleColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += dataStream->getSize();
    +    return size;
    +  }
    +
    +  void DoubleColumnWriter::getColumnEncoding(
    +                      std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void DoubleColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    dataStream->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class StringColumnWriter : public ColumnWriter {
    +  public:
    +    StringColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  protected:
    +    std::unique_ptr<RleEncoder> lengthEncoder;
    +    std::unique_ptr<AppendOnlyBufferedStream> dataStream;
    +    RleVersion rleVersion;
    +  };
    +
    +  StringColumnWriter::StringColumnWriter(
    +                          const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options) :
    +                              ColumnWriter(type, factory, options),
    +                              rleVersion(RleVersion_1) {
    +    std::unique_ptr<BufferedOutputStream> lengthStream =
    +        factory.createStream(proto::Stream_Kind_LENGTH);
    +    lengthEncoder = createRleEncoder(std::move(lengthStream),
    +                                     false,
    +                                     rleVersion,
    +                                     memPool);
    +    dataStream.reset(new AppendOnlyBufferedStream(
    +        factory.createStream(proto::Stream_Kind_DATA)));
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    const StringVectorBatch & stringBatch =
    +      dynamic_cast<const StringVectorBatch &>(rowBatch);
    +
    +    char *const * data = stringBatch.data.data() + offset;
    +    const int64_t* length = stringBatch.length.data() + offset;
    +    const char* notNull = stringBatch.hasNulls ?
    +                          stringBatch.notNull.data() + offset : nullptr;
    +
    +    lengthEncoder->add(length, numValues, notNull);
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        dataStream->write(data[i], static_cast<size_t>(length[i]));
    +      }
    +    }
    +
    +    StringColumnStatisticsImpl* strStats =
    +        dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        strStats->update(data[i],
    +                         static_cast<size_t>(length[i]));
    +        strStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    strStats->setHasNull(hasNull);
    +  }
    +
    +  void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
    +    ColumnWriter::flush(streams);
    +
    +    proto::Stream length;
    +    length.set_kind(proto::Stream_Kind_LENGTH);
    +    length.set_column(static_cast<uint32_t>(columnId));
    +    length.set_length(lengthEncoder->flush());
    +    streams.push_back(length);
    +
    +    proto::Stream data;
    +    data.set_kind(proto::Stream_Kind_DATA);
    +    data.set_column(static_cast<uint32_t>(columnId));
    +    data.set_length(dataStream->flush());
    +    streams.push_back(data);
    +  }
    +
    +  uint64_t StringColumnWriter::getEstimatedSize() const {
    +    uint64_t size = ColumnWriter::getEstimatedSize();
    +    size += lengthEncoder->getBufferSize();
    +    size += dataStream->getSize();
    +    return size;
    +  }
    +
    +  void StringColumnWriter::getColumnEncoding(
    +    std::vector<proto::ColumnEncoding>& encodings) const {
    +    proto::ColumnEncoding encoding;
    +    encoding.set_kind(rleVersion == RleVersion_1 ?
    +                      proto::ColumnEncoding_Kind_DIRECT :
    +                      proto::ColumnEncoding_Kind_DIRECT_V2);
    +    encoding.set_dictionarysize(0);
    +    encodings.push_back(encoding);
    +  }
    +
    +  void StringColumnWriter::recordPosition() const {
    +    ColumnWriter::recordPosition();
    +    dataStream->recordPosition(rowIndexPosition.get());
    +    lengthEncoder->recordPosition(rowIndexPosition.get());
    +  }
    +
    +  class CharColumnWriter : public StringColumnWriter {
    +  public:
    +    CharColumnWriter(const Type& type,
    +                     const StreamsFactory& factory,
    +                     const WriterOptions& options) :
    +                         StringColumnWriter(type, factory, options),
    +                         fixedLength(type.getMaximumLength()),
    +                         padBuffer(*options.getMemoryPool(),
    +                                   type.getMaximumLength()) {
    +      // PASS
    +    }
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +  private:
    +    uint64_t fixedLength;
    +    DataBuffer<char> padBuffer;
    +  };
    +
    +  void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                             uint64_t offset,
    +                             uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    StringVectorBatch& charsBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    +
    +    char** data = charsBatch.data.data() + offset;
    +    int64_t* length = charsBatch.length.data() + offset;
    +    const char* notNull = charsBatch.hasNulls ?
    +                          charsBatch.notNull.data() + offset : nullptr;
    +
    +    StringColumnStatisticsImpl* strStats =
    +        dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        char *charData = data[i];
    +        uint64_t oriLength = static_cast<uint64_t>(length[i]);
    +        if (oriLength < fixedLength) {
    +          memcpy(padBuffer.data(), data[i], oriLength);
    +          memset(padBuffer.data() + oriLength, ' ', fixedLength - oriLength);
    +          charData = padBuffer.data();
    +        }
    +        length[i] = static_cast<int64_t>(fixedLength);
    +        dataStream->write(charData, fixedLength);
    +
    +        strStats->update(charData, fixedLength);
    +        strStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    lengthEncoder->add(length, numValues, notNull);
    +    strStats->setHasNull(hasNull);
    +  }
    +
    +  class VarCharColumnWriter : public StringColumnWriter {
    +  public:
    +    VarCharColumnWriter(const Type& type,
    +                        const StreamsFactory& factory,
    +                        const WriterOptions& options) :
    +                            StringColumnWriter(type, factory, options),
    +                            maxLength(type.getMaximumLength()) {
    +      // PASS
    +    }
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +  private:
    +    uint64_t maxLength;
    +  };
    +
    +  void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                uint64_t offset,
    +                                uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +    StringVectorBatch& charsBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    +
    +    char* const* data = charsBatch.data.data() + offset;
    +    int64_t* length = charsBatch.length.data() + offset;
    +    const char* notNull = charsBatch.hasNulls ?
    +                          charsBatch.notNull.data() + offset : nullptr;
    +
    +    StringColumnStatisticsImpl* strStats =
    +        dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
    +    bool hasNull = false;
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (!notNull || notNull[i]) {
    +        if (length[i] > static_cast<int64_t>(maxLength)) {
    +          length[i] = static_cast<int64_t>(maxLength);
    +        }
    +        dataStream->write(data[i], static_cast<size_t>(length[i]));
    +
    +        strStats->update(data[i], static_cast<size_t>(length[i]));
    +        strStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    lengthEncoder->add(length, numValues, notNull);
    +    strStats->setHasNull(hasNull);
    +  }
    +
    +  class BinaryColumnWriter : public StringColumnWriter {
    +  public:
    +    BinaryColumnWriter(const Type& type,
    +                       const StreamsFactory& factory,
    +                       const WriterOptions& options) :
    +                           StringColumnWriter(type, factory, options) {
    +      // PASS
    +    }
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +  };
    +
    +  void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                               uint64_t offset,
    +                               uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    StringVectorBatch & binBatch = dynamic_cast<StringVectorBatch &>(rowBatch);
    +    char** data = binBatch.data.data() + offset;
    +    int64_t* length = binBatch.length.data() + offset;
    +    const char* notNull = binBatch.hasNulls ?
    +                          binBatch.notNull.data() + offset : nullptr;
    +
    +    BinaryColumnStatisticsImpl* binStats =
    +        dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
    +
    +    bool hasNull = false;
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
    +      if (!notNull || notNull[i]) {
    +        dataStream->write(data[i], unsignedLength);
    +
    +        binStats->update(unsignedLength);
    +        binStats->increase(1);
    +      } else if (!hasNull) {
    +        hasNull = true;
    +      }
    +    }
    +    lengthEncoder->add(length, numValues, notNull);
    +    binStats->setHasNull(hasNull);
    +  }
    +
    +  class TimestampColumnWriter : public ColumnWriter {
    +  public:
    +    TimestampColumnWriter(const Type& type,
    +                          const StreamsFactory& factory,
    +                          const WriterOptions& options);
    +
    +    virtual void add(ColumnVectorBatch& rowBatch,
    +                     uint64_t offset,
    +                     uint64_t numValues) override;
    +
    +    virtual void flush(std::vector<proto::Stream>& streams) override;
    +
    +    virtual uint64_t getEstimatedSize() const override;
    +
    +    virtual void getColumnEncoding(
    +        std::vector<proto::ColumnEncoding>& encodings) const override;
    +
    +    virtual void recordPosition() const override;
    +
    +  protected:
    +    std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
    +
    +  private:
    +    RleVersion rleVersion;
    +    const Timezone& timezone;
    +  };
    +
    +  TimestampColumnWriter::TimestampColumnWriter(
    +                             const Type& type,
    +                             const StreamsFactory& factory,
    +                             const WriterOptions& options) :
    +                                 ColumnWriter(type, factory, options),
    +                                 rleVersion(RleVersion_1),
    +                                 timezone(getLocalTimezone()){
    +    std::unique_ptr<BufferedOutputStream> dataStream =
    +        factory.createStream(proto::Stream_Kind_DATA);
    +    std::unique_ptr<BufferedOutputStream> secondaryStream =
    +        factory.createStream(proto::Stream_Kind_SECONDARY);
    +    secRleEncoder = createRleEncoder(std::move(dataStream),
    +                                     true,
    +                                     rleVersion,
    +                                     memPool);
    +    nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
    +                                      false,
    +                                      rleVersion,
    +                                      memPool);
    +
    +    if (enableIndex) {
    +      recordPosition();
    +    }
    +  }
    +
    +  static int64_t formatNano(int64_t nanos) {
    +    if (nanos == 0) {
    +      return 0;
    +    } else if (nanos % 100 != 0) {
    +      return (nanos) << 3;
    +    } else {
    +      nanos /= 100;
    +      int64_t trailingZeros = 1;
    +      while (nanos % 10 == 0 && trailingZeros < 7) {
    +        nanos /= 10;
    +        trailingZeros += 1;
    +      }
    +      return (nanos) << 3 | trailingZeros;
    +    }
    +  }
    +
    +  void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
    +                                  uint64_t offset,
    +                                  uint64_t numValues) {
    +    ColumnWriter::add(rowBatch, offset, numValues);
    +
    +    TimestampVectorBatch& tsBatch =
    +      dynamic_cast<TimestampVectorBatch &>(rowBatch);
    +
    +    const char* notNull = tsBatch.hasNulls ?
    +                          tsBatch.notNull.data() + offset : nullptr;
    +    int64_t *secs = tsBatch.data.data() + offset;
    +    int64_t *nanos = tsBatch.nanoseconds.data() + offset;
    +
    +    TimestampColumnStatisticsImpl* tsStats =
    +        dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
    +    for (uint64_t i = 0; i < numValues; ++i) {
    +      if (notNull == nullptr || notNull[i]) {
    +        // TimestampVectorBatch already stores data in UTC
    +        int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
    +        tsStats->increase(1);
    +        tsStats->update(millsUTC);
    +      } else if (!tsStats->hasNull()) {
    +        tsStats->setHasNull(true);
    +      }
    +    }
    +
    +    for (uint64_t i = 0; i < numValues; ++i) {
    --- End diff --
    
    Can this loop be fused with the loop above? Does the same apply to the other writers below as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message