From commits-return-8674-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Wed Mar 25 18:10:20 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2B73C18065D for ; Wed, 25 Mar 2020 19:10:20 +0100 (CET) Received: (qmail 55905 invoked by uid 500); 25 Mar 2020 18:10:19 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 55896 invoked by uid 99); 25 Mar 2020 18:10:19 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Mar 2020 18:10:19 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id DC915819F7; Wed, 25 Mar 2020 18:10:18 +0000 (UTC) Date: Wed, 25 Mar 2020 18:10:18 +0000 To: "commits@kudu.apache.org" Subject: [kudu] branch master updated: wire_protocol: some simplification and optimization for rowwise encoding MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <158515981866.27232.13672682903520455908@gitbox.apache.org> From: todd@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kudu X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: d42c1439390bbf99a74e8ec046539bbd730e146c X-Git-Newrev: 3ae02a92ccb99aa1a6b0c3d05deea75756aaa6e3 X-Git-Rev: 3ae02a92ccb99aa1a6b0c3d05deea75756aaa6e3 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git The following commit(s) were added to refs/heads/master by this push: new 3ae02a9 wire_protocol: some simplification and optimization for rowwise encoding 3ae02a9 is described below commit 3ae02a92ccb99aa1a6b0c3d05deea75756aaa6e3 Author: Todd Lipcon AuthorDate: Tue Mar 24 14:23:34 2020 -0700 wire_protocol: some simplification and optimization for rowwise encoding * Re-implement GetSelectedRows based on the new ForEachSetBit(...) utility, which operates word-by-word instead of byte-by-byte. * Use a boolean return value which indicates the common case of "all rows are selected". Currently the rowwise serialization code path doesn't use this special value (and just reproduces the old std::iota() call to generate the sequence of all indexes), but the columnar code path will special-case this as a memcpy. * Avoid one call to CountSelected() in SerializeRowBlock() by calculating num_rows from the size of the row index vector. * Change SerializeRowBlock() to return an int indicating the number of rows selected, instead of accumulating it into the protobuf. This value can then be re-used to eliminate one extra call to CountSelected() in ScanResultCopier::HandleRowBlock(). After this change, the protobuf is no longer used by SerializeRowBlock(), so I removed the parameter, which required a bit of fixup in the tests. 
Change-Id: I24dfb1bd036fde514ca6494bae0ddc171dd225dd Reviewed-on: http://gerrit.cloudera.org:8080/15550 Reviewed-by: Grant Henke Tested-by: Todd Lipcon --- src/kudu/common/rowblock-test.cc | 14 ++++++------ src/kudu/common/rowblock.cc | 42 +++++++++++++++++++---------------- src/kudu/common/rowblock.h | 8 ++++++- src/kudu/common/wire_protocol-test.cc | 26 ++++++++++------------ src/kudu/common/wire_protocol.cc | 35 ++++++++++++++++++++--------- src/kudu/common/wire_protocol.h | 12 +++++----- src/kudu/tserver/tablet_service.cc | 21 +++++++++--------- 7 files changed, 90 insertions(+), 68 deletions(-) diff --git a/src/kudu/common/rowblock-test.cc b/src/kudu/common/rowblock-test.cc index c2d5a84..f9e9aca 100644 --- a/src/kudu/common/rowblock-test.cc +++ b/src/kudu/common/rowblock-test.cc @@ -18,6 +18,7 @@ #include "kudu/common/rowblock.h" #include +#include #include #include @@ -59,28 +60,27 @@ TEST(TestSelectionVector, TestNonByteAligned) { ASSERT_EQ(sv.nrows(), sv.CountSelected()); ASSERT_TRUE(sv.AnySelected()); - vector sel; - sv.GetSelectedRows(&sel); - ASSERT_EQ(sv.nrows(), sel.size()); + vector sel; + ASSERT_FALSE(sv.GetSelectedRows(&sel)); for (size_t i = 0; i < sv.nrows(); i++) { sv.SetRowUnselected(i); } ASSERT_EQ(0, sv.CountSelected()); ASSERT_FALSE(sv.AnySelected()); - sv.GetSelectedRows(&sel); + ASSERT_TRUE(sv.GetSelectedRows(&sel)); ASSERT_EQ(0, sel.size()); } TEST(TestSelectionVector, TestGetSelectedRows) { - vector expected = {1, 4, 9, 10, 18}; + vector expected = {1, 4, 9, 10, 18}; SelectionVector sv(20); sv.SetAllFalse(); for (int i : expected) { sv.SetRowSelected(i); } - vector selected; - sv.GetSelectedRows(&selected); + vector selected; + ASSERT_TRUE(sv.GetSelectedRows(&selected)); ASSERT_EQ(expected, selected); } diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc index 7e8b22b..74d1964 100644 --- a/src/kudu/common/rowblock.cc +++ b/src/kudu/common/rowblock.cc @@ -16,7 +16,7 @@ // under the License. 
#include "kudu/common/rowblock.h" -#include +#include #include #include @@ -74,31 +74,35 @@ void SelectionVector::ClearToSelectAtMost(size_t max_rows) { } } -void SelectionVector::GetSelectedRows(vector* selected) const { + +// TODO(todd) this is a bit faster when implemented with target "bmi" enabled. +// Consider duplicating it and doing runtime switching. +static void GetSelectedRowsInternal(const uint8_t* __restrict__ bitmap, + int n_bytes, + uint16_t* __restrict__ dst) { + ForEachSetBit(bitmap, n_bytes * 8, + [&](int bit) { + *dst++ = bit; + }); +} + +bool SelectionVector::GetSelectedRows(vector* selected) const { + CHECK_LE(n_rows_, std::numeric_limits::max()); int n_selected = CountSelected(); - selected->resize(n_selected); - if (n_selected == 0) { - return; - } if (n_selected == n_rows_) { - std::iota(selected->begin(), selected->end(), 0); - return; + selected->clear(); + return false; } - const uint8_t* bitmap = &bitmap_[0]; - int* dst = selected->data(); - // Within each byte, keep flipping the least significant non-zero bit and adding - // the bit index to the output until none are set. 
- for (int i = 0; i < n_bytes_; i++) { - uint8_t bm = *bitmap++; - while (bm != 0) { - int bit = Bits::FindLSBSetNonZero(bm); - *dst++ = (i * 8) + bit; - bm ^= (1 << bit); - } + selected->resize(n_selected); + if (n_selected == 0) { + return true; } + GetSelectedRowsInternal(&bitmap_[0], n_bytes_, selected->data()); + return true; } + size_t SelectionVector::CountSelected() const { return Bits::Count(&bitmap_[0], n_bytes_); } diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h index 04fa419..cedc4a7 100644 --- a/src/kudu/common/rowblock.h +++ b/src/kudu/common/rowblock.h @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -27,6 +28,7 @@ #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" #include "kudu/gutil/strings/stringpiece.h" #include "kudu/util/bitmap.h" #include "kudu/util/status.h" @@ -102,7 +104,11 @@ class SelectionVector { // Sets '*selected' to the indices of all rows marked as selected // in this selection vector. - void GetSelectedRows(std::vector* selected) const; + // + // NOTE: in the case that all rows are selected, a fast path is triggered + // in which false is returned with an empty 'selected'. Otherwise, returns + // true. 
+ bool GetSelectedRows(std::vector* selected) const WARN_UNUSED_RESULT; uint8_t *mutable_bitmap() { return &bitmap_[0]; diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc index d85e947..4c9ac0d 100644 --- a/src/kudu/common/wire_protocol-test.cc +++ b/src/kudu/common/wire_protocol-test.cc @@ -44,7 +44,6 @@ #include "kudu/util/hash.pb.h" #include "kudu/util/hexdump.h" #include "kudu/util/memory/arena.h" -#include "kudu/util/pb_util.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" // IWYU pragma: keep @@ -157,14 +156,12 @@ class WireProtocolTest : public KuduTest, FillRowBlockForBenchmark(&block); SelectRandomRowsWithRate(&block, select_rate); - RowwiseRowBlockPB pb; faststring direct, indirect; int64_t cycle_start = CycleClock::Now(); for (int i = 0; i < kNumTrials; ++i) { - pb.Clear(); direct.clear(); indirect.clear(); - SerializeRowBlock(block, &pb, nullptr, &direct, &indirect); + SerializeRowBlock(block, nullptr, &direct, &indirect); } int64_t cycle_end = CycleClock::Now(); LOG(INFO) << Substitute( @@ -315,19 +312,20 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPB) { FillRowBlockWithTestRows(&block); // Convert to PB. - RowwiseRowBlockPB pb; faststring direct, indirect; - SerializeRowBlock(block, &pb, nullptr, &direct, &indirect); - SCOPED_TRACE(pb_util::SecureDebugString(pb)); + int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect); SCOPED_TRACE("Row data: " + direct.ToString()); SCOPED_TRACE("Indirect data: " + indirect.ToString()); // Convert back to a row, ensure that the resulting row is the same // as the one we put in. 
+ RowwiseRowBlockPB pb; + pb.set_num_rows(num_rows); + vector row_ptrs; Slice direct_sidecar = direct; ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect, - &direct_sidecar, &row_ptrs)); + &direct_sidecar, &row_ptrs)); ASSERT_EQ(block.nrows(), row_ptrs.size()); for (int i = 0; i < block.nrows(); ++i) { ConstContiguousRow row_roundtripped(&schema_, row_ptrs[i]); @@ -375,16 +373,17 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) { ColumnSchema("col3", INT32, true /* nullable */)}, 0); // Convert to PB. - RowwiseRowBlockPB pb; faststring direct, indirect; - SerializeRowBlock(block, &pb, &proj_schema, &direct, &indirect, true /* pad timestamps */); - SCOPED_TRACE(pb_util::SecureDebugString(pb)); + int num_rows = SerializeRowBlock(block, &proj_schema, &direct, &indirect, + true /* pad timestamps */); SCOPED_TRACE("Row data: " + HexDump(direct)); SCOPED_TRACE("Indirect data: " + HexDump(indirect)); // Convert back to a row, ensure that the resulting row is the same // as the one we put in. Can't reuse the decoding methods since we // won't support decoding padded rows within Kudu. + RowwiseRowBlockPB pb; + pb.set_num_rows(num_rows); vector row_ptrs; Slice direct_sidecar = direct; Slice indirect_sidecar = indirect; @@ -476,10 +475,9 @@ TEST_F(WireProtocolTest, TestBlockWithNoColumns) { ASSERT_EQ(900, block.selection_vector()->CountSelected()); // Convert it to protobuf, ensure that the results look right. 
- RowwiseRowBlockPB pb; faststring direct, indirect; - SerializeRowBlock(block, &pb, nullptr, &direct, &indirect); - ASSERT_EQ(900, pb.num_rows()); + int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect); + ASSERT_EQ(900, num_rows); } TEST_F(WireProtocolTest, TestColumnDefaultValue) { diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc index 533de57..5b2bc3f 100644 --- a/src/kudu/common/wire_protocol.cc +++ b/src/kudu/common/wire_protocol.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -891,7 +892,7 @@ static void CopyColumn( const ColumnBlock& column_block, int dst_col_idx, uint8_t* __restrict__ dst_base, faststring* indirect_data, const Schema* dst_schema, size_t row_stride, size_t schema_byte_size, size_t column_offset, - const vector& row_idx_select) { + const vector& row_idx_select) { DCHECK(dst_schema); uint8_t* dst = dst_base + column_offset; size_t offset_to_non_null_bitmap = schema_byte_size - column_offset; @@ -927,13 +928,28 @@ static void CopyColumn( // Because we use a faststring here, ASAN tests become unbearably slow // with the extra verifications. ATTRIBUTE_NO_ADDRESS_SAFETY_ANALYSIS -void SerializeRowBlock(const RowBlock& block, - RowwiseRowBlockPB* rowblock_pb, - const Schema* projection_schema, - faststring* data_buf, - faststring* indirect_data, - bool pad_unixtime_micros_to_16_bytes) { +int SerializeRowBlock(const RowBlock& block, + const Schema* projection_schema, + faststring* data_buf, + faststring* indirect_data, + bool pad_unixtime_micros_to_16_bytes) { DCHECK_GT(block.nrows(), 0); + + vector selected_row_indexes; + bool all_selected = !block.selection_vector()->GetSelectedRows( + &selected_row_indexes); + if (all_selected) { + // TODO(todd): add a fast-path for this in the 'Copy' functions. 
+ selected_row_indexes.resize(block.nrows()); + std::iota(selected_row_indexes.begin(), + selected_row_indexes.end(), 0); + } + size_t num_rows = selected_row_indexes.size(); + + // Fast-path empty blocks (eg because the predicate didn't match any rows or + // all rows in the block were deleted) + if (num_rows == 0) return 0; + const Schema* tablet_schema = block.schema(); if (projection_schema == nullptr) { @@ -958,7 +974,6 @@ void SerializeRowBlock(const RowBlock& block, size_t old_size = data_buf->size(); size_t row_stride = ContiguousRowHelper::row_size(*projection_schema) + total_padding; - size_t num_rows = block.selection_vector()->CountSelected(); size_t schema_byte_size = projection_schema->byte_size() + total_padding; size_t additional_size = row_stride * num_rows; @@ -971,8 +986,6 @@ void SerializeRowBlock(const RowBlock& block, memset(base, 0, additional_size); } - vector selected_row_indexes; - block.selection_vector()->GetSelectedRows(&selected_row_indexes); size_t t_schema_idx = 0; size_t padding_so_far = 0; for (int p_schema_idx = 0; p_schema_idx < projection_schema->num_columns(); p_schema_idx++) { @@ -1009,7 +1022,7 @@ void SerializeRowBlock(const RowBlock& block, padding_so_far += 8; } } - rowblock_pb->set_num_rows(rowblock_pb->num_rows() + num_rows); + return num_rows; } string StartTimeToString(const ServerRegistrationPB& reg) { diff --git a/src/kudu/common/wire_protocol.h b/src/kudu/common/wire_protocol.h index 6a88b3d..7ac76fd 100644 --- a/src/kudu/common/wire_protocol.h +++ b/src/kudu/common/wire_protocol.h @@ -158,7 +158,7 @@ Status ParseInt32Config(const std::string& name, const std::string& value, int32 Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb, google::protobuf::Map* configs); -// Encode the given row block into the provided protobuf and data buffers. +// Encode the given row block into the provided data buffers. 
// // All data (both direct and indirect) for each selected row in the RowBlock is // copied into the protobuf and faststrings. @@ -172,10 +172,12 @@ Status ExtraConfigPBToPBMap(const TableExtraConfigPB& pb, // schema will be padded to the right by 8 (zero'd) bytes for a total of 16 bytes. // // Requires that block.nrows() > 0 -void SerializeRowBlock(const RowBlock& block, RowwiseRowBlockPB* rowblock_pb, - const Schema* projection_schema, - faststring* data_buf, faststring* indirect_data, - bool pad_unixtime_micros_to_16_bytes = false); +// +// Returns the number of rows serialized. +int SerializeRowBlock(const RowBlock& block, + const Schema* projection_schema, + faststring* data_buf, faststring* indirect_data, + bool pad_unixtime_micros_to_16_bytes = false); // Rewrites the data pointed-to by row data slice 'row_data_slice' by replacing // relative indirect data pointers with absolute ones in 'indirect_data_slice'. diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index e3f861b..c69e0b3 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -91,7 +91,6 @@ #include "kudu/tserver/tserver_admin.pb.h" #include "kudu/tserver/tserver_service.pb.h" #include "kudu/util/auto_release_pool.h" -#include "kudu/util/bitset.h" #include "kudu/util/crc.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/faststring.h" @@ -758,16 +757,16 @@ class ScanResultCopier : public ScanResultCollector { pad_unixtime_micros_to_16_bytes_(false) {} void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override { - int64_t num_selected = row_block.selection_vector()->CountSelected(); - // Fast-path empty blocks (eg because the predicate didn't match any rows or - // all rows in the block were deleted) - if (num_selected == 0) return; - - num_rows_returned_ += num_selected; - scanner->add_num_rows_returned(num_selected); - SerializeRowBlock(row_block, rowblock_pb_, scanner->client_projection_schema(), - 
rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_); - SetLastRow(row_block, &last_primary_key_); + int num_selected = SerializeRowBlock( + row_block, scanner->client_projection_schema(), + rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_); + + if (num_selected > 0) { + rowblock_pb_->set_num_rows(rowblock_pb_->num_rows() + num_selected); + num_rows_returned_ += num_selected; + scanner->add_num_rows_returned(num_selected); + SetLastRow(row_block, &last_primary_key_); + } } // Returns number of bytes buffered to return.