Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B44DF200D3E for ; Thu, 16 Nov 2017 22:26:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B2C31160BF4; Thu, 16 Nov 2017 21:26:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0B6E41609EF for ; Thu, 16 Nov 2017 22:26:11 +0100 (CET) Received: (qmail 7888 invoked by uid 500); 16 Nov 2017 21:26:11 -0000 Mailing-List: contact commits-help@impala.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@impala.apache.org Delivered-To: mailing list commits@impala.apache.org Received: (qmail 7879 invoked by uid 99); 16 Nov 2017 21:26:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Nov 2017 21:26:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 3DACF1A2044 for ; Thu, 16 Nov 2017 21:26:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id G--H5O0soRXv for ; Thu, 16 Nov 2017 21:26:03 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at 
mx1-lw-us.apache.org) with SMTP id 9230C5F568 for ; Thu, 16 Nov 2017 21:26:02 +0000 (UTC) Received: (qmail 6788 invoked by uid 99); 16 Nov 2017 21:26:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Nov 2017 21:26:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EDBFBF5EFE; Thu, 16 Nov 2017 21:26:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tarmstrong@apache.org To: commits@impala.incubator.apache.org Date: Thu, 16 Nov 2017 21:26:01 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-impala git commit: IMPALA-4177, IMPALA-6039: batched bit reading and rle decoding archived-at: Thu, 16 Nov 2017 21:26:13 -0000 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/parquet-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc index d5b0d01..3c83c23 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -49,6 +49,7 @@ using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace apache::thrift; using namespace parquet; +using impala::RleBatchDecoder; using std::min; using impala::PARQUET_VERSION_NUMBER; @@ -119,15 +120,6 @@ string GetSchema(const FileMetaData& md) { return ss.str(); } -// Inherit from RleDecoder to get access to repeat_count_, which is protected. 
-class ParquetLevelReader : public impala::RleDecoder { - public: - ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width) - : RleDecoder(buffer, buffer_len, bit_width) {} - - uint32_t repeat_count() const { return repeat_count_; } -}; - // Performs sanity checking on the contents of data pages, to ensure that: // - Compressed pages can be uncompressed successfully. // - The number of def levels matches num_values in the page header when using RLE. @@ -163,18 +155,19 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_ // Parquet data pages always start with the encoded definition level data, and // RLE sections in Parquet always start with a 4 byte length followed by the data. int num_def_level_bytes = *reinterpret_cast(data); - ParquetLevelReader def_levels(const_cast(data) + sizeof(int32_t), + RleBatchDecoder def_levels(const_cast(data) + sizeof(int32_t), num_def_level_bytes, sizeof(uint8_t)); uint8_t level; for (int i = 0; i < header.data_page_header.num_values; ++i) { - if (!def_levels.Get(&level)) { + if (!def_levels.GetSingleValue(&level)) { cerr << "Error: Decoding of def levels failed.\n"; exit(1); } - if (i + def_levels.repeat_count() + 1 > header.data_page_header.num_values) { - cerr << "Error: More def levels encoded (" << (i + def_levels.repeat_count() + 1) - << ") than num_values (" << header.data_page_header.num_values << ").\n"; + if (i + def_levels.NextNumRepeats() + 1 > header.data_page_header.num_values) { + cerr << "Error: More def levels encoded (" + << (i + def_levels.NextNumRepeats() + 1) << ") than num_values (" + << header.data_page_header.num_values << ").\n"; exit(1); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h index 2e2dd7f..861bf8e 100644 --- a/be/src/util/rle-encoding.h +++ 
b/be/src/util/rle-encoding.h @@ -76,49 +76,116 @@ namespace impala { /// (total 26 bytes, 1 byte overhead) // -/// Decoder class for RLE encoded data. -class RleDecoder { +/// RLE decoder with a batch-oriented interface that enables fast decoding. +/// Users of this class must first initialize the class to point to a buffer of +/// RLE-encoded data, passed into the constructor or Reset(). Then they can +/// decode data by checking NextNumRepeats()/NextNumLiterals() to see if the +/// next run is a repeated or literal run, then calling GetRepeatedValue() +/// or GetLiteralValues() respectively to read the values. +/// +/// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. +/// Other decoding errors are signalled by functions returning false. If an +/// error is encountered then it is not valid to read any more data until +/// Reset() is called. +template +class RleBatchDecoder { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - DCHECK_GE(bit_width_, 0); - DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH); + RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { + Reset(buffer, buffer_len, bit_width); } + RleBatchDecoder() : bit_width_(-1) {} + + /// Reset the decoder to read from a new buffer. + void Reset(uint8_t* buffer, int buffer_len, int bit_width); + + /// Return the size of the current repeated run. Returns zero if the current run is + /// a literal run or if no more runs can be read from the input. + int32_t NextNumRepeats(); + + /// Get the value of the current repeated run and consume the given number of repeats. + /// Only valid to call when NextNumRepeats() > 0. 
The given number of repeats cannot + /// be greater than the remaining number of repeats in the run. + T GetRepeatedValue(int32_t num_repeats_to_consume); + + /// Return the size of the current literal run. Returns zero if the current run is + /// a repeated run or if no more runs can be read from the input. + int32_t NextNumLiterals(); + + /// Consume 'num_literals_to_consume' literals from the current literal run, + /// copying the values to 'values'. 'num_literals_to_consume' must be <= + /// NextNumLiterals(). Returns true if the requested number of literals were + /// successfully read or false if an error was encountered, e.g. the input was + /// truncated. + bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; + + /// Consume 'num_literals_to_consume' literals from the current literal run, + /// decoding them using 'dict' and outputting them to 'values'. + /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if + /// the requested number of literals were successfully read or false if an error + /// was encountered, e.g. the input was truncated or the value was not present + /// in the dictionary. Errors can only be recovered from by calling Reset() + /// to read from a new buffer. + template + bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict, + int64_t dict_len, OutType* values) WARN_UNUSED_RESULT; + + /// Convenience method to get the next value. Not efficient. Returns true on success + /// or false if no more values can be read from the input or an error was encountered + /// decoding the values. 
+ bool GetSingleValue(T* val) WARN_UNUSED_RESULT; - RleDecoder() : bit_width_(-1) {} + private: + BatchedBitReader bit_reader_; - void Reset(uint8_t* buffer, int buffer_len, int bit_width) { - DCHECK_GE(bit_width, 0); - DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH); - bit_reader_.Reset(buffer, buffer_len); - bit_width_ = bit_width; - current_value_ = 0; - repeat_count_ = 0; - literal_count_ = 0; - } + /// Number of bits needed to encode the value. Must be between 0 and 64. + int bit_width_; - /// Gets the next value. Returns false if there are no more. - template - bool Get(T* val); + /// If a repeated run, the number of repeats remaining in the current run to be read. + /// If the current run is a literal run, this is 0. + int32_t repeat_count_; - protected: - /// Fills literal_count_ and repeat_count_ with next values. Returns false if there - /// are no more. - template - bool NextCounts(); + /// If a literal run, the number of literals remaining in the current run to be read. + /// If the current run is a repeated run, this is 0. + int32_t literal_count_; - BitReader bit_reader_; - /// Number of bits needed to encode the value. Must be between 0 and 64. - int bit_width_; - uint64_t current_value_; - uint32_t repeat_count_; - uint32_t literal_count_; + /// If a repeated run, the current repeated value. + T repeated_value_; + + /// Size of buffer for literal values. Large enough to decode a full batch of 32 + /// literals. The buffer is needed to allow clients to read in batches that are not + /// multiples of 32. + static constexpr int LITERAL_BUFFER_LEN = 32; + + /// Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the + /// position of the next literal to be read from the buffer. + T literal_buffer_[LITERAL_BUFFER_LEN]; + int num_buffered_literals_; + int literal_buffer_pos_; + + /// Called when both 'literal_count_' and 'repeat_count_' have been exhausted. 
+ /// Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal + /// or repeated run, or leaves both at 0 if no more values can be read (either because + /// the end of the input was reached or an error was encountered decoding). + void NextCounts(); + + /// Fill the literal buffer. Invalid to call if there are already buffered literals. + /// Return false if the input was truncated. This does not advance 'literal_count_'. + bool FillLiteralBuffer() WARN_UNUSED_RESULT; + + bool HaveBufferedLiterals() const { + return literal_buffer_pos_ < num_buffered_literals_; + } + + /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing + /// 'literal_count_'. Returns the number of literals outputted. + int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); + + /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing + /// 'literal_count_'. Returns the number of literals outputted or 0 if a + /// decoding error is encountered. + template + int32_t DecodeBufferedLiterals( + int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values); }; /// Class to incrementally build the rle data. This class does not allocate any memory. @@ -153,7 +220,8 @@ class RleEncoder { int max_literal_run_size = 1 + BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8); /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value. - int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8); + int max_repeated_run_size = + BatchedBitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8); return std::max(max_literal_run_size, max_repeated_run_size); } @@ -167,7 +235,7 @@ class RleEncoder { /// Encode value. Returns true if the value fits in buffer, false otherwise. /// This value must be representable with bit_width_ bits. - bool Put(uint64_t value); + bool Put(uint64_t value) WARN_UNUSED_RESULT; /// Flushes any pending values to the underlying buffer. 
/// Returns the total number of bytes written @@ -245,53 +313,6 @@ class RleEncoder { uint8_t* literal_indicator_byte_; }; -// Force inlining - this is used in perf-critical loops in Parquet and GCC often -// doesn't inline it in cases where it's beneficial. -template -ALWAYS_INLINE inline bool RleDecoder::Get(T* val) { - DCHECK_GE(bit_width_, 0); - // Profiling has shown that the quality and performance of the generated code is very - // sensitive to the exact shape of this check. For example, the version below performs - // significantly better than UNLIKELY(literal_count_ == 0 && repeat_count_ == 0) - if (repeat_count_ == 0) { - if (literal_count_ == 0) { - if (!NextCounts()) return false; - } - } - - if (LIKELY(repeat_count_ > 0)) { - *val = current_value_; - --repeat_count_; - } else { - DCHECK_GT(literal_count_, 0); - if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false; - --literal_count_; - } - - return true; -} - -template -bool RleDecoder::NextCounts() { - // Read the next run's indicator int, it could be a literal or repeated run. - // The int is encoded as a vlq-encoded value. - int32_t indicator_value = 0; - if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false; - - // lsb indicates if it is a literal run or repeated run - bool is_literal = indicator_value & 1; - if (is_literal) { - literal_count_ = (indicator_value >> 1) * 8; - if (UNLIKELY(literal_count_ == 0)) return false; - } else { - repeat_count_ = indicator_value >> 1; - bool result = bit_reader_.GetAligned( - BitUtil::Ceil(bit_width_, 8), reinterpret_cast(&current_value_)); - if (UNLIKELY(!result || repeat_count_ == 0)) return false; - } - return true; -} - /// This function buffers input values 8 at a time. After seeing all 8 values, /// it decides whether they should be encoded as a literal or repeated run. 
inline bool RleEncoder::Put(uint64_t value) { @@ -444,5 +465,197 @@ inline void RleEncoder::Clear() { bit_writer_.Clear(); } +template +inline void RleBatchDecoder::Reset(uint8_t* buffer, int buffer_len, int bit_width) { + DCHECK_GE(bit_width, 0); + DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH); + bit_reader_.Reset(buffer, buffer_len); + bit_width_ = bit_width; + repeat_count_ = 0; + literal_count_ = 0; + num_buffered_literals_ = 0; + literal_buffer_pos_ = 0; +} + +template +inline int32_t RleBatchDecoder::NextNumRepeats() { + if (repeat_count_ > 0) return repeat_count_; + if (literal_count_ == 0) NextCounts(); + return repeat_count_; +} + +template +inline T RleBatchDecoder::GetRepeatedValue(int32_t num_repeats_to_consume) { + DCHECK_GT(num_repeats_to_consume, 0); + DCHECK_GE(repeat_count_, num_repeats_to_consume); + repeat_count_ -= num_repeats_to_consume; + return repeated_value_; +} + +template +inline int32_t RleBatchDecoder::NextNumLiterals() { + if (literal_count_ > 0) return literal_count_; + if (repeat_count_ == 0) NextCounts(); + return literal_count_; } + +template +inline bool RleBatchDecoder::GetLiteralValues( + int32_t num_literals_to_consume, T* values) { + DCHECK_GE(num_literals_to_consume, 0); + DCHECK_GE(literal_count_, num_literals_to_consume); + int32_t num_consumed = 0; + // Copy any buffered literals left over from previous calls. + if (HaveBufferedLiterals()) { + num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); + } + + int32_t num_remaining = num_literals_to_consume - num_consumed; + // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. + // Need to round to a batch of 32 if the caller is consuming only part of the current + // run avoid ending on a non-byte boundary. 
+ int32_t num_to_bypass = std::min(literal_count_, + BitUtil::RoundDownToPowerOf2(num_remaining, 32)); + if (num_to_bypass > 0) { + int num_read = + bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); + // If we couldn't read the expected number, that means the input was truncated. + if (num_read < num_to_bypass) return false; + literal_count_ -= num_to_bypass; + num_consumed += num_to_bypass; + num_remaining = num_literals_to_consume - num_consumed; + } + + if (num_remaining > 0) { + // We weren't able to copy all the literals requested directly from the input. + // Buffer literals and copy over the requested number. + if (UNLIKELY(!FillLiteralBuffer())) return false; + int32_t num_copied = OutputBufferedLiterals(num_remaining, values + num_consumed); + DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals"; + } + return true; +} + +template +template +inline bool RleBatchDecoder::DecodeLiteralValues( + int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, OutType* values) { + DCHECK_GE(num_literals_to_consume, 0); + DCHECK_GE(literal_count_, num_literals_to_consume); + int32_t num_consumed = 0; + // Decode any buffered literals left over from previous calls. + if (HaveBufferedLiterals()) { + num_consumed = + DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values); + if (UNLIKELY(num_consumed == 0)) return false; + } + + int32_t num_remaining = num_literals_to_consume - num_consumed; + // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. + // Need to round to a batch of 32 if the caller is consuming only part of the current + // run avoid ending on a non-byte boundery. 
+ int32_t num_to_bypass = + std::min(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); + if (num_to_bypass > 0) { + int num_read = bit_reader_.UnpackAndDecodeBatch( + bit_width_, dict, dict_len, num_to_bypass, values + num_consumed); + // If we couldn't read the expected number, that means the input was truncated. + if (num_read < num_to_bypass) return false; + literal_count_ -= num_to_bypass; + num_consumed += num_to_bypass; + num_remaining = num_literals_to_consume - num_consumed; + } + + if (num_remaining > 0) { + // We weren't able to copy all the literals requested directly from the input. + // Buffer literals and copy over the requested number. + if (UNLIKELY(!FillLiteralBuffer())) return false; + int32_t num_copied = + DecodeBufferedLiterals(num_remaining, dict, dict_len, values + num_consumed); + if (UNLIKELY(num_copied == 0)) return false; + DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals"; + } + return true; +} + +template +inline bool RleBatchDecoder::GetSingleValue(T* val) { + if (NextNumRepeats() > 0) { + DCHECK_EQ(0, NextNumLiterals()); + *val = GetRepeatedValue(1); + return true; + } + if (NextNumLiterals() > 0) { + DCHECK_EQ(0, NextNumRepeats()); + return GetLiteralValues(1, val); + } + return false; +} + +template +inline void RleBatchDecoder::NextCounts() { + DCHECK_EQ(0, literal_count_); + DCHECK_EQ(0, repeat_count_); + // Read the next run's indicator int, it could be a literal or repeated run. + // The int is encoded as a vlq-encoded value. 
+ int32_t indicator_value = 0; + if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return; + + // lsb indicates if it is a literal run or repeated run + bool is_literal = indicator_value & 1; + if (is_literal) { + literal_count_ = (indicator_value >> 1) * 8; + } else { + int32_t repeat_count = indicator_value >> 1; + if (UNLIKELY(repeat_count == 0)) return; + bool result = + bit_reader_.GetBytes(BitUtil::Ceil(bit_width_, 8), &repeated_value_); + if (UNLIKELY(!result)) return; + repeat_count_ = repeat_count; + } +} + +template +inline bool RleBatchDecoder::FillLiteralBuffer() { + DCHECK(!HaveBufferedLiterals()); + int32_t num_to_buffer = std::min(LITERAL_BUFFER_LEN, literal_count_); + num_buffered_literals_ = + bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); + // If we couldn't read the expected number, that means the input was truncated. + if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; + literal_buffer_pos_ = 0; + return true; +} + +template +inline int32_t RleBatchDecoder::OutputBufferedLiterals( + int32_t max_to_output, T* values) { + int32_t num_to_output = + std::min(max_to_output, num_buffered_literals_ - literal_buffer_pos_); + memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); + literal_buffer_pos_ += num_to_output; + literal_count_ -= num_to_output; + return num_to_output; +} + +template +template +inline int32_t RleBatchDecoder::DecodeBufferedLiterals( + int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values) { + int32_t num_to_output = + std::min(max_to_output, num_buffered_literals_ - literal_buffer_pos_); + for (int32_t i = 0; i < num_to_output; ++i) { + T idx = literal_buffer_[literal_buffer_pos_ + i]; + if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0; + memcpy(&values[i], &dict[idx], sizeof(OutType)); + } + literal_buffer_pos_ += num_to_output; + literal_count_ -= num_to_output; + return num_to_output; +} + +template +constexpr int 
RleBatchDecoder::LITERAL_BUFFER_LEN; +} + #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc index 0501d71..b8a1d94 100644 --- a/be/src/util/rle-test.cc +++ b/be/src/util/rle-test.cc @@ -30,7 +30,7 @@ namespace impala { -const int MAX_WIDTH = BitReader::MAX_BITWIDTH; +const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH; TEST(BitArray, TestBool) { const int len = 8; @@ -69,29 +69,24 @@ TEST(BitArray, TestBool) { EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0)); // Use the reader and validate - BitReader reader(buffer, len); + BatchedBitReader reader(buffer, len); + // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { - for (int i = 0; i < 8; ++i) { - bool val = false; - bool result = reader.GetValue(1, &val); - EXPECT_TRUE(result); - EXPECT_EQ(val, i % 2); - } + bool batch_vals[16]; + EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals)); + for (int i = 0; i < 8; ++i) EXPECT_EQ(batch_vals[i], i % 2); for (int i = 0; i < 8; ++i) { - bool val = false; - bool result = reader.GetValue(1, &val); - EXPECT_TRUE(result); switch (i) { case 0: case 1: case 4: case 5: - EXPECT_EQ(val, false); + EXPECT_EQ(batch_vals[8 + i], false); break; default: - EXPECT_EQ(val, true); + EXPECT_EQ(batch_vals[8 + i], true); break; } } @@ -113,17 +108,30 @@ void TestBitArrayValues(int bit_width, int num_vals) { writer.Flush(); EXPECT_EQ(writer.bytes_written(), len); - BitReader reader(buffer, len); + BatchedBitReader reader(buffer, len); + BatchedBitReader reader2(reader); // Test copy constructor. // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { + // Unpack all values at once with one batched reader and in small batches with the + // other batched reader. 
+ vector batch_vals(num_vals); + const int BATCH_SIZE = 32; + vector batch_vals2(BATCH_SIZE); + EXPECT_EQ(num_vals, + reader.UnpackBatch(bit_width, num_vals, batch_vals.data())); for (int i = 0; i < num_vals; ++i) { - int64_t val; - bool result = reader.GetValue(bit_width, &val); - EXPECT_TRUE(result); - EXPECT_EQ(val, i % mod); + if (i % BATCH_SIZE == 0) { + int num_to_unpack = min(BATCH_SIZE, num_vals - i); + EXPECT_EQ(num_to_unpack, + reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data())); + } + EXPECT_EQ(i % mod, batch_vals[i]); + EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]); } EXPECT_EQ(reader.bytes_left(), 0); + EXPECT_EQ(reader2.bytes_left(), 0); reader.Reset(buffer, len); + reader2.Reset(buffer, len); } } @@ -137,45 +145,32 @@ TEST(BitArray, TestValues) { } } -// Test some mixed values -TEST(BitArray, TestMixed) { - const int len = 1024; - uint8_t buffer[len]; - bool parity = true; - - BitWriter writer(buffer, len); - for (int i = 0; i < len; ++i) { - bool result; - if (i % 2 == 0) { - result = writer.PutValue(parity, 1); - parity = !parity; - } else { - result = writer.PutValue(i, 10); - } - EXPECT_TRUE(result); - } - writer.Flush(); - - parity = true; - BitReader reader(buffer, len); - // Ensure it returns the same results after Reset(). - for (int trial = 0; trial < 2; ++trial) { - for (int i = 0; i < len; ++i) { - bool result; - if (i % 2 == 0) { - bool val; - result = reader.GetValue(1, &val); - EXPECT_EQ(val, parity); - parity = !parity; - } else { - int val; - result = reader.GetValue(10, &val); - EXPECT_EQ(val, i); +/// Get many values from a batch RLE decoder. +template +static bool GetRleValues(RleBatchDecoder* decoder, int num_vals, T* vals) { + int decoded = 0; + // Decode repeated and literal runs until we've filled the output. 
+ while (decoded < num_vals) { + if (decoder->NextNumRepeats() > 0) { + EXPECT_EQ(0, decoder->NextNumLiterals()); + int num_repeats_to_output = + min(decoder->NextNumRepeats(), num_vals - decoded); + T repeated_val = decoder->GetRepeatedValue(num_repeats_to_output); + for (int i = 0; i < num_repeats_to_output; ++i) { + *vals = repeated_val; + ++vals; } - EXPECT_TRUE(result); + decoded += num_repeats_to_output; + continue; } - reader.Reset(buffer, len); + int num_literals_to_output = + min(decoder->NextNumLiterals(), num_vals - decoded); + if (num_literals_to_output == 0) return false; + if (!decoder->GetLiteralValues(num_literals_to_output, vals)) return false; + decoded += num_literals_to_output; + vals += num_literals_to_output; } + return true; } // Validates encoding of values by encoding and decoding them. If @@ -203,16 +198,32 @@ void ValidateRle(const vector& values, int bit_width, } // Verify read - RleDecoder decoder(buffer, len, bit_width); + RleBatchDecoder decoder(buffer, len, bit_width); + RleBatchDecoder decoder2(buffer, len, bit_width); // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { for (int i = 0; i < values.size(); ++i) { uint64_t val; - bool result = decoder.Get(&val); - EXPECT_TRUE(result); - EXPECT_EQ(values[i], val); + EXPECT_TRUE(decoder.GetSingleValue(&val)); + EXPECT_EQ(values[i], val) << i; + } + // Unpack everything at once from the second batch decoder. + vector decoded_values(values.size()); + EXPECT_TRUE(GetRleValues(&decoder2, values.size(), decoded_values.data())); + for (int i = 0; i < values.size(); ++i) { + EXPECT_EQ(values[i], decoded_values[i]) << i; } decoder.Reset(buffer, len, bit_width); + decoder2.Reset(buffer, len, bit_width); + } +} + +/// Basic test case for literal unpacking - two literals in a run. 
+TEST(Rle, TwoLiteralRun) { + vector values{1, 0}; + ValidateRle(values, 1, nullptr, -1); + for (int width = 1; width <= MAX_WIDTH; ++width) { + ValidateRle(values, width, nullptr, -1); } } @@ -287,16 +298,22 @@ TEST(Rle, BitWidthZeroRepeated) { uint8_t buffer[1]; const int num_values = 15; buffer[0] = num_values << 1; // repeated indicator byte - RleDecoder decoder(buffer, sizeof(buffer), 0); + RleBatchDecoder decoder(buffer, sizeof(buffer), 0); // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { uint8_t val; for (int i = 0; i < num_values; ++i) { - bool result = decoder.Get(&val); - EXPECT_TRUE(result); - EXPECT_EQ(val, 0); // can only encode 0s with bit width 0 + EXPECT_TRUE(decoder.GetSingleValue(&val)); + EXPECT_EQ(val, 0); } - EXPECT_FALSE(decoder.Get(&val)); + EXPECT_FALSE(decoder.GetSingleValue(&val)); + + // Test decoding all values in a batch. + decoder.Reset(buffer, sizeof(buffer), 0); + uint8_t decoded_values[num_values]; + EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values)); + for (int i = 0; i < num_values; i++) EXPECT_EQ(0, decoded_values[i]) << i; + EXPECT_FALSE(decoder.GetSingleValue(&val)); decoder.Reset(buffer, sizeof(buffer), 0); } } @@ -305,17 +322,23 @@ TEST(Rle, BitWidthZeroLiteral) { uint8_t buffer[1]; const int num_groups = 4; buffer[0] = num_groups << 1 | 1; // literal indicator byte - RleDecoder decoder = RleDecoder(buffer, sizeof(buffer), 0); + RleBatchDecoder decoder(buffer, sizeof(buffer), 0); // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { const int num_values = num_groups * 8; uint8_t val; for (int i = 0; i < num_values; ++i) { - bool result = decoder.Get(&val); - EXPECT_TRUE(result); + EXPECT_TRUE(decoder.GetSingleValue(&val)); EXPECT_EQ(val, 0); // can only encode 0s with bit width 0 } - EXPECT_FALSE(decoder.Get(&val)); + + // Test decoding the whole batch at once. 
+ decoder.Reset(buffer, sizeof(buffer), 0); + uint8_t decoded_values[num_values]; + EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values)); + for (int i = 0; i < num_values; ++i) EXPECT_EQ(0, decoded_values[i]); + + EXPECT_FALSE(GetRleValues(&decoder, 1, decoded_values)); decoder.Reset(buffer, sizeof(buffer), 0); } } @@ -402,20 +425,25 @@ TEST(BitRle, Overflow) { EXPECT_LE(bytes_written, len); EXPECT_GT(num_added, 0); - RleDecoder decoder(buffer, bytes_written, bit_width); + RleBatchDecoder decoder(buffer, bytes_written, bit_width); // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { parity = true; uint32_t v; for (int i = 0; i < num_added; ++i) { - bool result = decoder.Get(&v); - EXPECT_TRUE(result); + EXPECT_TRUE(decoder.GetSingleValue(&v)); EXPECT_EQ(v, parity); parity = !parity; } // Make sure we get false when reading past end a couple times. - EXPECT_FALSE(decoder.Get(&v)); - EXPECT_FALSE(decoder.Get(&v)); + EXPECT_FALSE(decoder.GetSingleValue(&v)); + EXPECT_FALSE(decoder.GetSingleValue(&v)); + + decoder.Reset(buffer, bytes_written, bit_width); + uint32_t decoded_values[num_added]; + EXPECT_TRUE(GetRleValues(&decoder, num_added, decoded_values)); + for (int i = 0; i < num_added; ++i) EXPECT_EQ(i % 2 == 0, decoded_values[i]) << i; + decoder.Reset(buffer, bytes_written, bit_width); } } @@ -426,21 +454,20 @@ TEST(BitRle, Overflow) { TEST(Rle, ZeroLiteralOrRepeatCount) { const int len = 1024; uint8_t buffer[len]; - RleDecoder decoder(buffer, len, 0); - uint64_t val; - + RleBatchDecoder decoder(buffer, len, 0); // Test the RLE repeated values path. 
memset(buffer, 0, len); for (int i = 0; i < 10; ++i) { - bool result = decoder.Get(&val); - EXPECT_FALSE(result); + EXPECT_EQ(0, decoder.NextNumLiterals()); + EXPECT_EQ(0, decoder.NextNumRepeats()); } // Test the RLE literal values path memset(buffer, 1, len); + decoder.Reset(buffer, len, 0); for (int i = 0; i < 10; ++i) { - bool result = decoder.Get(&val); - EXPECT_FALSE(result); + EXPECT_EQ(0, decoder.NextNumLiterals()); + EXPECT_EQ(0, decoder.NextNumRepeats()); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/README ---------------------------------------------------------------------- diff --git a/testdata/data/README b/testdata/data/README index e1dc496..8b9db3a 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -123,3 +123,12 @@ Generated using parquet-mr and contents verified using parquet-tools-1.9.1. Contains decimals stored as variable sized BYTE_ARRAY with both dictionary and non-dictionary encoding respectively. +alltypes_agg_bitpacked_def_levels.parquet: +Generated by hacking Impala's Parquet writer to write out bitpacked def levels instead +of the standard RLE-encoded levels. See +https://github.com/timarmstrong/incubator-impala/tree/hack-bit-packed-levels. This +is a single file containing all of the alltypesagg data, which includes a mix of +null and non-null values. This is not actually a valid Parquet file because the +bit-packed levels are written in the reverse order specified in the Parquet spec +for BIT_PACKED. However, this is the order that Impala attempts to read the levels +in - see IMPALA-3006. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet ---------------------------------------------------------------------- diff --git a/testdata/data/alltypes_agg_bitpacked_def_levels.parquet b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet new file mode 100644 index 0000000..bd2d9d8 Binary files /dev/null and b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet differ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test new file mode 100644 index 0000000..d48c333 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test @@ -0,0 +1,52 @@ +==== +---- QUERY +# Verify that total counts of non-null values are correct. +select count(id), count(tinyint_col), count(smallint_col), count(int_col), + count(bigint_col), count(float_col), count(double_col), count(date_string_col), + count(string_col), count(timestamp_col), count(year), count(month), count(day) +from alltypesagg +---- TYPES +BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT +---- RESULTS +11000,9000,10800,10980,10980,10980,10980,11000,11000,11000,11000,11000,10000 +==== +---- QUERY +# Spot-check a subset of values. 
+select * +from alltypesagg +where year = 2010 and month = 1 and int_col is null or int_col % 1000 = 77 +order by id, year, month, day +---- TYPES +INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT,INT +---- RESULTS +0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,1 +0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,NULL +77,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/01/10','77',2010-01-01 01:17:29.260000000,2010,1,1 +1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,2 +1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,NULL +1077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/02/10','77',2010-01-02 01:17:29.260000000,2010,1,2 +2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,3 +2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,NULL +2077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/03/10','77',2010-01-03 01:17:29.260000000,2010,1,3 +3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,4 +3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,NULL +3077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/04/10','77',2010-01-04 01:17:29.260000000,2010,1,4 +4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,5 +4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,NULL +4077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/05/10','77',2010-01-05 01:17:29.260000000,2010,1,5 +5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,6 +5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,NULL +5077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/06/10','77',2010-01-06 01:17:29.260000000,2010,1,6 
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,7 +6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,NULL +6077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/07/10','77',2010-01-07 01:17:29.260000000,2010,1,7 +7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,8 +7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,NULL +7077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/08/10','77',2010-01-08 01:17:29.260000000,2010,1,8 +8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,9 +8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,NULL +8077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/09/10','77',2010-01-09 01:17:29.260000000,2010,1,9 +9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,10 +9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,NULL +9077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/10/10','77',2010-01-10 01:17:29.260000000,2010,1,10 +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 17b9503..fe0577a 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -388,6 +388,22 @@ class TestParquet(ImpalaTestSuite): self.run_test_case('QueryTest/parquet-corrupt-rle-counts-abort', vector, unique_database) + def test_bitpacked_def_levels(self, vector, unique_database): + """Test that Impala can read a Parquet file with the deprecated bit-packed def + level encoding.""" + self.client.execute(("""CREATE TABLE {0}.alltypesagg ( + id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, + int_col INT, bigint_col 
BIGINT, float_col FLOAT, double_col DOUBLE, + date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP, + year INT, month INT, day INT) STORED AS PARQUET""").format(unique_database)) + alltypesagg_loc = get_fs_path( + "/test-warehouse/{0}.db/{1}".format(unique_database, "alltypesagg")) + check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] + + "/testdata/data/alltypes_agg_bitpacked_def_levels.parquet", alltypesagg_loc]) + self.client.execute("refresh {0}.alltypesagg".format(unique_database)); + + self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database) + @SkipIfS3.hdfs_block_size @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size